comparison src/ch/ethz/ssh2/AbstractSFTPClient.java @ 273:91a31873c42a ganymed

start conversion from trilead to ganymed
author Carl Byington <carl@five-ten-sg.com>
date Fri, 18 Jul 2014 11:21:46 -0700
parents
children d2b303406d63
comparison
equal deleted inserted replaced
272:ce2f4e397703 273:91a31873c42a
1 package ch.ethz.ssh2;
2
3 import java.io.BufferedOutputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.net.SocketException;
8 import java.nio.charset.Charset;
9 import java.nio.charset.UnsupportedCharsetException;
10 import java.util.HashMap;
11 import java.util.Map;
12
13 import ch.ethz.ssh2.channel.Channel;
14 import ch.ethz.ssh2.log.Logger;
15 import ch.ethz.ssh2.packets.TypesReader;
16 import ch.ethz.ssh2.packets.TypesWriter;
17 import ch.ethz.ssh2.sftp.AttribFlags;
18 import ch.ethz.ssh2.sftp.ErrorCodes;
19 import ch.ethz.ssh2.sftp.Packet;
20 import ch.ethz.ssh2.util.StringEncoder;
21
22 /**
23 * @version $Id: AbstractSFTPClient.java 151 2014-04-28 10:03:39Z dkocher@sudo.ch $
24 */
25 public abstract class AbstractSFTPClient implements SFTPClient {
26
27 private static final Logger log = Logger.getLogger(SFTPv3Client.class);
28
29 private Session sess;
30
31 private InputStream is;
32 private OutputStream os;
33
34 private int next_request_id = 1000;
35
36 private String charset;
37
38 /**
39 * Parallel read requests maximum size.
40 */
41 private static final int DEFAULT_MAX_PARALLELISM = 64;
42
43 /**
44 * Parallel read requests.
45 */
46 private int parallelism = DEFAULT_MAX_PARALLELISM;
47
48 public void setRequestParallelism(int parallelism) {
49 this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM);
50 }
51
52 /**
53 * Mapping request ID to request.
54 */
55 private Map<Integer, OutstandingReadRequest> pendingReadQueue
56 = new HashMap<Integer, OutstandingReadRequest>();
57
58 /**
59 * Mapping request ID to request.
60 */
61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue
62 = new HashMap<Integer, OutstandingStatusRequest>();
63
64 private PacketListener listener;
65
66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException {
67 this.listener = listener;
68
69 log.debug("Opening session and starting SFTP subsystem.");
70 sess = conn.openSession();
71 sess.startSubSystem("sftp");
72
73 is = sess.getStdout();
74 os = new BufferedOutputStream(sess.getStdin(), 2048);
75
76 init(version);
77
78 }
79
80 private void init(final int client_version) throws IOException {
81 // Send SSH_FXP_INIT with client version
82
83 TypesWriter tw = new TypesWriter();
84 tw.writeUINT32(client_version);
85 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes());
86
87 /* Receive SSH_FXP_VERSION */
88
89 log.debug("Waiting for SSH_FXP_VERSION...");
90 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */
91
92 int t = tr.readByte();
93 listener.read(Packet.forName(t));
94
95 if(t != Packet.SSH_FXP_VERSION) {
96 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t));
97 throw new PacketTypeException(t);
98 }
99
100 final int protocol_version = tr.readUINT32();
101
102 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version);
103 if(protocol_version != client_version) {
104 throw new IOException(String.format("Server protocol version %d does not match %d",
105 protocol_version, client_version));
106 }
107 // Both parties should from then on adhere to particular version of the protocol
108
109 // Read and save extensions (if any) for later use
110 while(tr.remain() != 0) {
111 String name = tr.readString();
112 listener.read(name);
113 byte[] value = tr.readByteString();
114 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value)));
115 }
116 }
117
118 /**
119 * Queries the channel state
120 *
121 * @return True if the underlying session is in open state
122 */
123 @Override
124 public boolean isConnected() {
125 return sess.getState() == Channel.STATE_OPEN;
126 }
127
128 /**
129 * Close this SFTP session. NEVER forget to call this method to free up
130 * resources - even if you got an exception from one of the other methods.
131 * Sometimes these other methods may throw an exception, saying that the
132 * underlying channel is closed (this can happen, e.g., if the other server
133 * sent a close message.) However, as long as you have not called the
134 * <code>close()</code> method, you are likely wasting resources.
135 */
136 @Override
137 public void close() {
138 sess.close();
139 }
140
141 /**
142 * Set the charset used to convert between Java Unicode Strings and byte encodings
143 * used by the server for paths and file names.
144 *
145 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8.
146 * @throws java.io.IOException
147 * @see #getCharset()
148 */
149 @Override
150 public void setCharset(String charset) throws IOException {
151 if(charset == null) {
152 this.charset = null;
153 return;
154 }
155 try {
156 Charset.forName(charset);
157 }
158 catch(UnsupportedCharsetException e) {
159 throw new IOException("This charset is not supported", e);
160 }
161 this.charset = charset;
162 }
163
164 /**
165 * The currently used charset for filename encoding/decoding.
166 *
167 * @return The name of the charset (<code>null</code> if UTF-8 is used).
168 * @see #setCharset(String)
169 */
170 @Override
171 public String getCharset() {
172 return charset;
173 }
174
175 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException;
176
177 @Override
178 public void mkdir(String dirName, int posixPermissions) throws IOException {
179 int req_id = generateNextRequestID();
180
181 TypesWriter tw = new TypesWriter();
182 tw.writeString(dirName, this.getCharset());
183 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS);
184 tw.writeUINT32(posixPermissions);
185
186 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes());
187
188 expectStatusOKMessage(req_id);
189 }
190
191 @Override
192 public void rm(String fileName) throws IOException {
193 int req_id = generateNextRequestID();
194
195 TypesWriter tw = new TypesWriter();
196 tw.writeString(fileName, this.getCharset());
197
198 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes());
199
200 expectStatusOKMessage(req_id);
201 }
202
203 @Override
204 public void rmdir(String dirName) throws IOException {
205 int req_id = generateNextRequestID();
206
207 TypesWriter tw = new TypesWriter();
208 tw.writeString(dirName, this.getCharset());
209
210 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes());
211
212 expectStatusOKMessage(req_id);
213 }
214
215 @Override
216 public void mv(String oldPath, String newPath) throws IOException {
217 int req_id = generateNextRequestID();
218
219 TypesWriter tw = new TypesWriter();
220 tw.writeString(oldPath, this.getCharset());
221 tw.writeString(newPath, this.getCharset());
222
223 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes());
224
225 expectStatusOKMessage(req_id);
226 }
227
228 @Override
229 public String readLink(String path) throws IOException {
230 int req_id = generateNextRequestID();
231
232 TypesWriter tw = new TypesWriter();
233 tw.writeString(path, charset);
234
235 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes());
236
237 byte[] resp = receiveMessage(34000);
238
239 TypesReader tr = new TypesReader(resp);
240
241 int t = tr.readByte();
242 listener.read(Packet.forName(t));
243
244 int rep_id = tr.readUINT32();
245 if(rep_id != req_id) {
246 throw new RequestMismatchException();
247 }
248
249 if(t == Packet.SSH_FXP_NAME) {
250 int count = tr.readUINT32();
251
252 if(count != 1) {
253 throw new PacketTypeException(t);
254 }
255
256 return tr.readString(charset);
257 }
258
259 if(t != Packet.SSH_FXP_STATUS) {
260 throw new PacketTypeException(t);
261 }
262
263 int errorCode = tr.readUINT32();
264 String errorMessage = tr.readString();
265 listener.read(errorMessage);
266 throw new SFTPException(errorMessage, errorCode);
267 }
268
269 @Override
270 public void setstat(String path, SFTPFileAttributes attr) throws IOException {
271 int req_id = generateNextRequestID();
272
273 TypesWriter tw = new TypesWriter();
274 tw.writeString(path, charset);
275 tw.writeBytes(attr.toBytes());
276
277 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes());
278
279 expectStatusOKMessage(req_id);
280 }
281
282 @Override
283 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException {
284 int req_id = generateNextRequestID();
285
286 TypesWriter tw = new TypesWriter();
287 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
288 tw.writeBytes(attr.toBytes());
289
290 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes());
291
292 expectStatusOKMessage(req_id);
293 }
294
295 @Override
296 public void createSymlink(String src, String target) throws IOException {
297 int req_id = generateNextRequestID();
298
299 TypesWriter tw = new TypesWriter();
300 tw.writeString(src, charset);
301 tw.writeString(target, charset);
302
303 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes());
304
305 expectStatusOKMessage(req_id);
306 }
307
308 @Override
309 public void createHardlink(String src, String target) throws IOException {
310 int req_id = generateNextRequestID();
311
312 TypesWriter tw = new TypesWriter();
313 tw.writeString("hardlink@openssh.com", charset);
314 tw.writeString(target, charset);
315 tw.writeString(src, charset);
316
317 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes());
318
319 expectStatusOKMessage(req_id);
320 }
321
322 @Override
323 public String canonicalPath(String path) throws IOException {
324 int req_id = generateNextRequestID();
325
326 TypesWriter tw = new TypesWriter();
327 tw.writeString(path, charset);
328
329 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes());
330
331 byte[] resp = receiveMessage(34000);
332
333 TypesReader tr = new TypesReader(resp);
334
335 int t = tr.readByte();
336 listener.read(Packet.forName(t));
337
338 int rep_id = tr.readUINT32();
339 if(rep_id != req_id) {
340 throw new RequestMismatchException();
341 }
342
343 if(t == Packet.SSH_FXP_NAME) {
344 int count = tr.readUINT32();
345
346 if(count != 1) {
347 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet.");
348 }
349 final String name = tr.readString(charset);
350 listener.read(name);
351 return name;
352 }
353
354 if(t != Packet.SSH_FXP_STATUS) {
355 throw new PacketTypeException(t);
356 }
357
358 int errorCode = tr.readUINT32();
359 String errorMessage = tr.readString();
360 listener.read(errorMessage);
361 throw new SFTPException(errorMessage, errorCode);
362 }
363
364 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException {
365 if(log.isDebugEnabled()) {
366 log.debug(String.format("Send message of type %d with request id %d", type, requestId));
367 }
368 listener.write(Packet.forName(type));
369
370 int msglen = len + 1;
371
372 if(type != Packet.SSH_FXP_INIT) {
373 msglen += 4;
374 }
375
376 os.write(msglen >> 24);
377 os.write(msglen >> 16);
378 os.write(msglen >> 8);
379 os.write(msglen);
380 os.write(type);
381
382 if(type != Packet.SSH_FXP_INIT) {
383 os.write(requestId >> 24);
384 os.write(requestId >> 16);
385 os.write(requestId >> 8);
386 os.write(requestId);
387 }
388
389 os.write(msg, off, len);
390 os.flush();
391 }
392
393 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException {
394 sendMessage(type, requestId, msg, 0, msg.length);
395 }
396
397 private void readBytes(byte[] buff, int pos, int len) throws IOException {
398 while(len > 0) {
399 int count = is.read(buff, pos, len);
400 if(count < 0) {
401 throw new SocketException("Unexpected end of stream.");
402 }
403 len -= count;
404 pos += count;
405 }
406 }
407
408 /**
409 * Read a message and guarantee that the <b>contents</b> is not larger than
410 * <code>maxlen</code> bytes.
411 * <p/>
412 * Note: receiveMessage(34000) actually means that the message may be up to 34004
413 * bytes (the length attribute preceding the contents is 4 bytes).
414 *
415 * @param maxlen
416 * @return the message contents
417 * @throws IOException
418 */
419 protected byte[] receiveMessage(int maxlen) throws IOException {
420 byte[] msglen = new byte[4];
421
422 readBytes(msglen, 0, 4);
423
424 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff));
425
426 if((len > maxlen) || (len <= 0)) {
427 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len));
428 }
429
430 byte[] msg = new byte[len];
431
432 readBytes(msg, 0, len);
433
434 return msg;
435 }
436
437 protected int generateNextRequestID() {
438 synchronized(this) {
439 return next_request_id++;
440 }
441 }
442
443 protected void closeHandle(byte[] handle) throws IOException {
444 int req_id = generateNextRequestID();
445
446 TypesWriter tw = new TypesWriter();
447 tw.writeString(handle, 0, handle.length);
448
449 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes());
450
451 expectStatusOKMessage(req_id);
452 }
453
454 private void readStatus() throws IOException {
455 byte[] resp = receiveMessage(34000);
456
457 TypesReader tr = new TypesReader(resp);
458 int t = tr.readByte();
459 listener.read(Packet.forName(t));
460
461 // Search the pending queue
462 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32());
463 if(null == status) {
464 throw new RequestMismatchException();
465 }
466
467 // Evaluate the answer
468 if(t == Packet.SSH_FXP_STATUS) {
469 // In any case, stop sending more packets
470 int code = tr.readUINT32();
471 if(log.isDebugEnabled()) {
472 String[] desc = ErrorCodes.getDescription(code);
473 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
474 }
475 if(code == ErrorCodes.SSH_FX_OK) {
476 return;
477 }
478 String msg = tr.readString();
479 listener.read(msg);
480 throw new SFTPException(msg, code);
481 }
482 throw new PacketTypeException(t);
483 }
484
485 private void readPendingReadStatus() throws IOException {
486 byte[] resp = receiveMessage(34000);
487
488 TypesReader tr = new TypesReader(resp);
489 int t = tr.readByte();
490 listener.read(Packet.forName(t));
491
492 // Search the pending queue
493 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32());
494 if(null == status) {
495 throw new RequestMismatchException();
496 }
497
498 // Evaluate the answer
499 if(t == Packet.SSH_FXP_STATUS) {
500 // In any case, stop sending more packets
501 int code = tr.readUINT32();
502 if(log.isDebugEnabled()) {
503 String[] desc = ErrorCodes.getDescription(code);
504 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
505 }
506 if(code == ErrorCodes.SSH_FX_OK) {
507 return;
508 }
509 if(code == ErrorCodes.SSH_FX_EOF) {
510 return;
511 }
512 String msg = tr.readString();
513 listener.read(msg);
514 throw new SFTPException(msg, code);
515 }
516 throw new PacketTypeException(t);
517 }
518
519 protected void expectStatusOKMessage(int id) throws IOException {
520 byte[] resp = receiveMessage(34000);
521
522 TypesReader tr = new TypesReader(resp);
523
524 int t = tr.readByte();
525 listener.read(Packet.forName(t));
526
527 int rep_id = tr.readUINT32();
528 if(rep_id != id) {
529 throw new RequestMismatchException();
530 }
531
532 if(t != Packet.SSH_FXP_STATUS) {
533 throw new PacketTypeException(t);
534 }
535
536 int errorCode = tr.readUINT32();
537
538 if(errorCode == ErrorCodes.SSH_FX_OK) {
539 return;
540 }
541 String errorMessage = tr.readString();
542 listener.read(errorMessage);
543 throw new SFTPException(errorMessage, errorCode);
544 }
545
546 @Override
547 public void closeFile(SFTPFileHandle handle) throws IOException {
548 while(!pendingReadQueue.isEmpty()) {
549 this.readPendingReadStatus();
550 }
551 while(!pendingStatusQueue.isEmpty()) {
552 this.readStatus();
553 }
554 closeHandle(handle.getHandle());
555 }
556
557 @Override
558 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException {
559 boolean errorOccured = false;
560 int remaining = len * parallelism;
561 //int clientOffset = dstoff;
562
563 long serverOffset = fileOffset;
564 for(OutstandingReadRequest r : pendingReadQueue.values()) {
565 // Server offset should take pending requests into account.
566 serverOffset += r.len;
567 }
568
569 while(true) {
570 // Stop if there was an error and no outstanding request
571 if((pendingReadQueue.size() == 0) && errorOccured) {
572 break;
573 }
574
575 // Send as many requests as we are allowed to
576 while(pendingReadQueue.size() < parallelism) {
577 if(errorOccured) {
578 break;
579 }
580 // Send the next read request
581 OutstandingReadRequest req = new OutstandingReadRequest();
582 req.req_id = generateNextRequestID();
583 req.serverOffset = serverOffset;
584 req.len = (remaining > len) ? len : remaining;
585 req.buffer = dst;
586 req.dstOffset = dstoff;
587
588 serverOffset += req.len;
589 //clientOffset += req.len;
590 remaining -= req.len;
591
592 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
593
594 pendingReadQueue.put(req.req_id, req);
595 }
596 if(pendingReadQueue.size() == 0) {
597 break;
598 }
599
600 // Receive a single answer
601 byte[] resp = receiveMessage(34000);
602 TypesReader tr = new TypesReader(resp);
603
604 int t = tr.readByte();
605 listener.read(Packet.forName(t));
606
607 // Search the pending queue
608 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32());
609 if(null == req) {
610 throw new RequestMismatchException();
611 }
612 // Evaluate the answer
613 if(t == Packet.SSH_FXP_STATUS) {
614 /* In any case, stop sending more packets */
615
616 int code = tr.readUINT32();
617 String msg = tr.readString();
618 listener.read(msg);
619
620 if(log.isDebugEnabled()) {
621 String[] desc = ErrorCodes.getDescription(code);
622 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
623 }
624 // Flag to read all pending requests but don't send any more.
625 errorOccured = true;
626 if(pendingReadQueue.isEmpty()) {
627 if(ErrorCodes.SSH_FX_EOF == code) {
628 return -1;
629 }
630 throw new SFTPException(msg, code);
631 }
632 }
633 else if(t == Packet.SSH_FXP_DATA) {
634 // OK, collect data
635 int readLen = tr.readUINT32();
636
637 if((readLen < 0) || (readLen > req.len)) {
638 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet.");
639 }
640
641 if(log.isDebugEnabled()) {
642 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen
643 + " (requested: " + req.len + ")");
644 }
645
646 // Read bytes into buffer
647 tr.readBytes(req.buffer, req.dstOffset, readLen);
648
649 if(readLen < req.len) {
650 /* Send this request packet again to request the remaing data in this slot. */
651 req.req_id = generateNextRequestID();
652 req.serverOffset += readLen;
653 req.len -= readLen;
654
655 log.debug("Requesting again: " + req.serverOffset + "/" + req.len);
656 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
657
658 pendingReadQueue.put(req.req_id, req);
659 }
660 return readLen;
661 }
662 else {
663 throw new PacketTypeException(t);
664 }
665 }
666 // Should never reach here.
667 throw new SFTPException("No EOF reached", -1);
668 }
669
670 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException {
671 TypesWriter tw = new TypesWriter();
672 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
673 tw.writeUINT64(offset);
674 tw.writeUINT32(len);
675
676 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes());
677 }
678
679 @Override
680 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
681 while(len > 0) {
682 int writeRequestLen = len;
683
684 if(writeRequestLen > 32768) {
685 writeRequestLen = 32768;
686 }
687
688 // Send the next write request
689 OutstandingStatusRequest req = new OutstandingStatusRequest();
690 req.req_id = generateNextRequestID();
691
692 TypesWriter tw = new TypesWriter();
693 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
694 tw.writeUINT64(fileOffset);
695 tw.writeString(src, srcoff, writeRequestLen);
696
697 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes());
698
699 pendingStatusQueue.put(req.req_id, req);
700
701 // Only read next status if parallelism reached
702 while(pendingStatusQueue.size() >= parallelism) {
703 this.readStatus();
704 }
705 fileOffset += writeRequestLen;
706 srcoff += writeRequestLen;
707 len -= writeRequestLen;
708 }
709 }
710
711
712 /**
713 * A read is divided into multiple requests sent sequentially before
714 * reading any status from the server
715 */
716 private static class OutstandingReadRequest {
717 int req_id;
718 /**
719 * Read offset to request on server starting at the file offset for the first request.
720 */
721 long serverOffset;
722 /**
723 * Length of requested data
724 */
725 int len;
726 /**
727 * Offset in destination buffer
728 */
729 int dstOffset;
730 /**
731 * Temporary buffer
732 */
733 byte[] buffer;
734 }
735
736 /**
737 * A read is divided into multiple requests sent sequentially before
738 * reading any status from the server
739 */
740 private static final class OutstandingStatusRequest {
741 int req_id;
742 }
743
744 }