Mercurial > 510Connectbot
comparison src/ch/ethz/ssh2/AbstractSFTPClient.java @ 273:91a31873c42a ganymed
start conversion from trilead to ganymed
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Fri, 18 Jul 2014 11:21:46 -0700 |
parents | |
children | d2b303406d63 |
comparison
equal
deleted
inserted
replaced
272:ce2f4e397703 | 273:91a31873c42a |
---|---|
1 package ch.ethz.ssh2; | |
2 | |
3 import java.io.BufferedOutputStream; | |
4 import java.io.IOException; | |
5 import java.io.InputStream; | |
6 import java.io.OutputStream; | |
7 import java.net.SocketException; | |
8 import java.nio.charset.Charset; | |
9 import java.nio.charset.UnsupportedCharsetException; | |
10 import java.util.HashMap; | |
11 import java.util.Map; | |
12 | |
13 import ch.ethz.ssh2.channel.Channel; | |
14 import ch.ethz.ssh2.log.Logger; | |
15 import ch.ethz.ssh2.packets.TypesReader; | |
16 import ch.ethz.ssh2.packets.TypesWriter; | |
17 import ch.ethz.ssh2.sftp.AttribFlags; | |
18 import ch.ethz.ssh2.sftp.ErrorCodes; | |
19 import ch.ethz.ssh2.sftp.Packet; | |
20 import ch.ethz.ssh2.util.StringEncoder; | |
21 | |
22 /** | |
23 * @version $Id: AbstractSFTPClient.java 151 2014-04-28 10:03:39Z dkocher@sudo.ch $ | |
24 */ | |
25 public abstract class AbstractSFTPClient implements SFTPClient { | |
26 | |
27 private static final Logger log = Logger.getLogger(SFTPv3Client.class); | |
28 | |
29 private Session sess; | |
30 | |
31 private InputStream is; | |
32 private OutputStream os; | |
33 | |
34 private int next_request_id = 1000; | |
35 | |
36 private String charset; | |
37 | |
38 /** | |
39 * Parallel read requests maximum size. | |
40 */ | |
41 private static final int DEFAULT_MAX_PARALLELISM = 64; | |
42 | |
43 /** | |
44 * Parallel read requests. | |
45 */ | |
46 private int parallelism = DEFAULT_MAX_PARALLELISM; | |
47 | |
48 public void setRequestParallelism(int parallelism) { | |
49 this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM); | |
50 } | |
51 | |
52 /** | |
53 * Mapping request ID to request. | |
54 */ | |
55 private Map<Integer, OutstandingReadRequest> pendingReadQueue | |
56 = new HashMap<Integer, OutstandingReadRequest>(); | |
57 | |
58 /** | |
59 * Mapping request ID to request. | |
60 */ | |
61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue | |
62 = new HashMap<Integer, OutstandingStatusRequest>(); | |
63 | |
64 private PacketListener listener; | |
65 | |
66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException { | |
67 this.listener = listener; | |
68 | |
69 log.debug("Opening session and starting SFTP subsystem."); | |
70 sess = conn.openSession(); | |
71 sess.startSubSystem("sftp"); | |
72 | |
73 is = sess.getStdout(); | |
74 os = new BufferedOutputStream(sess.getStdin(), 2048); | |
75 | |
76 init(version); | |
77 | |
78 } | |
79 | |
80 private void init(final int client_version) throws IOException { | |
81 // Send SSH_FXP_INIT with client version | |
82 | |
83 TypesWriter tw = new TypesWriter(); | |
84 tw.writeUINT32(client_version); | |
85 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes()); | |
86 | |
87 /* Receive SSH_FXP_VERSION */ | |
88 | |
89 log.debug("Waiting for SSH_FXP_VERSION..."); | |
90 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */ | |
91 | |
92 int t = tr.readByte(); | |
93 listener.read(Packet.forName(t)); | |
94 | |
95 if(t != Packet.SSH_FXP_VERSION) { | |
96 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t)); | |
97 throw new PacketTypeException(t); | |
98 } | |
99 | |
100 final int protocol_version = tr.readUINT32(); | |
101 | |
102 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version); | |
103 if(protocol_version != client_version) { | |
104 throw new IOException(String.format("Server protocol version %d does not match %d", | |
105 protocol_version, client_version)); | |
106 } | |
107 // Both parties should from then on adhere to particular version of the protocol | |
108 | |
109 // Read and save extensions (if any) for later use | |
110 while(tr.remain() != 0) { | |
111 String name = tr.readString(); | |
112 listener.read(name); | |
113 byte[] value = tr.readByteString(); | |
114 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value))); | |
115 } | |
116 } | |
117 | |
118 /** | |
119 * Queries the channel state | |
120 * | |
121 * @return True if the underlying session is in open state | |
122 */ | |
123 @Override | |
124 public boolean isConnected() { | |
125 return sess.getState() == Channel.STATE_OPEN; | |
126 } | |
127 | |
128 /** | |
129 * Close this SFTP session. NEVER forget to call this method to free up | |
130 * resources - even if you got an exception from one of the other methods. | |
131 * Sometimes these other methods may throw an exception, saying that the | |
132 * underlying channel is closed (this can happen, e.g., if the other server | |
133 * sent a close message.) However, as long as you have not called the | |
134 * <code>close()</code> method, you are likely wasting resources. | |
135 */ | |
136 @Override | |
137 public void close() { | |
138 sess.close(); | |
139 } | |
140 | |
141 /** | |
142 * Set the charset used to convert between Java Unicode Strings and byte encodings | |
143 * used by the server for paths and file names. | |
144 * | |
145 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8. | |
146 * @throws java.io.IOException | |
147 * @see #getCharset() | |
148 */ | |
149 @Override | |
150 public void setCharset(String charset) throws IOException { | |
151 if(charset == null) { | |
152 this.charset = null; | |
153 return; | |
154 } | |
155 try { | |
156 Charset.forName(charset); | |
157 } | |
158 catch(UnsupportedCharsetException e) { | |
159 throw new IOException("This charset is not supported", e); | |
160 } | |
161 this.charset = charset; | |
162 } | |
163 | |
164 /** | |
165 * The currently used charset for filename encoding/decoding. | |
166 * | |
167 * @return The name of the charset (<code>null</code> if UTF-8 is used). | |
168 * @see #setCharset(String) | |
169 */ | |
170 @Override | |
171 public String getCharset() { | |
172 return charset; | |
173 } | |
174 | |
175 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException; | |
176 | |
177 @Override | |
178 public void mkdir(String dirName, int posixPermissions) throws IOException { | |
179 int req_id = generateNextRequestID(); | |
180 | |
181 TypesWriter tw = new TypesWriter(); | |
182 tw.writeString(dirName, this.getCharset()); | |
183 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS); | |
184 tw.writeUINT32(posixPermissions); | |
185 | |
186 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes()); | |
187 | |
188 expectStatusOKMessage(req_id); | |
189 } | |
190 | |
191 @Override | |
192 public void rm(String fileName) throws IOException { | |
193 int req_id = generateNextRequestID(); | |
194 | |
195 TypesWriter tw = new TypesWriter(); | |
196 tw.writeString(fileName, this.getCharset()); | |
197 | |
198 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes()); | |
199 | |
200 expectStatusOKMessage(req_id); | |
201 } | |
202 | |
203 @Override | |
204 public void rmdir(String dirName) throws IOException { | |
205 int req_id = generateNextRequestID(); | |
206 | |
207 TypesWriter tw = new TypesWriter(); | |
208 tw.writeString(dirName, this.getCharset()); | |
209 | |
210 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes()); | |
211 | |
212 expectStatusOKMessage(req_id); | |
213 } | |
214 | |
215 @Override | |
216 public void mv(String oldPath, String newPath) throws IOException { | |
217 int req_id = generateNextRequestID(); | |
218 | |
219 TypesWriter tw = new TypesWriter(); | |
220 tw.writeString(oldPath, this.getCharset()); | |
221 tw.writeString(newPath, this.getCharset()); | |
222 | |
223 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes()); | |
224 | |
225 expectStatusOKMessage(req_id); | |
226 } | |
227 | |
228 @Override | |
229 public String readLink(String path) throws IOException { | |
230 int req_id = generateNextRequestID(); | |
231 | |
232 TypesWriter tw = new TypesWriter(); | |
233 tw.writeString(path, charset); | |
234 | |
235 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes()); | |
236 | |
237 byte[] resp = receiveMessage(34000); | |
238 | |
239 TypesReader tr = new TypesReader(resp); | |
240 | |
241 int t = tr.readByte(); | |
242 listener.read(Packet.forName(t)); | |
243 | |
244 int rep_id = tr.readUINT32(); | |
245 if(rep_id != req_id) { | |
246 throw new RequestMismatchException(); | |
247 } | |
248 | |
249 if(t == Packet.SSH_FXP_NAME) { | |
250 int count = tr.readUINT32(); | |
251 | |
252 if(count != 1) { | |
253 throw new PacketTypeException(t); | |
254 } | |
255 | |
256 return tr.readString(charset); | |
257 } | |
258 | |
259 if(t != Packet.SSH_FXP_STATUS) { | |
260 throw new PacketTypeException(t); | |
261 } | |
262 | |
263 int errorCode = tr.readUINT32(); | |
264 String errorMessage = tr.readString(); | |
265 listener.read(errorMessage); | |
266 throw new SFTPException(errorMessage, errorCode); | |
267 } | |
268 | |
269 @Override | |
270 public void setstat(String path, SFTPFileAttributes attr) throws IOException { | |
271 int req_id = generateNextRequestID(); | |
272 | |
273 TypesWriter tw = new TypesWriter(); | |
274 tw.writeString(path, charset); | |
275 tw.writeBytes(attr.toBytes()); | |
276 | |
277 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes()); | |
278 | |
279 expectStatusOKMessage(req_id); | |
280 } | |
281 | |
282 @Override | |
283 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException { | |
284 int req_id = generateNextRequestID(); | |
285 | |
286 TypesWriter tw = new TypesWriter(); | |
287 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | |
288 tw.writeBytes(attr.toBytes()); | |
289 | |
290 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes()); | |
291 | |
292 expectStatusOKMessage(req_id); | |
293 } | |
294 | |
295 @Override | |
296 public void createSymlink(String src, String target) throws IOException { | |
297 int req_id = generateNextRequestID(); | |
298 | |
299 TypesWriter tw = new TypesWriter(); | |
300 tw.writeString(src, charset); | |
301 tw.writeString(target, charset); | |
302 | |
303 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes()); | |
304 | |
305 expectStatusOKMessage(req_id); | |
306 } | |
307 | |
308 @Override | |
309 public void createHardlink(String src, String target) throws IOException { | |
310 int req_id = generateNextRequestID(); | |
311 | |
312 TypesWriter tw = new TypesWriter(); | |
313 tw.writeString("hardlink@openssh.com", charset); | |
314 tw.writeString(target, charset); | |
315 tw.writeString(src, charset); | |
316 | |
317 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes()); | |
318 | |
319 expectStatusOKMessage(req_id); | |
320 } | |
321 | |
322 @Override | |
323 public String canonicalPath(String path) throws IOException { | |
324 int req_id = generateNextRequestID(); | |
325 | |
326 TypesWriter tw = new TypesWriter(); | |
327 tw.writeString(path, charset); | |
328 | |
329 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes()); | |
330 | |
331 byte[] resp = receiveMessage(34000); | |
332 | |
333 TypesReader tr = new TypesReader(resp); | |
334 | |
335 int t = tr.readByte(); | |
336 listener.read(Packet.forName(t)); | |
337 | |
338 int rep_id = tr.readUINT32(); | |
339 if(rep_id != req_id) { | |
340 throw new RequestMismatchException(); | |
341 } | |
342 | |
343 if(t == Packet.SSH_FXP_NAME) { | |
344 int count = tr.readUINT32(); | |
345 | |
346 if(count != 1) { | |
347 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet."); | |
348 } | |
349 final String name = tr.readString(charset); | |
350 listener.read(name); | |
351 return name; | |
352 } | |
353 | |
354 if(t != Packet.SSH_FXP_STATUS) { | |
355 throw new PacketTypeException(t); | |
356 } | |
357 | |
358 int errorCode = tr.readUINT32(); | |
359 String errorMessage = tr.readString(); | |
360 listener.read(errorMessage); | |
361 throw new SFTPException(errorMessage, errorCode); | |
362 } | |
363 | |
364 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException { | |
365 if(log.isDebugEnabled()) { | |
366 log.debug(String.format("Send message of type %d with request id %d", type, requestId)); | |
367 } | |
368 listener.write(Packet.forName(type)); | |
369 | |
370 int msglen = len + 1; | |
371 | |
372 if(type != Packet.SSH_FXP_INIT) { | |
373 msglen += 4; | |
374 } | |
375 | |
376 os.write(msglen >> 24); | |
377 os.write(msglen >> 16); | |
378 os.write(msglen >> 8); | |
379 os.write(msglen); | |
380 os.write(type); | |
381 | |
382 if(type != Packet.SSH_FXP_INIT) { | |
383 os.write(requestId >> 24); | |
384 os.write(requestId >> 16); | |
385 os.write(requestId >> 8); | |
386 os.write(requestId); | |
387 } | |
388 | |
389 os.write(msg, off, len); | |
390 os.flush(); | |
391 } | |
392 | |
393 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException { | |
394 sendMessage(type, requestId, msg, 0, msg.length); | |
395 } | |
396 | |
397 private void readBytes(byte[] buff, int pos, int len) throws IOException { | |
398 while(len > 0) { | |
399 int count = is.read(buff, pos, len); | |
400 if(count < 0) { | |
401 throw new SocketException("Unexpected end of stream."); | |
402 } | |
403 len -= count; | |
404 pos += count; | |
405 } | |
406 } | |
407 | |
408 /** | |
409 * Read a message and guarantee that the <b>contents</b> is not larger than | |
410 * <code>maxlen</code> bytes. | |
411 * <p/> | |
412 * Note: receiveMessage(34000) actually means that the message may be up to 34004 | |
413 * bytes (the length attribute preceding the contents is 4 bytes). | |
414 * | |
415 * @param maxlen | |
416 * @return the message contents | |
417 * @throws IOException | |
418 */ | |
419 protected byte[] receiveMessage(int maxlen) throws IOException { | |
420 byte[] msglen = new byte[4]; | |
421 | |
422 readBytes(msglen, 0, 4); | |
423 | |
424 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff)); | |
425 | |
426 if((len > maxlen) || (len <= 0)) { | |
427 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len)); | |
428 } | |
429 | |
430 byte[] msg = new byte[len]; | |
431 | |
432 readBytes(msg, 0, len); | |
433 | |
434 return msg; | |
435 } | |
436 | |
437 protected int generateNextRequestID() { | |
438 synchronized(this) { | |
439 return next_request_id++; | |
440 } | |
441 } | |
442 | |
443 protected void closeHandle(byte[] handle) throws IOException { | |
444 int req_id = generateNextRequestID(); | |
445 | |
446 TypesWriter tw = new TypesWriter(); | |
447 tw.writeString(handle, 0, handle.length); | |
448 | |
449 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes()); | |
450 | |
451 expectStatusOKMessage(req_id); | |
452 } | |
453 | |
454 private void readStatus() throws IOException { | |
455 byte[] resp = receiveMessage(34000); | |
456 | |
457 TypesReader tr = new TypesReader(resp); | |
458 int t = tr.readByte(); | |
459 listener.read(Packet.forName(t)); | |
460 | |
461 // Search the pending queue | |
462 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32()); | |
463 if(null == status) { | |
464 throw new RequestMismatchException(); | |
465 } | |
466 | |
467 // Evaluate the answer | |
468 if(t == Packet.SSH_FXP_STATUS) { | |
469 // In any case, stop sending more packets | |
470 int code = tr.readUINT32(); | |
471 if(log.isDebugEnabled()) { | |
472 String[] desc = ErrorCodes.getDescription(code); | |
473 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | |
474 } | |
475 if(code == ErrorCodes.SSH_FX_OK) { | |
476 return; | |
477 } | |
478 String msg = tr.readString(); | |
479 listener.read(msg); | |
480 throw new SFTPException(msg, code); | |
481 } | |
482 throw new PacketTypeException(t); | |
483 } | |
484 | |
485 private void readPendingReadStatus() throws IOException { | |
486 byte[] resp = receiveMessage(34000); | |
487 | |
488 TypesReader tr = new TypesReader(resp); | |
489 int t = tr.readByte(); | |
490 listener.read(Packet.forName(t)); | |
491 | |
492 // Search the pending queue | |
493 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32()); | |
494 if(null == status) { | |
495 throw new RequestMismatchException(); | |
496 } | |
497 | |
498 // Evaluate the answer | |
499 if(t == Packet.SSH_FXP_STATUS) { | |
500 // In any case, stop sending more packets | |
501 int code = tr.readUINT32(); | |
502 if(log.isDebugEnabled()) { | |
503 String[] desc = ErrorCodes.getDescription(code); | |
504 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | |
505 } | |
506 if(code == ErrorCodes.SSH_FX_OK) { | |
507 return; | |
508 } | |
509 if(code == ErrorCodes.SSH_FX_EOF) { | |
510 return; | |
511 } | |
512 String msg = tr.readString(); | |
513 listener.read(msg); | |
514 throw new SFTPException(msg, code); | |
515 } | |
516 throw new PacketTypeException(t); | |
517 } | |
518 | |
519 protected void expectStatusOKMessage(int id) throws IOException { | |
520 byte[] resp = receiveMessage(34000); | |
521 | |
522 TypesReader tr = new TypesReader(resp); | |
523 | |
524 int t = tr.readByte(); | |
525 listener.read(Packet.forName(t)); | |
526 | |
527 int rep_id = tr.readUINT32(); | |
528 if(rep_id != id) { | |
529 throw new RequestMismatchException(); | |
530 } | |
531 | |
532 if(t != Packet.SSH_FXP_STATUS) { | |
533 throw new PacketTypeException(t); | |
534 } | |
535 | |
536 int errorCode = tr.readUINT32(); | |
537 | |
538 if(errorCode == ErrorCodes.SSH_FX_OK) { | |
539 return; | |
540 } | |
541 String errorMessage = tr.readString(); | |
542 listener.read(errorMessage); | |
543 throw new SFTPException(errorMessage, errorCode); | |
544 } | |
545 | |
546 @Override | |
547 public void closeFile(SFTPFileHandle handle) throws IOException { | |
548 while(!pendingReadQueue.isEmpty()) { | |
549 this.readPendingReadStatus(); | |
550 } | |
551 while(!pendingStatusQueue.isEmpty()) { | |
552 this.readStatus(); | |
553 } | |
554 closeHandle(handle.getHandle()); | |
555 } | |
556 | |
557 @Override | |
558 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException { | |
559 boolean errorOccured = false; | |
560 int remaining = len * parallelism; | |
561 //int clientOffset = dstoff; | |
562 | |
563 long serverOffset = fileOffset; | |
564 for(OutstandingReadRequest r : pendingReadQueue.values()) { | |
565 // Server offset should take pending requests into account. | |
566 serverOffset += r.len; | |
567 } | |
568 | |
569 while(true) { | |
570 // Stop if there was an error and no outstanding request | |
571 if((pendingReadQueue.size() == 0) && errorOccured) { | |
572 break; | |
573 } | |
574 | |
575 // Send as many requests as we are allowed to | |
576 while(pendingReadQueue.size() < parallelism) { | |
577 if(errorOccured) { | |
578 break; | |
579 } | |
580 // Send the next read request | |
581 OutstandingReadRequest req = new OutstandingReadRequest(); | |
582 req.req_id = generateNextRequestID(); | |
583 req.serverOffset = serverOffset; | |
584 req.len = (remaining > len) ? len : remaining; | |
585 req.buffer = dst; | |
586 req.dstOffset = dstoff; | |
587 | |
588 serverOffset += req.len; | |
589 //clientOffset += req.len; | |
590 remaining -= req.len; | |
591 | |
592 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); | |
593 | |
594 pendingReadQueue.put(req.req_id, req); | |
595 } | |
596 if(pendingReadQueue.size() == 0) { | |
597 break; | |
598 } | |
599 | |
600 // Receive a single answer | |
601 byte[] resp = receiveMessage(34000); | |
602 TypesReader tr = new TypesReader(resp); | |
603 | |
604 int t = tr.readByte(); | |
605 listener.read(Packet.forName(t)); | |
606 | |
607 // Search the pending queue | |
608 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32()); | |
609 if(null == req) { | |
610 throw new RequestMismatchException(); | |
611 } | |
612 // Evaluate the answer | |
613 if(t == Packet.SSH_FXP_STATUS) { | |
614 /* In any case, stop sending more packets */ | |
615 | |
616 int code = tr.readUINT32(); | |
617 String msg = tr.readString(); | |
618 listener.read(msg); | |
619 | |
620 if(log.isDebugEnabled()) { | |
621 String[] desc = ErrorCodes.getDescription(code); | |
622 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | |
623 } | |
624 // Flag to read all pending requests but don't send any more. | |
625 errorOccured = true; | |
626 if(pendingReadQueue.isEmpty()) { | |
627 if(ErrorCodes.SSH_FX_EOF == code) { | |
628 return -1; | |
629 } | |
630 throw new SFTPException(msg, code); | |
631 } | |
632 } | |
633 else if(t == Packet.SSH_FXP_DATA) { | |
634 // OK, collect data | |
635 int readLen = tr.readUINT32(); | |
636 | |
637 if((readLen < 0) || (readLen > req.len)) { | |
638 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet."); | |
639 } | |
640 | |
641 if(log.isDebugEnabled()) { | |
642 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen | |
643 + " (requested: " + req.len + ")"); | |
644 } | |
645 | |
646 // Read bytes into buffer | |
647 tr.readBytes(req.buffer, req.dstOffset, readLen); | |
648 | |
649 if(readLen < req.len) { | |
650 /* Send this request packet again to request the remaing data in this slot. */ | |
651 req.req_id = generateNextRequestID(); | |
652 req.serverOffset += readLen; | |
653 req.len -= readLen; | |
654 | |
655 log.debug("Requesting again: " + req.serverOffset + "/" + req.len); | |
656 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); | |
657 | |
658 pendingReadQueue.put(req.req_id, req); | |
659 } | |
660 return readLen; | |
661 } | |
662 else { | |
663 throw new PacketTypeException(t); | |
664 } | |
665 } | |
666 // Should never reach here. | |
667 throw new SFTPException("No EOF reached", -1); | |
668 } | |
669 | |
670 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException { | |
671 TypesWriter tw = new TypesWriter(); | |
672 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | |
673 tw.writeUINT64(offset); | |
674 tw.writeUINT32(len); | |
675 | |
676 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes()); | |
677 } | |
678 | |
679 @Override | |
680 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException { | |
681 while(len > 0) { | |
682 int writeRequestLen = len; | |
683 | |
684 if(writeRequestLen > 32768) { | |
685 writeRequestLen = 32768; | |
686 } | |
687 | |
688 // Send the next write request | |
689 OutstandingStatusRequest req = new OutstandingStatusRequest(); | |
690 req.req_id = generateNextRequestID(); | |
691 | |
692 TypesWriter tw = new TypesWriter(); | |
693 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | |
694 tw.writeUINT64(fileOffset); | |
695 tw.writeString(src, srcoff, writeRequestLen); | |
696 | |
697 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes()); | |
698 | |
699 pendingStatusQueue.put(req.req_id, req); | |
700 | |
701 // Only read next status if parallelism reached | |
702 while(pendingStatusQueue.size() >= parallelism) { | |
703 this.readStatus(); | |
704 } | |
705 fileOffset += writeRequestLen; | |
706 srcoff += writeRequestLen; | |
707 len -= writeRequestLen; | |
708 } | |
709 } | |
710 | |
711 | |
712 /** | |
713 * A read is divided into multiple requests sent sequentially before | |
714 * reading any status from the server | |
715 */ | |
716 private static class OutstandingReadRequest { | |
717 int req_id; | |
718 /** | |
719 * Read offset to request on server starting at the file offset for the first request. | |
720 */ | |
721 long serverOffset; | |
722 /** | |
723 * Length of requested data | |
724 */ | |
725 int len; | |
726 /** | |
727 * Offset in destination buffer | |
728 */ | |
729 int dstOffset; | |
730 /** | |
731 * Temporary buffer | |
732 */ | |
733 byte[] buffer; | |
734 } | |
735 | |
736 /** | |
737 * A read is divided into multiple requests sent sequentially before | |
738 * reading any status from the server | |
739 */ | |
740 private static final class OutstandingStatusRequest { | |
741 int req_id; | |
742 } | |
743 | |
744 } |