comparison src/ch/ethz/ssh2/AbstractSFTPClient.java @ 342:175c7d68f3c4

merge ganymed into mainline
author Carl Byington <carl@five-ten-sg.com>
date Thu, 31 Jul 2014 16:33:38 -0700
parents 071eccdff8ea
children
comparison
equal deleted inserted replaced
272:ce2f4e397703 342:175c7d68f3c4
1 package ch.ethz.ssh2;
2
3 import java.io.BufferedOutputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.net.SocketException;
8 import java.nio.charset.Charset;
9 import java.nio.charset.UnsupportedCharsetException;
10 import java.util.HashMap;
11 import java.util.Map;
12
13 import ch.ethz.ssh2.channel.Channel;
14 import ch.ethz.ssh2.log.Logger;
15 import ch.ethz.ssh2.packets.TypesReader;
16 import ch.ethz.ssh2.packets.TypesWriter;
17 import ch.ethz.ssh2.sftp.AttribFlags;
18 import ch.ethz.ssh2.sftp.ErrorCodes;
19 import ch.ethz.ssh2.sftp.Packet;
20 import ch.ethz.ssh2.util.StringEncoder;
21
22 /**
23 * @version $Id: AbstractSFTPClient.java 151 2014-04-28 10:03:39Z dkocher@sudo.ch $
24 */
25 public abstract class AbstractSFTPClient implements SFTPClient {
26
27 private static final Logger log = Logger.getLogger(SFTPv3Client.class);
28
29 private Session sess;
30
31 private InputStream is;
32 private OutputStream os;
33
34 private int next_request_id = 1000;
35
36 private String charset;
37
38 /**
39 * Parallel read requests maximum size.
40 */
41 private static final int DEFAULT_MAX_PARALLELISM = 64;
42
43 /**
44 * Parallel read requests.
45 */
46 private int parallelism = DEFAULT_MAX_PARALLELISM;
47
48 public void setRequestParallelism(int parallelism) {
49 this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM);
50 }
51
52 /**
53 * Mapping request ID to request.
54 */
55 private Map<Integer, OutstandingReadRequest> pendingReadQueue
56 = new HashMap<Integer, OutstandingReadRequest>();
57
58 /**
59 * Mapping request ID to request.
60 */
61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue
62 = new HashMap<Integer, OutstandingStatusRequest>();
63
64 private PacketListener listener;
65
66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException {
67 this.listener = listener;
68 log.debug("Opening session and starting SFTP subsystem.");
69 sess = conn.openSession();
70 sess.startSubSystem("sftp");
71 is = sess.getStdout();
72 os = new BufferedOutputStream(sess.getStdin(), 2048);
73 init(version);
74 }
75
76 private void init(final int client_version) throws IOException {
77 // Send SSH_FXP_INIT with client version
78 TypesWriter tw = new TypesWriter();
79 tw.writeUINT32(client_version);
80 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes());
81 /* Receive SSH_FXP_VERSION */
82 log.debug("Waiting for SSH_FXP_VERSION...");
83 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */
84 int t = tr.readByte();
85 listener.read(Packet.forName(t));
86
87 if (t != Packet.SSH_FXP_VERSION) {
88 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t));
89 throw new PacketTypeException(t);
90 }
91
92 final int protocol_version = tr.readUINT32();
93 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version);
94
95 if (protocol_version != client_version) {
96 throw new IOException(String.format("Server protocol version %d does not match %d",
97 protocol_version, client_version));
98 }
99
100 // Both parties should from then on adhere to particular version of the protocol
101
102 // Read and save extensions (if any) for later use
103 while (tr.remain() != 0) {
104 String name = tr.readString();
105 listener.read(name);
106 byte[] value = tr.readByteString();
107 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value)));
108 }
109 }
110
111 /**
112 * Queries the channel state
113 *
114 * @return True if the underlying session is in open state
115 */
116 public boolean isConnected() {
117 return sess.getState() == Channel.STATE_OPEN;
118 }
119
120 /**
121 * Close this SFTP session. NEVER forget to call this method to free up
122 * resources - even if you got an exception from one of the other methods.
123 * Sometimes these other methods may throw an exception, saying that the
124 * underlying channel is closed (this can happen, e.g., if the other server
125 * sent a close message.) However, as long as you have not called the
126 * <code>close()</code> method, you are likely wasting resources.
127 */
128 public void close() {
129 sess.close();
130 }
131
132 /**
133 * Set the charset used to convert between Java Unicode Strings and byte encodings
134 * used by the server for paths and file names.
135 *
136 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8.
137 * @throws java.io.IOException
138 * @see #getCharset()
139 */
140 public void setCharset(String charset) throws IOException {
141 if (charset == null) {
142 this.charset = null;
143 return;
144 }
145
146 try {
147 Charset.forName(charset);
148 }
149 catch (UnsupportedCharsetException e) {
150 throw new IOException("This charset is not supported", e);
151 }
152
153 this.charset = charset;
154 }
155
156 /**
157 * The currently used charset for filename encoding/decoding.
158 *
159 * @return The name of the charset (<code>null</code> if UTF-8 is used).
160 * @see #setCharset(String)
161 */
162 public String getCharset() {
163 return charset;
164 }
165
166 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException;
167
168 public void mkdir(String dirName, int posixPermissions) throws IOException {
169 int req_id = generateNextRequestID();
170 TypesWriter tw = new TypesWriter();
171 tw.writeString(dirName, this.getCharset());
172 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS);
173 tw.writeUINT32(posixPermissions);
174 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes());
175 expectStatusOKMessage(req_id);
176 }
177
178 public void rm(String fileName) throws IOException {
179 int req_id = generateNextRequestID();
180 TypesWriter tw = new TypesWriter();
181 tw.writeString(fileName, this.getCharset());
182 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes());
183 expectStatusOKMessage(req_id);
184 }
185
186 public void rmdir(String dirName) throws IOException {
187 int req_id = generateNextRequestID();
188 TypesWriter tw = new TypesWriter();
189 tw.writeString(dirName, this.getCharset());
190 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes());
191 expectStatusOKMessage(req_id);
192 }
193
194 public void mv(String oldPath, String newPath) throws IOException {
195 int req_id = generateNextRequestID();
196 TypesWriter tw = new TypesWriter();
197 tw.writeString(oldPath, this.getCharset());
198 tw.writeString(newPath, this.getCharset());
199 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes());
200 expectStatusOKMessage(req_id);
201 }
202
203 public String readLink(String path) throws IOException {
204 int req_id = generateNextRequestID();
205 TypesWriter tw = new TypesWriter();
206 tw.writeString(path, charset);
207 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes());
208 byte[] resp = receiveMessage(34000);
209 TypesReader tr = new TypesReader(resp);
210 int t = tr.readByte();
211 listener.read(Packet.forName(t));
212 int rep_id = tr.readUINT32();
213
214 if (rep_id != req_id) {
215 throw new RequestMismatchException();
216 }
217
218 if (t == Packet.SSH_FXP_NAME) {
219 int count = tr.readUINT32();
220
221 if (count != 1) {
222 throw new PacketTypeException(t);
223 }
224
225 return tr.readString(charset);
226 }
227
228 if (t != Packet.SSH_FXP_STATUS) {
229 throw new PacketTypeException(t);
230 }
231
232 int errorCode = tr.readUINT32();
233 String errorMessage = tr.readString();
234 listener.read(errorMessage);
235 throw new SFTPException(errorMessage, errorCode);
236 }
237
238 public void setstat(String path, SFTPFileAttributes attr) throws IOException {
239 int req_id = generateNextRequestID();
240 TypesWriter tw = new TypesWriter();
241 tw.writeString(path, charset);
242 tw.writeBytes(attr.toBytes());
243 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes());
244 expectStatusOKMessage(req_id);
245 }
246
247 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException {
248 int req_id = generateNextRequestID();
249 TypesWriter tw = new TypesWriter();
250 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
251 tw.writeBytes(attr.toBytes());
252 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes());
253 expectStatusOKMessage(req_id);
254 }
255
256 public void createSymlink(String src, String target) throws IOException {
257 int req_id = generateNextRequestID();
258 TypesWriter tw = new TypesWriter();
259 tw.writeString(src, charset);
260 tw.writeString(target, charset);
261 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes());
262 expectStatusOKMessage(req_id);
263 }
264
265 public void createHardlink(String src, String target) throws IOException {
266 int req_id = generateNextRequestID();
267 TypesWriter tw = new TypesWriter();
268 tw.writeString("hardlink@openssh.com", charset);
269 tw.writeString(target, charset);
270 tw.writeString(src, charset);
271 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes());
272 expectStatusOKMessage(req_id);
273 }
274
275 public String canonicalPath(String path) throws IOException {
276 int req_id = generateNextRequestID();
277 TypesWriter tw = new TypesWriter();
278 tw.writeString(path, charset);
279 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes());
280 byte[] resp = receiveMessage(34000);
281 TypesReader tr = new TypesReader(resp);
282 int t = tr.readByte();
283 listener.read(Packet.forName(t));
284 int rep_id = tr.readUINT32();
285
286 if (rep_id != req_id) {
287 throw new RequestMismatchException();
288 }
289
290 if (t == Packet.SSH_FXP_NAME) {
291 int count = tr.readUINT32();
292
293 if (count != 1) {
294 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet.");
295 }
296
297 final String name = tr.readString(charset);
298 listener.read(name);
299 return name;
300 }
301
302 if (t != Packet.SSH_FXP_STATUS) {
303 throw new PacketTypeException(t);
304 }
305
306 int errorCode = tr.readUINT32();
307 String errorMessage = tr.readString();
308 listener.read(errorMessage);
309 throw new SFTPException(errorMessage, errorCode);
310 }
311
312 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException {
313 if (log.isDebugEnabled()) {
314 log.debug(String.format("Send message of type %d with request id %d", type, requestId));
315 }
316
317 listener.write(Packet.forName(type));
318 int msglen = len + 1;
319
320 if (type != Packet.SSH_FXP_INIT) {
321 msglen += 4;
322 }
323
324 os.write(msglen >> 24);
325 os.write(msglen >> 16);
326 os.write(msglen >> 8);
327 os.write(msglen);
328 os.write(type);
329
330 if (type != Packet.SSH_FXP_INIT) {
331 os.write(requestId >> 24);
332 os.write(requestId >> 16);
333 os.write(requestId >> 8);
334 os.write(requestId);
335 }
336
337 os.write(msg, off, len);
338 os.flush();
339 }
340
341 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException {
342 sendMessage(type, requestId, msg, 0, msg.length);
343 }
344
345 private void readBytes(byte[] buff, int pos, int len) throws IOException {
346 while (len > 0) {
347 int count = is.read(buff, pos, len);
348
349 if (count < 0) {
350 throw new SocketException("Unexpected end of stream.");
351 }
352
353 len -= count;
354 pos += count;
355 }
356 }
357
358 /**
359 * Read a message and guarantee that the <b>contents</b> is not larger than
360 * <code>maxlen</code> bytes.
361 * <p/>
362 * Note: receiveMessage(34000) actually means that the message may be up to 34004
363 * bytes (the length attribute preceding the contents is 4 bytes).
364 *
365 * @param maxlen
366 * @return the message contents
367 * @throws IOException
368 */
369 protected byte[] receiveMessage(int maxlen) throws IOException {
370 byte[] msglen = new byte[4];
371 readBytes(msglen, 0, 4);
372 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff));
373
374 if ((len > maxlen) || (len <= 0)) {
375 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len));
376 }
377
378 byte[] msg = new byte[len];
379 readBytes(msg, 0, len);
380 return msg;
381 }
382
383 protected int generateNextRequestID() {
384 synchronized (this) {
385 return next_request_id++;
386 }
387 }
388
389 protected void closeHandle(byte[] handle) throws IOException {
390 int req_id = generateNextRequestID();
391 TypesWriter tw = new TypesWriter();
392 tw.writeString(handle, 0, handle.length);
393 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes());
394 expectStatusOKMessage(req_id);
395 }
396
397 private void readStatus() throws IOException {
398 byte[] resp = receiveMessage(34000);
399 TypesReader tr = new TypesReader(resp);
400 int t = tr.readByte();
401 listener.read(Packet.forName(t));
402 // Search the pending queue
403 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32());
404
405 if (null == status) {
406 throw new RequestMismatchException();
407 }
408
409 // Evaluate the answer
410 if (t == Packet.SSH_FXP_STATUS) {
411 // In any case, stop sending more packets
412 int code = tr.readUINT32();
413
414 if (log.isDebugEnabled()) {
415 String[] desc = ErrorCodes.getDescription(code);
416 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
417 }
418
419 if (code == ErrorCodes.SSH_FX_OK) {
420 return;
421 }
422
423 String msg = tr.readString();
424 listener.read(msg);
425 throw new SFTPException(msg, code);
426 }
427
428 throw new PacketTypeException(t);
429 }
430
431 private void readPendingReadStatus() throws IOException {
432 byte[] resp = receiveMessage(34000);
433 TypesReader tr = new TypesReader(resp);
434 int t = tr.readByte();
435 listener.read(Packet.forName(t));
436 // Search the pending queue
437 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32());
438
439 if (null == status) {
440 throw new RequestMismatchException();
441 }
442
443 // Evaluate the answer
444 if (t == Packet.SSH_FXP_STATUS) {
445 // In any case, stop sending more packets
446 int code = tr.readUINT32();
447
448 if (log.isDebugEnabled()) {
449 String[] desc = ErrorCodes.getDescription(code);
450 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
451 }
452
453 if (code == ErrorCodes.SSH_FX_OK) {
454 return;
455 }
456
457 if (code == ErrorCodes.SSH_FX_EOF) {
458 return;
459 }
460
461 String msg = tr.readString();
462 listener.read(msg);
463 throw new SFTPException(msg, code);
464 }
465
466 throw new PacketTypeException(t);
467 }
468
469 protected void expectStatusOKMessage(int id) throws IOException {
470 byte[] resp = receiveMessage(34000);
471 TypesReader tr = new TypesReader(resp);
472 int t = tr.readByte();
473 listener.read(Packet.forName(t));
474 int rep_id = tr.readUINT32();
475
476 if (rep_id != id) {
477 throw new RequestMismatchException();
478 }
479
480 if (t != Packet.SSH_FXP_STATUS) {
481 throw new PacketTypeException(t);
482 }
483
484 int errorCode = tr.readUINT32();
485
486 if (errorCode == ErrorCodes.SSH_FX_OK) {
487 return;
488 }
489
490 String errorMessage = tr.readString();
491 listener.read(errorMessage);
492 throw new SFTPException(errorMessage, errorCode);
493 }
494
495 public void closeFile(SFTPFileHandle handle) throws IOException {
496 while (!pendingReadQueue.isEmpty()) {
497 this.readPendingReadStatus();
498 }
499
500 while (!pendingStatusQueue.isEmpty()) {
501 this.readStatus();
502 }
503
504 closeHandle(handle.getHandle());
505 }
506
507 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException {
508 boolean errorOccured = false;
509 int remaining = len * parallelism;
510 //int clientOffset = dstoff;
511 long serverOffset = fileOffset;
512
513 for (OutstandingReadRequest r : pendingReadQueue.values()) {
514 // Server offset should take pending requests into account.
515 serverOffset += r.len;
516 }
517
518 while (true) {
519 // Stop if there was an error and no outstanding request
520 if ((pendingReadQueue.size() == 0) && errorOccured) {
521 break;
522 }
523
524 // Send as many requests as we are allowed to
525 while (pendingReadQueue.size() < parallelism) {
526 if (errorOccured) {
527 break;
528 }
529
530 // Send the next read request
531 OutstandingReadRequest req = new OutstandingReadRequest();
532 req.req_id = generateNextRequestID();
533 req.serverOffset = serverOffset;
534 req.len = (remaining > len) ? len : remaining;
535 req.buffer = dst;
536 req.dstOffset = dstoff;
537 serverOffset += req.len;
538 //clientOffset += req.len;
539 remaining -= req.len;
540 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
541 pendingReadQueue.put(req.req_id, req);
542 }
543
544 if (pendingReadQueue.size() == 0) {
545 break;
546 }
547
548 // Receive a single answer
549 byte[] resp = receiveMessage(34000);
550 TypesReader tr = new TypesReader(resp);
551 int t = tr.readByte();
552 listener.read(Packet.forName(t));
553 // Search the pending queue
554 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32());
555
556 if (null == req) {
557 throw new RequestMismatchException();
558 }
559
560 // Evaluate the answer
561 if (t == Packet.SSH_FXP_STATUS) {
562 /* In any case, stop sending more packets */
563 int code = tr.readUINT32();
564 String msg = tr.readString();
565 listener.read(msg);
566
567 if (log.isDebugEnabled()) {
568 String[] desc = ErrorCodes.getDescription(code);
569 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
570 }
571
572 // Flag to read all pending requests but don't send any more.
573 errorOccured = true;
574
575 if (pendingReadQueue.isEmpty()) {
576 if (ErrorCodes.SSH_FX_EOF == code) {
577 return -1;
578 }
579
580 throw new SFTPException(msg, code);
581 }
582 }
583 else if (t == Packet.SSH_FXP_DATA) {
584 // OK, collect data
585 int readLen = tr.readUINT32();
586
587 if ((readLen < 0) || (readLen > req.len)) {
588 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet.");
589 }
590
591 if (log.isDebugEnabled()) {
592 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen
593 + " (requested: " + req.len + ")");
594 }
595
596 // Read bytes into buffer
597 tr.readBytes(req.buffer, req.dstOffset, readLen);
598
599 if (readLen < req.len) {
600 /* Send this request packet again to request the remaing data in this slot. */
601 req.req_id = generateNextRequestID();
602 req.serverOffset += readLen;
603 req.len -= readLen;
604 log.debug("Requesting again: " + req.serverOffset + "/" + req.len);
605 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
606 pendingReadQueue.put(req.req_id, req);
607 }
608
609 return readLen;
610 }
611 else {
612 throw new PacketTypeException(t);
613 }
614 }
615
616 // Should never reach here.
617 throw new SFTPException("No EOF reached", -1);
618 }
619
620 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException {
621 TypesWriter tw = new TypesWriter();
622 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
623 tw.writeUINT64(offset);
624 tw.writeUINT32(len);
625 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes());
626 }
627
628 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
629 while (len > 0) {
630 int writeRequestLen = len;
631
632 if (writeRequestLen > 32768) {
633 writeRequestLen = 32768;
634 }
635
636 // Send the next write request
637 OutstandingStatusRequest req = new OutstandingStatusRequest();
638 req.req_id = generateNextRequestID();
639 TypesWriter tw = new TypesWriter();
640 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
641 tw.writeUINT64(fileOffset);
642 tw.writeString(src, srcoff, writeRequestLen);
643 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes());
644 pendingStatusQueue.put(req.req_id, req);
645
646 // Only read next status if parallelism reached
647 while (pendingStatusQueue.size() >= parallelism) {
648 this.readStatus();
649 }
650
651 fileOffset += writeRequestLen;
652 srcoff += writeRequestLen;
653 len -= writeRequestLen;
654 }
655 }
656
657
658 /**
659 * A read is divided into multiple requests sent sequentially before
660 * reading any status from the server
661 */
662 private static class OutstandingReadRequest {
663 int req_id;
664 /**
665 * Read offset to request on server starting at the file offset for the first request.
666 */
667 long serverOffset;
668 /**
669 * Length of requested data
670 */
671 int len;
672 /**
673 * Offset in destination buffer
674 */
675 int dstOffset;
676 /**
677 * Temporary buffer
678 */
679 byte[] buffer;
680 }
681
682 /**
683 * A read is divided into multiple requests sent sequentially before
684 * reading any status from the server
685 */
686 private static final class OutstandingStatusRequest {
687 int req_id;
688 }
689
690 }