Mercurial > 510Connectbot
comparison src/ch/ethz/ssh2/AbstractSFTPClient.java @ 342:175c7d68f3c4
merge ganymed into mainline
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Thu, 31 Jul 2014 16:33:38 -0700 |
parents | 071eccdff8ea |
children |
comparison
equal
deleted
inserted
replaced
272:ce2f4e397703 | 342:175c7d68f3c4 |
---|---|
1 package ch.ethz.ssh2; | |
2 | |
3 import java.io.BufferedOutputStream; | |
4 import java.io.IOException; | |
5 import java.io.InputStream; | |
6 import java.io.OutputStream; | |
7 import java.net.SocketException; | |
8 import java.nio.charset.Charset; | |
9 import java.nio.charset.UnsupportedCharsetException; | |
10 import java.util.HashMap; | |
11 import java.util.Map; | |
12 | |
13 import ch.ethz.ssh2.channel.Channel; | |
14 import ch.ethz.ssh2.log.Logger; | |
15 import ch.ethz.ssh2.packets.TypesReader; | |
16 import ch.ethz.ssh2.packets.TypesWriter; | |
17 import ch.ethz.ssh2.sftp.AttribFlags; | |
18 import ch.ethz.ssh2.sftp.ErrorCodes; | |
19 import ch.ethz.ssh2.sftp.Packet; | |
20 import ch.ethz.ssh2.util.StringEncoder; | |
21 | |
22 /** | |
23 * @version $Id: AbstractSFTPClient.java 151 2014-04-28 10:03:39Z dkocher@sudo.ch $ | |
24 */ | |
25 public abstract class AbstractSFTPClient implements SFTPClient { | |
26 | |
27 private static final Logger log = Logger.getLogger(SFTPv3Client.class); | |
28 | |
29 private Session sess; | |
30 | |
31 private InputStream is; | |
32 private OutputStream os; | |
33 | |
34 private int next_request_id = 1000; | |
35 | |
36 private String charset; | |
37 | |
38 /** | |
39 * Parallel read requests maximum size. | |
40 */ | |
41 private static final int DEFAULT_MAX_PARALLELISM = 64; | |
42 | |
43 /** | |
44 * Parallel read requests. | |
45 */ | |
46 private int parallelism = DEFAULT_MAX_PARALLELISM; | |
47 | |
48 public void setRequestParallelism(int parallelism) { | |
49 this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM); | |
50 } | |
51 | |
52 /** | |
53 * Mapping request ID to request. | |
54 */ | |
55 private Map<Integer, OutstandingReadRequest> pendingReadQueue | |
56 = new HashMap<Integer, OutstandingReadRequest>(); | |
57 | |
58 /** | |
59 * Mapping request ID to request. | |
60 */ | |
61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue | |
62 = new HashMap<Integer, OutstandingStatusRequest>(); | |
63 | |
64 private PacketListener listener; | |
65 | |
66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException { | |
67 this.listener = listener; | |
68 log.debug("Opening session and starting SFTP subsystem."); | |
69 sess = conn.openSession(); | |
70 sess.startSubSystem("sftp"); | |
71 is = sess.getStdout(); | |
72 os = new BufferedOutputStream(sess.getStdin(), 2048); | |
73 init(version); | |
74 } | |
75 | |
76 private void init(final int client_version) throws IOException { | |
77 // Send SSH_FXP_INIT with client version | |
78 TypesWriter tw = new TypesWriter(); | |
79 tw.writeUINT32(client_version); | |
80 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes()); | |
81 /* Receive SSH_FXP_VERSION */ | |
82 log.debug("Waiting for SSH_FXP_VERSION..."); | |
83 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */ | |
84 int t = tr.readByte(); | |
85 listener.read(Packet.forName(t)); | |
86 | |
87 if (t != Packet.SSH_FXP_VERSION) { | |
88 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t)); | |
89 throw new PacketTypeException(t); | |
90 } | |
91 | |
92 final int protocol_version = tr.readUINT32(); | |
93 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version); | |
94 | |
95 if (protocol_version != client_version) { | |
96 throw new IOException(String.format("Server protocol version %d does not match %d", | |
97 protocol_version, client_version)); | |
98 } | |
99 | |
100 // Both parties should from then on adhere to particular version of the protocol | |
101 | |
102 // Read and save extensions (if any) for later use | |
103 while (tr.remain() != 0) { | |
104 String name = tr.readString(); | |
105 listener.read(name); | |
106 byte[] value = tr.readByteString(); | |
107 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value))); | |
108 } | |
109 } | |
110 | |
111 /** | |
112 * Queries the channel state | |
113 * | |
114 * @return True if the underlying session is in open state | |
115 */ | |
116 public boolean isConnected() { | |
117 return sess.getState() == Channel.STATE_OPEN; | |
118 } | |
119 | |
120 /** | |
121 * Close this SFTP session. NEVER forget to call this method to free up | |
122 * resources - even if you got an exception from one of the other methods. | |
123 * Sometimes these other methods may throw an exception, saying that the | |
124 * underlying channel is closed (this can happen, e.g., if the other server | |
125 * sent a close message.) However, as long as you have not called the | |
126 * <code>close()</code> method, you are likely wasting resources. | |
127 */ | |
128 public void close() { | |
129 sess.close(); | |
130 } | |
131 | |
132 /** | |
133 * Set the charset used to convert between Java Unicode Strings and byte encodings | |
134 * used by the server for paths and file names. | |
135 * | |
136 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8. | |
137 * @throws java.io.IOException | |
138 * @see #getCharset() | |
139 */ | |
140 public void setCharset(String charset) throws IOException { | |
141 if (charset == null) { | |
142 this.charset = null; | |
143 return; | |
144 } | |
145 | |
146 try { | |
147 Charset.forName(charset); | |
148 } | |
149 catch (UnsupportedCharsetException e) { | |
150 throw new IOException("This charset is not supported", e); | |
151 } | |
152 | |
153 this.charset = charset; | |
154 } | |
155 | |
156 /** | |
157 * The currently used charset for filename encoding/decoding. | |
158 * | |
159 * @return The name of the charset (<code>null</code> if UTF-8 is used). | |
160 * @see #setCharset(String) | |
161 */ | |
162 public String getCharset() { | |
163 return charset; | |
164 } | |
165 | |
166 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException; | |
167 | |
168 public void mkdir(String dirName, int posixPermissions) throws IOException { | |
169 int req_id = generateNextRequestID(); | |
170 TypesWriter tw = new TypesWriter(); | |
171 tw.writeString(dirName, this.getCharset()); | |
172 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS); | |
173 tw.writeUINT32(posixPermissions); | |
174 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes()); | |
175 expectStatusOKMessage(req_id); | |
176 } | |
177 | |
178 public void rm(String fileName) throws IOException { | |
179 int req_id = generateNextRequestID(); | |
180 TypesWriter tw = new TypesWriter(); | |
181 tw.writeString(fileName, this.getCharset()); | |
182 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes()); | |
183 expectStatusOKMessage(req_id); | |
184 } | |
185 | |
186 public void rmdir(String dirName) throws IOException { | |
187 int req_id = generateNextRequestID(); | |
188 TypesWriter tw = new TypesWriter(); | |
189 tw.writeString(dirName, this.getCharset()); | |
190 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes()); | |
191 expectStatusOKMessage(req_id); | |
192 } | |
193 | |
194 public void mv(String oldPath, String newPath) throws IOException { | |
195 int req_id = generateNextRequestID(); | |
196 TypesWriter tw = new TypesWriter(); | |
197 tw.writeString(oldPath, this.getCharset()); | |
198 tw.writeString(newPath, this.getCharset()); | |
199 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes()); | |
200 expectStatusOKMessage(req_id); | |
201 } | |
202 | |
203 public String readLink(String path) throws IOException { | |
204 int req_id = generateNextRequestID(); | |
205 TypesWriter tw = new TypesWriter(); | |
206 tw.writeString(path, charset); | |
207 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes()); | |
208 byte[] resp = receiveMessage(34000); | |
209 TypesReader tr = new TypesReader(resp); | |
210 int t = tr.readByte(); | |
211 listener.read(Packet.forName(t)); | |
212 int rep_id = tr.readUINT32(); | |
213 | |
214 if (rep_id != req_id) { | |
215 throw new RequestMismatchException(); | |
216 } | |
217 | |
218 if (t == Packet.SSH_FXP_NAME) { | |
219 int count = tr.readUINT32(); | |
220 | |
221 if (count != 1) { | |
222 throw new PacketTypeException(t); | |
223 } | |
224 | |
225 return tr.readString(charset); | |
226 } | |
227 | |
228 if (t != Packet.SSH_FXP_STATUS) { | |
229 throw new PacketTypeException(t); | |
230 } | |
231 | |
232 int errorCode = tr.readUINT32(); | |
233 String errorMessage = tr.readString(); | |
234 listener.read(errorMessage); | |
235 throw new SFTPException(errorMessage, errorCode); | |
236 } | |
237 | |
238 public void setstat(String path, SFTPFileAttributes attr) throws IOException { | |
239 int req_id = generateNextRequestID(); | |
240 TypesWriter tw = new TypesWriter(); | |
241 tw.writeString(path, charset); | |
242 tw.writeBytes(attr.toBytes()); | |
243 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes()); | |
244 expectStatusOKMessage(req_id); | |
245 } | |
246 | |
247 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException { | |
248 int req_id = generateNextRequestID(); | |
249 TypesWriter tw = new TypesWriter(); | |
250 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | |
251 tw.writeBytes(attr.toBytes()); | |
252 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes()); | |
253 expectStatusOKMessage(req_id); | |
254 } | |
255 | |
256 public void createSymlink(String src, String target) throws IOException { | |
257 int req_id = generateNextRequestID(); | |
258 TypesWriter tw = new TypesWriter(); | |
259 tw.writeString(src, charset); | |
260 tw.writeString(target, charset); | |
261 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes()); | |
262 expectStatusOKMessage(req_id); | |
263 } | |
264 | |
265 public void createHardlink(String src, String target) throws IOException { | |
266 int req_id = generateNextRequestID(); | |
267 TypesWriter tw = new TypesWriter(); | |
268 tw.writeString("hardlink@openssh.com", charset); | |
269 tw.writeString(target, charset); | |
270 tw.writeString(src, charset); | |
271 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes()); | |
272 expectStatusOKMessage(req_id); | |
273 } | |
274 | |
275 public String canonicalPath(String path) throws IOException { | |
276 int req_id = generateNextRequestID(); | |
277 TypesWriter tw = new TypesWriter(); | |
278 tw.writeString(path, charset); | |
279 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes()); | |
280 byte[] resp = receiveMessage(34000); | |
281 TypesReader tr = new TypesReader(resp); | |
282 int t = tr.readByte(); | |
283 listener.read(Packet.forName(t)); | |
284 int rep_id = tr.readUINT32(); | |
285 | |
286 if (rep_id != req_id) { | |
287 throw new RequestMismatchException(); | |
288 } | |
289 | |
290 if (t == Packet.SSH_FXP_NAME) { | |
291 int count = tr.readUINT32(); | |
292 | |
293 if (count != 1) { | |
294 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet."); | |
295 } | |
296 | |
297 final String name = tr.readString(charset); | |
298 listener.read(name); | |
299 return name; | |
300 } | |
301 | |
302 if (t != Packet.SSH_FXP_STATUS) { | |
303 throw new PacketTypeException(t); | |
304 } | |
305 | |
306 int errorCode = tr.readUINT32(); | |
307 String errorMessage = tr.readString(); | |
308 listener.read(errorMessage); | |
309 throw new SFTPException(errorMessage, errorCode); | |
310 } | |
311 | |
312 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException { | |
313 if (log.isDebugEnabled()) { | |
314 log.debug(String.format("Send message of type %d with request id %d", type, requestId)); | |
315 } | |
316 | |
317 listener.write(Packet.forName(type)); | |
318 int msglen = len + 1; | |
319 | |
320 if (type != Packet.SSH_FXP_INIT) { | |
321 msglen += 4; | |
322 } | |
323 | |
324 os.write(msglen >> 24); | |
325 os.write(msglen >> 16); | |
326 os.write(msglen >> 8); | |
327 os.write(msglen); | |
328 os.write(type); | |
329 | |
330 if (type != Packet.SSH_FXP_INIT) { | |
331 os.write(requestId >> 24); | |
332 os.write(requestId >> 16); | |
333 os.write(requestId >> 8); | |
334 os.write(requestId); | |
335 } | |
336 | |
337 os.write(msg, off, len); | |
338 os.flush(); | |
339 } | |
340 | |
341 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException { | |
342 sendMessage(type, requestId, msg, 0, msg.length); | |
343 } | |
344 | |
345 private void readBytes(byte[] buff, int pos, int len) throws IOException { | |
346 while (len > 0) { | |
347 int count = is.read(buff, pos, len); | |
348 | |
349 if (count < 0) { | |
350 throw new SocketException("Unexpected end of stream."); | |
351 } | |
352 | |
353 len -= count; | |
354 pos += count; | |
355 } | |
356 } | |
357 | |
358 /** | |
359 * Read a message and guarantee that the <b>contents</b> is not larger than | |
360 * <code>maxlen</code> bytes. | |
361 * <p/> | |
362 * Note: receiveMessage(34000) actually means that the message may be up to 34004 | |
363 * bytes (the length attribute preceding the contents is 4 bytes). | |
364 * | |
365 * @param maxlen | |
366 * @return the message contents | |
367 * @throws IOException | |
368 */ | |
369 protected byte[] receiveMessage(int maxlen) throws IOException { | |
370 byte[] msglen = new byte[4]; | |
371 readBytes(msglen, 0, 4); | |
372 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff)); | |
373 | |
374 if ((len > maxlen) || (len <= 0)) { | |
375 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len)); | |
376 } | |
377 | |
378 byte[] msg = new byte[len]; | |
379 readBytes(msg, 0, len); | |
380 return msg; | |
381 } | |
382 | |
383 protected int generateNextRequestID() { | |
384 synchronized (this) { | |
385 return next_request_id++; | |
386 } | |
387 } | |
388 | |
389 protected void closeHandle(byte[] handle) throws IOException { | |
390 int req_id = generateNextRequestID(); | |
391 TypesWriter tw = new TypesWriter(); | |
392 tw.writeString(handle, 0, handle.length); | |
393 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes()); | |
394 expectStatusOKMessage(req_id); | |
395 } | |
396 | |
397 private void readStatus() throws IOException { | |
398 byte[] resp = receiveMessage(34000); | |
399 TypesReader tr = new TypesReader(resp); | |
400 int t = tr.readByte(); | |
401 listener.read(Packet.forName(t)); | |
402 // Search the pending queue | |
403 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32()); | |
404 | |
405 if (null == status) { | |
406 throw new RequestMismatchException(); | |
407 } | |
408 | |
409 // Evaluate the answer | |
410 if (t == Packet.SSH_FXP_STATUS) { | |
411 // In any case, stop sending more packets | |
412 int code = tr.readUINT32(); | |
413 | |
414 if (log.isDebugEnabled()) { | |
415 String[] desc = ErrorCodes.getDescription(code); | |
416 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | |
417 } | |
418 | |
419 if (code == ErrorCodes.SSH_FX_OK) { | |
420 return; | |
421 } | |
422 | |
423 String msg = tr.readString(); | |
424 listener.read(msg); | |
425 throw new SFTPException(msg, code); | |
426 } | |
427 | |
428 throw new PacketTypeException(t); | |
429 } | |
430 | |
431 private void readPendingReadStatus() throws IOException { | |
432 byte[] resp = receiveMessage(34000); | |
433 TypesReader tr = new TypesReader(resp); | |
434 int t = tr.readByte(); | |
435 listener.read(Packet.forName(t)); | |
436 // Search the pending queue | |
437 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32()); | |
438 | |
439 if (null == status) { | |
440 throw new RequestMismatchException(); | |
441 } | |
442 | |
443 // Evaluate the answer | |
444 if (t == Packet.SSH_FXP_STATUS) { | |
445 // In any case, stop sending more packets | |
446 int code = tr.readUINT32(); | |
447 | |
448 if (log.isDebugEnabled()) { | |
449 String[] desc = ErrorCodes.getDescription(code); | |
450 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | |
451 } | |
452 | |
453 if (code == ErrorCodes.SSH_FX_OK) { | |
454 return; | |
455 } | |
456 | |
457 if (code == ErrorCodes.SSH_FX_EOF) { | |
458 return; | |
459 } | |
460 | |
461 String msg = tr.readString(); | |
462 listener.read(msg); | |
463 throw new SFTPException(msg, code); | |
464 } | |
465 | |
466 throw new PacketTypeException(t); | |
467 } | |
468 | |
469 protected void expectStatusOKMessage(int id) throws IOException { | |
470 byte[] resp = receiveMessage(34000); | |
471 TypesReader tr = new TypesReader(resp); | |
472 int t = tr.readByte(); | |
473 listener.read(Packet.forName(t)); | |
474 int rep_id = tr.readUINT32(); | |
475 | |
476 if (rep_id != id) { | |
477 throw new RequestMismatchException(); | |
478 } | |
479 | |
480 if (t != Packet.SSH_FXP_STATUS) { | |
481 throw new PacketTypeException(t); | |
482 } | |
483 | |
484 int errorCode = tr.readUINT32(); | |
485 | |
486 if (errorCode == ErrorCodes.SSH_FX_OK) { | |
487 return; | |
488 } | |
489 | |
490 String errorMessage = tr.readString(); | |
491 listener.read(errorMessage); | |
492 throw new SFTPException(errorMessage, errorCode); | |
493 } | |
494 | |
495 public void closeFile(SFTPFileHandle handle) throws IOException { | |
496 while (!pendingReadQueue.isEmpty()) { | |
497 this.readPendingReadStatus(); | |
498 } | |
499 | |
500 while (!pendingStatusQueue.isEmpty()) { | |
501 this.readStatus(); | |
502 } | |
503 | |
504 closeHandle(handle.getHandle()); | |
505 } | |
506 | |
507 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException { | |
508 boolean errorOccured = false; | |
509 int remaining = len * parallelism; | |
510 //int clientOffset = dstoff; | |
511 long serverOffset = fileOffset; | |
512 | |
513 for (OutstandingReadRequest r : pendingReadQueue.values()) { | |
514 // Server offset should take pending requests into account. | |
515 serverOffset += r.len; | |
516 } | |
517 | |
518 while (true) { | |
519 // Stop if there was an error and no outstanding request | |
520 if ((pendingReadQueue.size() == 0) && errorOccured) { | |
521 break; | |
522 } | |
523 | |
524 // Send as many requests as we are allowed to | |
525 while (pendingReadQueue.size() < parallelism) { | |
526 if (errorOccured) { | |
527 break; | |
528 } | |
529 | |
530 // Send the next read request | |
531 OutstandingReadRequest req = new OutstandingReadRequest(); | |
532 req.req_id = generateNextRequestID(); | |
533 req.serverOffset = serverOffset; | |
534 req.len = (remaining > len) ? len : remaining; | |
535 req.buffer = dst; | |
536 req.dstOffset = dstoff; | |
537 serverOffset += req.len; | |
538 //clientOffset += req.len; | |
539 remaining -= req.len; | |
540 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); | |
541 pendingReadQueue.put(req.req_id, req); | |
542 } | |
543 | |
544 if (pendingReadQueue.size() == 0) { | |
545 break; | |
546 } | |
547 | |
548 // Receive a single answer | |
549 byte[] resp = receiveMessage(34000); | |
550 TypesReader tr = new TypesReader(resp); | |
551 int t = tr.readByte(); | |
552 listener.read(Packet.forName(t)); | |
553 // Search the pending queue | |
554 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32()); | |
555 | |
556 if (null == req) { | |
557 throw new RequestMismatchException(); | |
558 } | |
559 | |
560 // Evaluate the answer | |
561 if (t == Packet.SSH_FXP_STATUS) { | |
562 /* In any case, stop sending more packets */ | |
563 int code = tr.readUINT32(); | |
564 String msg = tr.readString(); | |
565 listener.read(msg); | |
566 | |
567 if (log.isDebugEnabled()) { | |
568 String[] desc = ErrorCodes.getDescription(code); | |
569 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | |
570 } | |
571 | |
572 // Flag to read all pending requests but don't send any more. | |
573 errorOccured = true; | |
574 | |
575 if (pendingReadQueue.isEmpty()) { | |
576 if (ErrorCodes.SSH_FX_EOF == code) { | |
577 return -1; | |
578 } | |
579 | |
580 throw new SFTPException(msg, code); | |
581 } | |
582 } | |
583 else if (t == Packet.SSH_FXP_DATA) { | |
584 // OK, collect data | |
585 int readLen = tr.readUINT32(); | |
586 | |
587 if ((readLen < 0) || (readLen > req.len)) { | |
588 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet."); | |
589 } | |
590 | |
591 if (log.isDebugEnabled()) { | |
592 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen | |
593 + " (requested: " + req.len + ")"); | |
594 } | |
595 | |
596 // Read bytes into buffer | |
597 tr.readBytes(req.buffer, req.dstOffset, readLen); | |
598 | |
599 if (readLen < req.len) { | |
600 /* Send this request packet again to request the remaing data in this slot. */ | |
601 req.req_id = generateNextRequestID(); | |
602 req.serverOffset += readLen; | |
603 req.len -= readLen; | |
604 log.debug("Requesting again: " + req.serverOffset + "/" + req.len); | |
605 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); | |
606 pendingReadQueue.put(req.req_id, req); | |
607 } | |
608 | |
609 return readLen; | |
610 } | |
611 else { | |
612 throw new PacketTypeException(t); | |
613 } | |
614 } | |
615 | |
616 // Should never reach here. | |
617 throw new SFTPException("No EOF reached", -1); | |
618 } | |
619 | |
620 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException { | |
621 TypesWriter tw = new TypesWriter(); | |
622 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | |
623 tw.writeUINT64(offset); | |
624 tw.writeUINT32(len); | |
625 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes()); | |
626 } | |
627 | |
628 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException { | |
629 while (len > 0) { | |
630 int writeRequestLen = len; | |
631 | |
632 if (writeRequestLen > 32768) { | |
633 writeRequestLen = 32768; | |
634 } | |
635 | |
636 // Send the next write request | |
637 OutstandingStatusRequest req = new OutstandingStatusRequest(); | |
638 req.req_id = generateNextRequestID(); | |
639 TypesWriter tw = new TypesWriter(); | |
640 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | |
641 tw.writeUINT64(fileOffset); | |
642 tw.writeString(src, srcoff, writeRequestLen); | |
643 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes()); | |
644 pendingStatusQueue.put(req.req_id, req); | |
645 | |
646 // Only read next status if parallelism reached | |
647 while (pendingStatusQueue.size() >= parallelism) { | |
648 this.readStatus(); | |
649 } | |
650 | |
651 fileOffset += writeRequestLen; | |
652 srcoff += writeRequestLen; | |
653 len -= writeRequestLen; | |
654 } | |
655 } | |
656 | |
657 | |
658 /** | |
659 * A read is divided into multiple requests sent sequentially before | |
660 * reading any status from the server | |
661 */ | |
662 private static class OutstandingReadRequest { | |
663 int req_id; | |
664 /** | |
665 * Read offset to request on server starting at the file offset for the first request. | |
666 */ | |
667 long serverOffset; | |
668 /** | |
669 * Length of requested data | |
670 */ | |
671 int len; | |
672 /** | |
673 * Offset in destination buffer | |
674 */ | |
675 int dstOffset; | |
676 /** | |
677 * Temporary buffer | |
678 */ | |
679 byte[] buffer; | |
680 } | |
681 | |
682 /** | |
683 * A read is divided into multiple requests sent sequentially before | |
684 * reading any status from the server | |
685 */ | |
686 private static final class OutstandingStatusRequest { | |
687 int req_id; | |
688 } | |
689 | |
690 } |