comparison src/ch/ethz/ssh2/AbstractSFTPClient.java @ 308:42b15aaa7ac7 ganymed

merge
author Carl Byington <carl@five-ten-sg.com>
date Wed, 30 Jul 2014 14:21:50 -0700
parents 071eccdff8ea
children
comparison
equal deleted inserted replaced
306:90e47d99ea54 308:42b15aaa7ac7
51 51
52 /** 52 /**
53 * Mapping request ID to request. 53 * Mapping request ID to request.
54 */ 54 */
55 private Map<Integer, OutstandingReadRequest> pendingReadQueue 55 private Map<Integer, OutstandingReadRequest> pendingReadQueue
56 = new HashMap<Integer, OutstandingReadRequest>(); 56 = new HashMap<Integer, OutstandingReadRequest>();
57 57
58 /** 58 /**
59 * Mapping request ID to request. 59 * Mapping request ID to request.
60 */ 60 */
61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue 61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue
62 = new HashMap<Integer, OutstandingStatusRequest>(); 62 = new HashMap<Integer, OutstandingStatusRequest>();
63 63
64 private PacketListener listener; 64 private PacketListener listener;
65 65
66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException { 66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException {
67 this.listener = listener; 67 this.listener = listener;
68
69 log.debug("Opening session and starting SFTP subsystem."); 68 log.debug("Opening session and starting SFTP subsystem.");
70 sess = conn.openSession(); 69 sess = conn.openSession();
71 sess.startSubSystem("sftp"); 70 sess.startSubSystem("sftp");
72
73 is = sess.getStdout(); 71 is = sess.getStdout();
74 os = new BufferedOutputStream(sess.getStdin(), 2048); 72 os = new BufferedOutputStream(sess.getStdin(), 2048);
75
76 init(version); 73 init(version);
77
78 } 74 }
79 75
80 private void init(final int client_version) throws IOException { 76 private void init(final int client_version) throws IOException {
81 // Send SSH_FXP_INIT with client version 77 // Send SSH_FXP_INIT with client version
82
83 TypesWriter tw = new TypesWriter(); 78 TypesWriter tw = new TypesWriter();
84 tw.writeUINT32(client_version); 79 tw.writeUINT32(client_version);
85 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes()); 80 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes());
86 81 /* Receive SSH_FXP_VERSION */
87 /* Receive SSH_FXP_VERSION */
88
89 log.debug("Waiting for SSH_FXP_VERSION..."); 82 log.debug("Waiting for SSH_FXP_VERSION...");
90 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */ 83 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */
91
92 int t = tr.readByte(); 84 int t = tr.readByte();
93 listener.read(Packet.forName(t)); 85 listener.read(Packet.forName(t));
94 86
95 if(t != Packet.SSH_FXP_VERSION) { 87 if (t != Packet.SSH_FXP_VERSION) {
96 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t)); 88 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t));
97 throw new PacketTypeException(t); 89 throw new PacketTypeException(t);
98 } 90 }
99 91
100 final int protocol_version = tr.readUINT32(); 92 final int protocol_version = tr.readUINT32();
101
102 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version); 93 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version);
103 if(protocol_version != client_version) { 94
95 if (protocol_version != client_version) {
104 throw new IOException(String.format("Server protocol version %d does not match %d", 96 throw new IOException(String.format("Server protocol version %d does not match %d",
105 protocol_version, client_version)); 97 protocol_version, client_version));
106 } 98 }
99
107 // Both parties should from then on adhere to particular version of the protocol 100 // Both parties should from then on adhere to particular version of the protocol
108 101
109 // Read and save extensions (if any) for later use 102 // Read and save extensions (if any) for later use
110 while(tr.remain() != 0) { 103 while (tr.remain() != 0) {
111 String name = tr.readString(); 104 String name = tr.readString();
112 listener.read(name); 105 listener.read(name);
113 byte[] value = tr.readByteString(); 106 byte[] value = tr.readByteString();
114 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value))); 107 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value)));
115 } 108 }
143 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8. 136 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8.
144 * @throws java.io.IOException 137 * @throws java.io.IOException
145 * @see #getCharset() 138 * @see #getCharset()
146 */ 139 */
147 public void setCharset(String charset) throws IOException { 140 public void setCharset(String charset) throws IOException {
148 if(charset == null) { 141 if (charset == null) {
149 this.charset = null; 142 this.charset = null;
150 return; 143 return;
151 } 144 }
145
152 try { 146 try {
153 Charset.forName(charset); 147 Charset.forName(charset);
154 } 148 }
155 catch(UnsupportedCharsetException e) { 149 catch (UnsupportedCharsetException e) {
156 throw new IOException("This charset is not supported", e); 150 throw new IOException("This charset is not supported", e);
157 } 151 }
152
158 this.charset = charset; 153 this.charset = charset;
159 } 154 }
160 155
161 /** 156 /**
162 * The currently used charset for filename encoding/decoding. 157 * The currently used charset for filename encoding/decoding.
170 165
171 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException; 166 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException;
172 167
173 public void mkdir(String dirName, int posixPermissions) throws IOException { 168 public void mkdir(String dirName, int posixPermissions) throws IOException {
174 int req_id = generateNextRequestID(); 169 int req_id = generateNextRequestID();
175
176 TypesWriter tw = new TypesWriter(); 170 TypesWriter tw = new TypesWriter();
177 tw.writeString(dirName, this.getCharset()); 171 tw.writeString(dirName, this.getCharset());
178 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS); 172 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS);
179 tw.writeUINT32(posixPermissions); 173 tw.writeUINT32(posixPermissions);
180
181 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes()); 174 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes());
182
183 expectStatusOKMessage(req_id); 175 expectStatusOKMessage(req_id);
184 } 176 }
185 177
186 public void rm(String fileName) throws IOException { 178 public void rm(String fileName) throws IOException {
187 int req_id = generateNextRequestID(); 179 int req_id = generateNextRequestID();
188
189 TypesWriter tw = new TypesWriter(); 180 TypesWriter tw = new TypesWriter();
190 tw.writeString(fileName, this.getCharset()); 181 tw.writeString(fileName, this.getCharset());
191
192 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes()); 182 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes());
193
194 expectStatusOKMessage(req_id); 183 expectStatusOKMessage(req_id);
195 } 184 }
196 185
197 public void rmdir(String dirName) throws IOException { 186 public void rmdir(String dirName) throws IOException {
198 int req_id = generateNextRequestID(); 187 int req_id = generateNextRequestID();
199
200 TypesWriter tw = new TypesWriter(); 188 TypesWriter tw = new TypesWriter();
201 tw.writeString(dirName, this.getCharset()); 189 tw.writeString(dirName, this.getCharset());
202
203 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes()); 190 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes());
204
205 expectStatusOKMessage(req_id); 191 expectStatusOKMessage(req_id);
206 } 192 }
207 193
208 public void mv(String oldPath, String newPath) throws IOException { 194 public void mv(String oldPath, String newPath) throws IOException {
209 int req_id = generateNextRequestID(); 195 int req_id = generateNextRequestID();
210
211 TypesWriter tw = new TypesWriter(); 196 TypesWriter tw = new TypesWriter();
212 tw.writeString(oldPath, this.getCharset()); 197 tw.writeString(oldPath, this.getCharset());
213 tw.writeString(newPath, this.getCharset()); 198 tw.writeString(newPath, this.getCharset());
214
215 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes()); 199 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes());
216
217 expectStatusOKMessage(req_id); 200 expectStatusOKMessage(req_id);
218 } 201 }
219 202
220 public String readLink(String path) throws IOException { 203 public String readLink(String path) throws IOException {
221 int req_id = generateNextRequestID(); 204 int req_id = generateNextRequestID();
222
223 TypesWriter tw = new TypesWriter(); 205 TypesWriter tw = new TypesWriter();
224 tw.writeString(path, charset); 206 tw.writeString(path, charset);
225
226 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes()); 207 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes());
227
228 byte[] resp = receiveMessage(34000); 208 byte[] resp = receiveMessage(34000);
229
230 TypesReader tr = new TypesReader(resp); 209 TypesReader tr = new TypesReader(resp);
231
232 int t = tr.readByte(); 210 int t = tr.readByte();
233 listener.read(Packet.forName(t)); 211 listener.read(Packet.forName(t));
234
235 int rep_id = tr.readUINT32(); 212 int rep_id = tr.readUINT32();
236 if(rep_id != req_id) { 213
214 if (rep_id != req_id) {
237 throw new RequestMismatchException(); 215 throw new RequestMismatchException();
238 } 216 }
239 217
240 if(t == Packet.SSH_FXP_NAME) { 218 if (t == Packet.SSH_FXP_NAME) {
241 int count = tr.readUINT32(); 219 int count = tr.readUINT32();
242 220
243 if(count != 1) { 221 if (count != 1) {
244 throw new PacketTypeException(t); 222 throw new PacketTypeException(t);
245 } 223 }
246 224
247 return tr.readString(charset); 225 return tr.readString(charset);
248 } 226 }
249 227
250 if(t != Packet.SSH_FXP_STATUS) { 228 if (t != Packet.SSH_FXP_STATUS) {
251 throw new PacketTypeException(t); 229 throw new PacketTypeException(t);
252 } 230 }
253 231
254 int errorCode = tr.readUINT32(); 232 int errorCode = tr.readUINT32();
255 String errorMessage = tr.readString(); 233 String errorMessage = tr.readString();
257 throw new SFTPException(errorMessage, errorCode); 235 throw new SFTPException(errorMessage, errorCode);
258 } 236 }
259 237
260 public void setstat(String path, SFTPFileAttributes attr) throws IOException { 238 public void setstat(String path, SFTPFileAttributes attr) throws IOException {
261 int req_id = generateNextRequestID(); 239 int req_id = generateNextRequestID();
262
263 TypesWriter tw = new TypesWriter(); 240 TypesWriter tw = new TypesWriter();
264 tw.writeString(path, charset); 241 tw.writeString(path, charset);
265 tw.writeBytes(attr.toBytes()); 242 tw.writeBytes(attr.toBytes());
266
267 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes()); 243 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes());
268
269 expectStatusOKMessage(req_id); 244 expectStatusOKMessage(req_id);
270 } 245 }
271 246
272 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException { 247 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException {
273 int req_id = generateNextRequestID(); 248 int req_id = generateNextRequestID();
274
275 TypesWriter tw = new TypesWriter(); 249 TypesWriter tw = new TypesWriter();
276 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); 250 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
277 tw.writeBytes(attr.toBytes()); 251 tw.writeBytes(attr.toBytes());
278
279 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes()); 252 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes());
280
281 expectStatusOKMessage(req_id); 253 expectStatusOKMessage(req_id);
282 } 254 }
283 255
284 public void createSymlink(String src, String target) throws IOException { 256 public void createSymlink(String src, String target) throws IOException {
285 int req_id = generateNextRequestID(); 257 int req_id = generateNextRequestID();
286
287 TypesWriter tw = new TypesWriter(); 258 TypesWriter tw = new TypesWriter();
288 tw.writeString(src, charset); 259 tw.writeString(src, charset);
289 tw.writeString(target, charset); 260 tw.writeString(target, charset);
290
291 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes()); 261 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes());
292
293 expectStatusOKMessage(req_id); 262 expectStatusOKMessage(req_id);
294 } 263 }
295 264
296 public void createHardlink(String src, String target) throws IOException { 265 public void createHardlink(String src, String target) throws IOException {
297 int req_id = generateNextRequestID(); 266 int req_id = generateNextRequestID();
298
299 TypesWriter tw = new TypesWriter(); 267 TypesWriter tw = new TypesWriter();
300 tw.writeString("hardlink@openssh.com", charset); 268 tw.writeString("hardlink@openssh.com", charset);
301 tw.writeString(target, charset); 269 tw.writeString(target, charset);
302 tw.writeString(src, charset); 270 tw.writeString(src, charset);
303
304 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes()); 271 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes());
305
306 expectStatusOKMessage(req_id); 272 expectStatusOKMessage(req_id);
307 } 273 }
308 274
309 public String canonicalPath(String path) throws IOException { 275 public String canonicalPath(String path) throws IOException {
310 int req_id = generateNextRequestID(); 276 int req_id = generateNextRequestID();
311
312 TypesWriter tw = new TypesWriter(); 277 TypesWriter tw = new TypesWriter();
313 tw.writeString(path, charset); 278 tw.writeString(path, charset);
314
315 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes()); 279 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes());
316
317 byte[] resp = receiveMessage(34000); 280 byte[] resp = receiveMessage(34000);
318
319 TypesReader tr = new TypesReader(resp); 281 TypesReader tr = new TypesReader(resp);
320
321 int t = tr.readByte(); 282 int t = tr.readByte();
322 listener.read(Packet.forName(t)); 283 listener.read(Packet.forName(t));
323
324 int rep_id = tr.readUINT32(); 284 int rep_id = tr.readUINT32();
325 if(rep_id != req_id) { 285
286 if (rep_id != req_id) {
326 throw new RequestMismatchException(); 287 throw new RequestMismatchException();
327 } 288 }
328 289
329 if(t == Packet.SSH_FXP_NAME) { 290 if (t == Packet.SSH_FXP_NAME) {
330 int count = tr.readUINT32(); 291 int count = tr.readUINT32();
331 292
332 if(count != 1) { 293 if (count != 1) {
333 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet."); 294 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet.");
334 } 295 }
296
335 final String name = tr.readString(charset); 297 final String name = tr.readString(charset);
336 listener.read(name); 298 listener.read(name);
337 return name; 299 return name;
338 } 300 }
339 301
340 if(t != Packet.SSH_FXP_STATUS) { 302 if (t != Packet.SSH_FXP_STATUS) {
341 throw new PacketTypeException(t); 303 throw new PacketTypeException(t);
342 } 304 }
343 305
344 int errorCode = tr.readUINT32(); 306 int errorCode = tr.readUINT32();
345 String errorMessage = tr.readString(); 307 String errorMessage = tr.readString();
346 listener.read(errorMessage); 308 listener.read(errorMessage);
347 throw new SFTPException(errorMessage, errorCode); 309 throw new SFTPException(errorMessage, errorCode);
348 } 310 }
349 311
350 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException { 312 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException {
351 if(log.isDebugEnabled()) { 313 if (log.isDebugEnabled()) {
352 log.debug(String.format("Send message of type %d with request id %d", type, requestId)); 314 log.debug(String.format("Send message of type %d with request id %d", type, requestId));
353 } 315 }
316
354 listener.write(Packet.forName(type)); 317 listener.write(Packet.forName(type));
355
356 int msglen = len + 1; 318 int msglen = len + 1;
357 319
358 if(type != Packet.SSH_FXP_INIT) { 320 if (type != Packet.SSH_FXP_INIT) {
359 msglen += 4; 321 msglen += 4;
360 } 322 }
361 323
362 os.write(msglen >> 24); 324 os.write(msglen >> 24);
363 os.write(msglen >> 16); 325 os.write(msglen >> 16);
364 os.write(msglen >> 8); 326 os.write(msglen >> 8);
365 os.write(msglen); 327 os.write(msglen);
366 os.write(type); 328 os.write(type);
367 329
368 if(type != Packet.SSH_FXP_INIT) { 330 if (type != Packet.SSH_FXP_INIT) {
369 os.write(requestId >> 24); 331 os.write(requestId >> 24);
370 os.write(requestId >> 16); 332 os.write(requestId >> 16);
371 os.write(requestId >> 8); 333 os.write(requestId >> 8);
372 os.write(requestId); 334 os.write(requestId);
373 } 335 }
379 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException { 341 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException {
380 sendMessage(type, requestId, msg, 0, msg.length); 342 sendMessage(type, requestId, msg, 0, msg.length);
381 } 343 }
382 344
383 private void readBytes(byte[] buff, int pos, int len) throws IOException { 345 private void readBytes(byte[] buff, int pos, int len) throws IOException {
384 while(len > 0) { 346 while (len > 0) {
385 int count = is.read(buff, pos, len); 347 int count = is.read(buff, pos, len);
386 if(count < 0) { 348
349 if (count < 0) {
387 throw new SocketException("Unexpected end of stream."); 350 throw new SocketException("Unexpected end of stream.");
388 } 351 }
352
389 len -= count; 353 len -= count;
390 pos += count; 354 pos += count;
391 } 355 }
392 } 356 }
393 357
402 * @return the message contents 366 * @return the message contents
403 * @throws IOException 367 * @throws IOException
404 */ 368 */
405 protected byte[] receiveMessage(int maxlen) throws IOException { 369 protected byte[] receiveMessage(int maxlen) throws IOException {
406 byte[] msglen = new byte[4]; 370 byte[] msglen = new byte[4];
407
408 readBytes(msglen, 0, 4); 371 readBytes(msglen, 0, 4);
409
410 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff)); 372 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff));
411 373
412 if((len > maxlen) || (len <= 0)) { 374 if ((len > maxlen) || (len <= 0)) {
413 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len)); 375 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len));
414 } 376 }
415 377
416 byte[] msg = new byte[len]; 378 byte[] msg = new byte[len];
417
418 readBytes(msg, 0, len); 379 readBytes(msg, 0, len);
419
420 return msg; 380 return msg;
421 } 381 }
422 382
423 protected int generateNextRequestID() { 383 protected int generateNextRequestID() {
424 synchronized(this) { 384 synchronized (this) {
425 return next_request_id++; 385 return next_request_id++;
426 } 386 }
427 } 387 }
428 388
429 protected void closeHandle(byte[] handle) throws IOException { 389 protected void closeHandle(byte[] handle) throws IOException {
430 int req_id = generateNextRequestID(); 390 int req_id = generateNextRequestID();
431
432 TypesWriter tw = new TypesWriter(); 391 TypesWriter tw = new TypesWriter();
433 tw.writeString(handle, 0, handle.length); 392 tw.writeString(handle, 0, handle.length);
434
435 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes()); 393 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes());
436
437 expectStatusOKMessage(req_id); 394 expectStatusOKMessage(req_id);
438 } 395 }
439 396
440 private void readStatus() throws IOException { 397 private void readStatus() throws IOException {
441 byte[] resp = receiveMessage(34000); 398 byte[] resp = receiveMessage(34000);
442
443 TypesReader tr = new TypesReader(resp); 399 TypesReader tr = new TypesReader(resp);
444 int t = tr.readByte(); 400 int t = tr.readByte();
445 listener.read(Packet.forName(t)); 401 listener.read(Packet.forName(t));
446
447 // Search the pending queue 402 // Search the pending queue
448 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32()); 403 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32());
449 if(null == status) { 404
405 if (null == status) {
450 throw new RequestMismatchException(); 406 throw new RequestMismatchException();
451 } 407 }
452 408
453 // Evaluate the answer 409 // Evaluate the answer
454 if(t == Packet.SSH_FXP_STATUS) { 410 if (t == Packet.SSH_FXP_STATUS) {
455 // In any case, stop sending more packets 411 // In any case, stop sending more packets
456 int code = tr.readUINT32(); 412 int code = tr.readUINT32();
457 if(log.isDebugEnabled()) { 413
414 if (log.isDebugEnabled()) {
458 String[] desc = ErrorCodes.getDescription(code); 415 String[] desc = ErrorCodes.getDescription(code);
459 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); 416 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
460 } 417 }
461 if(code == ErrorCodes.SSH_FX_OK) { 418
419 if (code == ErrorCodes.SSH_FX_OK) {
462 return; 420 return;
463 } 421 }
422
464 String msg = tr.readString(); 423 String msg = tr.readString();
465 listener.read(msg); 424 listener.read(msg);
466 throw new SFTPException(msg, code); 425 throw new SFTPException(msg, code);
467 } 426 }
427
468 throw new PacketTypeException(t); 428 throw new PacketTypeException(t);
469 } 429 }
470 430
471 private void readPendingReadStatus() throws IOException { 431 private void readPendingReadStatus() throws IOException {
472 byte[] resp = receiveMessage(34000); 432 byte[] resp = receiveMessage(34000);
473
474 TypesReader tr = new TypesReader(resp); 433 TypesReader tr = new TypesReader(resp);
475 int t = tr.readByte(); 434 int t = tr.readByte();
476 listener.read(Packet.forName(t)); 435 listener.read(Packet.forName(t));
477
478 // Search the pending queue 436 // Search the pending queue
479 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32()); 437 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32());
480 if(null == status) { 438
439 if (null == status) {
481 throw new RequestMismatchException(); 440 throw new RequestMismatchException();
482 } 441 }
483 442
484 // Evaluate the answer 443 // Evaluate the answer
485 if(t == Packet.SSH_FXP_STATUS) { 444 if (t == Packet.SSH_FXP_STATUS) {
486 // In any case, stop sending more packets 445 // In any case, stop sending more packets
487 int code = tr.readUINT32(); 446 int code = tr.readUINT32();
488 if(log.isDebugEnabled()) { 447
448 if (log.isDebugEnabled()) {
489 String[] desc = ErrorCodes.getDescription(code); 449 String[] desc = ErrorCodes.getDescription(code);
490 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); 450 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
491 } 451 }
492 if(code == ErrorCodes.SSH_FX_OK) { 452
453 if (code == ErrorCodes.SSH_FX_OK) {
493 return; 454 return;
494 } 455 }
495 if(code == ErrorCodes.SSH_FX_EOF) { 456
457 if (code == ErrorCodes.SSH_FX_EOF) {
496 return; 458 return;
497 } 459 }
460
498 String msg = tr.readString(); 461 String msg = tr.readString();
499 listener.read(msg); 462 listener.read(msg);
500 throw new SFTPException(msg, code); 463 throw new SFTPException(msg, code);
501 } 464 }
465
502 throw new PacketTypeException(t); 466 throw new PacketTypeException(t);
503 } 467 }
504 468
505 protected void expectStatusOKMessage(int id) throws IOException { 469 protected void expectStatusOKMessage(int id) throws IOException {
506 byte[] resp = receiveMessage(34000); 470 byte[] resp = receiveMessage(34000);
507
508 TypesReader tr = new TypesReader(resp); 471 TypesReader tr = new TypesReader(resp);
509
510 int t = tr.readByte(); 472 int t = tr.readByte();
511 listener.read(Packet.forName(t)); 473 listener.read(Packet.forName(t));
512
513 int rep_id = tr.readUINT32(); 474 int rep_id = tr.readUINT32();
514 if(rep_id != id) { 475
476 if (rep_id != id) {
515 throw new RequestMismatchException(); 477 throw new RequestMismatchException();
516 } 478 }
517 479
518 if(t != Packet.SSH_FXP_STATUS) { 480 if (t != Packet.SSH_FXP_STATUS) {
519 throw new PacketTypeException(t); 481 throw new PacketTypeException(t);
520 } 482 }
521 483
522 int errorCode = tr.readUINT32(); 484 int errorCode = tr.readUINT32();
523 485
524 if(errorCode == ErrorCodes.SSH_FX_OK) { 486 if (errorCode == ErrorCodes.SSH_FX_OK) {
525 return; 487 return;
526 } 488 }
489
527 String errorMessage = tr.readString(); 490 String errorMessage = tr.readString();
528 listener.read(errorMessage); 491 listener.read(errorMessage);
529 throw new SFTPException(errorMessage, errorCode); 492 throw new SFTPException(errorMessage, errorCode);
530 } 493 }
531 494
532 public void closeFile(SFTPFileHandle handle) throws IOException { 495 public void closeFile(SFTPFileHandle handle) throws IOException {
533 while(!pendingReadQueue.isEmpty()) { 496 while (!pendingReadQueue.isEmpty()) {
534 this.readPendingReadStatus(); 497 this.readPendingReadStatus();
535 } 498 }
536 while(!pendingStatusQueue.isEmpty()) { 499
500 while (!pendingStatusQueue.isEmpty()) {
537 this.readStatus(); 501 this.readStatus();
538 } 502 }
503
539 closeHandle(handle.getHandle()); 504 closeHandle(handle.getHandle());
540 } 505 }
541 506
542 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException { 507 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException {
543 boolean errorOccured = false; 508 boolean errorOccured = false;
544 int remaining = len * parallelism; 509 int remaining = len * parallelism;
545 //int clientOffset = dstoff; 510 //int clientOffset = dstoff;
546
547 long serverOffset = fileOffset; 511 long serverOffset = fileOffset;
548 for(OutstandingReadRequest r : pendingReadQueue.values()) { 512
513 for (OutstandingReadRequest r : pendingReadQueue.values()) {
549 // Server offset should take pending requests into account. 514 // Server offset should take pending requests into account.
550 serverOffset += r.len; 515 serverOffset += r.len;
551 } 516 }
552 517
553 while(true) { 518 while (true) {
554 // Stop if there was an error and no outstanding request 519 // Stop if there was an error and no outstanding request
555 if((pendingReadQueue.size() == 0) && errorOccured) { 520 if ((pendingReadQueue.size() == 0) && errorOccured) {
556 break; 521 break;
557 } 522 }
558 523
559 // Send as many requests as we are allowed to 524 // Send as many requests as we are allowed to
560 while(pendingReadQueue.size() < parallelism) { 525 while (pendingReadQueue.size() < parallelism) {
561 if(errorOccured) { 526 if (errorOccured) {
562 break; 527 break;
563 } 528 }
529
564 // Send the next read request 530 // Send the next read request
565 OutstandingReadRequest req = new OutstandingReadRequest(); 531 OutstandingReadRequest req = new OutstandingReadRequest();
566 req.req_id = generateNextRequestID(); 532 req.req_id = generateNextRequestID();
567 req.serverOffset = serverOffset; 533 req.serverOffset = serverOffset;
568 req.len = (remaining > len) ? len : remaining; 534 req.len = (remaining > len) ? len : remaining;
569 req.buffer = dst; 535 req.buffer = dst;
570 req.dstOffset = dstoff; 536 req.dstOffset = dstoff;
571
572 serverOffset += req.len; 537 serverOffset += req.len;
573 //clientOffset += req.len; 538 //clientOffset += req.len;
574 remaining -= req.len; 539 remaining -= req.len;
575
576 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); 540 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
577
578 pendingReadQueue.put(req.req_id, req); 541 pendingReadQueue.put(req.req_id, req);
579 } 542 }
580 if(pendingReadQueue.size() == 0) { 543
544 if (pendingReadQueue.size() == 0) {
581 break; 545 break;
582 } 546 }
583 547
584 // Receive a single answer 548 // Receive a single answer
585 byte[] resp = receiveMessage(34000); 549 byte[] resp = receiveMessage(34000);
586 TypesReader tr = new TypesReader(resp); 550 TypesReader tr = new TypesReader(resp);
587
588 int t = tr.readByte(); 551 int t = tr.readByte();
589 listener.read(Packet.forName(t)); 552 listener.read(Packet.forName(t));
590
591 // Search the pending queue 553 // Search the pending queue
592 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32()); 554 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32());
593 if(null == req) { 555
556 if (null == req) {
594 throw new RequestMismatchException(); 557 throw new RequestMismatchException();
595 } 558 }
559
596 // Evaluate the answer 560 // Evaluate the answer
597 if(t == Packet.SSH_FXP_STATUS) { 561 if (t == Packet.SSH_FXP_STATUS) {
598 /* In any case, stop sending more packets */ 562 /* In any case, stop sending more packets */
599
600 int code = tr.readUINT32(); 563 int code = tr.readUINT32();
601 String msg = tr.readString(); 564 String msg = tr.readString();
602 listener.read(msg); 565 listener.read(msg);
603 566
604 if(log.isDebugEnabled()) { 567 if (log.isDebugEnabled()) {
605 String[] desc = ErrorCodes.getDescription(code); 568 String[] desc = ErrorCodes.getDescription(code);
606 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); 569 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
607 } 570 }
571
608 // Flag to read all pending requests but don't send any more. 572 // Flag to read all pending requests but don't send any more.
609 errorOccured = true; 573 errorOccured = true;
610 if(pendingReadQueue.isEmpty()) { 574
611 if(ErrorCodes.SSH_FX_EOF == code) { 575 if (pendingReadQueue.isEmpty()) {
576 if (ErrorCodes.SSH_FX_EOF == code) {
612 return -1; 577 return -1;
613 } 578 }
579
614 throw new SFTPException(msg, code); 580 throw new SFTPException(msg, code);
615 } 581 }
616 } 582 }
617 else if(t == Packet.SSH_FXP_DATA) { 583 else if (t == Packet.SSH_FXP_DATA) {
618 // OK, collect data 584 // OK, collect data
619 int readLen = tr.readUINT32(); 585 int readLen = tr.readUINT32();
620 586
621 if((readLen < 0) || (readLen > req.len)) { 587 if ((readLen < 0) || (readLen > req.len)) {
622 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet."); 588 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet.");
623 } 589 }
624 590
625 if(log.isDebugEnabled()) { 591 if (log.isDebugEnabled()) {
626 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen 592 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen
627 + " (requested: " + req.len + ")"); 593 + " (requested: " + req.len + ")");
628 } 594 }
629 595
630 // Read bytes into buffer 596 // Read bytes into buffer
631 tr.readBytes(req.buffer, req.dstOffset, readLen); 597 tr.readBytes(req.buffer, req.dstOffset, readLen);
632 598
633 if(readLen < req.len) { 599 if (readLen < req.len) {
634 /* Send this request packet again to request the remaing data in this slot. */ 600 /* Send this request packet again to request the remaing data in this slot. */
635 req.req_id = generateNextRequestID(); 601 req.req_id = generateNextRequestID();
636 req.serverOffset += readLen; 602 req.serverOffset += readLen;
637 req.len -= readLen; 603 req.len -= readLen;
638
639 log.debug("Requesting again: " + req.serverOffset + "/" + req.len); 604 log.debug("Requesting again: " + req.serverOffset + "/" + req.len);
640 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); 605 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
641
642 pendingReadQueue.put(req.req_id, req); 606 pendingReadQueue.put(req.req_id, req);
643 } 607 }
608
644 return readLen; 609 return readLen;
645 } 610 }
646 else { 611 else {
647 throw new PacketTypeException(t); 612 throw new PacketTypeException(t);
648 } 613 }
649 } 614 }
615
650 // Should never reach here. 616 // Should never reach here.
651 throw new SFTPException("No EOF reached", -1); 617 throw new SFTPException("No EOF reached", -1);
652 } 618 }
653 619
654 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException { 620 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException {
655 TypesWriter tw = new TypesWriter(); 621 TypesWriter tw = new TypesWriter();
656 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); 622 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
657 tw.writeUINT64(offset); 623 tw.writeUINT64(offset);
658 tw.writeUINT32(len); 624 tw.writeUINT32(len);
659
660 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes()); 625 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes());
661 } 626 }
662 627
663 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException { 628 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
664 while(len > 0) { 629 while (len > 0) {
665 int writeRequestLen = len; 630 int writeRequestLen = len;
666 631
667 if(writeRequestLen > 32768) { 632 if (writeRequestLen > 32768) {
668 writeRequestLen = 32768; 633 writeRequestLen = 32768;
669 } 634 }
670 635
671 // Send the next write request 636 // Send the next write request
672 OutstandingStatusRequest req = new OutstandingStatusRequest(); 637 OutstandingStatusRequest req = new OutstandingStatusRequest();
673 req.req_id = generateNextRequestID(); 638 req.req_id = generateNextRequestID();
674
675 TypesWriter tw = new TypesWriter(); 639 TypesWriter tw = new TypesWriter();
676 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); 640 tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
677 tw.writeUINT64(fileOffset); 641 tw.writeUINT64(fileOffset);
678 tw.writeString(src, srcoff, writeRequestLen); 642 tw.writeString(src, srcoff, writeRequestLen);
679
680 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes()); 643 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes());
681
682 pendingStatusQueue.put(req.req_id, req); 644 pendingStatusQueue.put(req.req_id, req);
683 645
684 // Only read next status if parallelism reached 646 // Only read next status if parallelism reached
685 while(pendingStatusQueue.size() >= parallelism) { 647 while (pendingStatusQueue.size() >= parallelism) {
686 this.readStatus(); 648 this.readStatus();
687 } 649 }
650
688 fileOffset += writeRequestLen; 651 fileOffset += writeRequestLen;
689 srcoff += writeRequestLen; 652 srcoff += writeRequestLen;
690 len -= writeRequestLen; 653 len -= writeRequestLen;
691 } 654 }
692 } 655 }