Mercurial > 510Connectbot
comparison src/ch/ethz/ssh2/AbstractSFTPClient.java @ 308:42b15aaa7ac7 ganymed
merge
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Wed, 30 Jul 2014 14:21:50 -0700 |
parents | 071eccdff8ea |
children |
comparison
equal
deleted
inserted
replaced
306:90e47d99ea54 | 308:42b15aaa7ac7 |
---|---|
51 | 51 |
52 /** | 52 /** |
53 * Mapping request ID to request. | 53 * Mapping request ID to request. |
54 */ | 54 */ |
55 private Map<Integer, OutstandingReadRequest> pendingReadQueue | 55 private Map<Integer, OutstandingReadRequest> pendingReadQueue |
56 = new HashMap<Integer, OutstandingReadRequest>(); | 56 = new HashMap<Integer, OutstandingReadRequest>(); |
57 | 57 |
58 /** | 58 /** |
59 * Mapping request ID to request. | 59 * Mapping request ID to request. |
60 */ | 60 */ |
61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue | 61 private Map<Integer, OutstandingStatusRequest> pendingStatusQueue |
62 = new HashMap<Integer, OutstandingStatusRequest>(); | 62 = new HashMap<Integer, OutstandingStatusRequest>(); |
63 | 63 |
64 private PacketListener listener; | 64 private PacketListener listener; |
65 | 65 |
66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException { | 66 protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException { |
67 this.listener = listener; | 67 this.listener = listener; |
68 | |
69 log.debug("Opening session and starting SFTP subsystem."); | 68 log.debug("Opening session and starting SFTP subsystem."); |
70 sess = conn.openSession(); | 69 sess = conn.openSession(); |
71 sess.startSubSystem("sftp"); | 70 sess.startSubSystem("sftp"); |
72 | |
73 is = sess.getStdout(); | 71 is = sess.getStdout(); |
74 os = new BufferedOutputStream(sess.getStdin(), 2048); | 72 os = new BufferedOutputStream(sess.getStdin(), 2048); |
75 | |
76 init(version); | 73 init(version); |
77 | |
78 } | 74 } |
79 | 75 |
80 private void init(final int client_version) throws IOException { | 76 private void init(final int client_version) throws IOException { |
81 // Send SSH_FXP_INIT with client version | 77 // Send SSH_FXP_INIT with client version |
82 | |
83 TypesWriter tw = new TypesWriter(); | 78 TypesWriter tw = new TypesWriter(); |
84 tw.writeUINT32(client_version); | 79 tw.writeUINT32(client_version); |
85 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes()); | 80 sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes()); |
86 | 81 /* Receive SSH_FXP_VERSION */ |
87 /* Receive SSH_FXP_VERSION */ | |
88 | |
89 log.debug("Waiting for SSH_FXP_VERSION..."); | 82 log.debug("Waiting for SSH_FXP_VERSION..."); |
90 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */ | 83 TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */ |
91 | |
92 int t = tr.readByte(); | 84 int t = tr.readByte(); |
93 listener.read(Packet.forName(t)); | 85 listener.read(Packet.forName(t)); |
94 | 86 |
95 if(t != Packet.SSH_FXP_VERSION) { | 87 if (t != Packet.SSH_FXP_VERSION) { |
96 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t)); | 88 log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t)); |
97 throw new PacketTypeException(t); | 89 throw new PacketTypeException(t); |
98 } | 90 } |
99 | 91 |
100 final int protocol_version = tr.readUINT32(); | 92 final int protocol_version = tr.readUINT32(); |
101 | |
102 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version); | 93 log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version); |
103 if(protocol_version != client_version) { | 94 |
95 if (protocol_version != client_version) { | |
104 throw new IOException(String.format("Server protocol version %d does not match %d", | 96 throw new IOException(String.format("Server protocol version %d does not match %d", |
105 protocol_version, client_version)); | 97 protocol_version, client_version)); |
106 } | 98 } |
99 | |
107 // Both parties should from then on adhere to particular version of the protocol | 100 // Both parties should from then on adhere to particular version of the protocol |
108 | 101 |
109 // Read and save extensions (if any) for later use | 102 // Read and save extensions (if any) for later use |
110 while(tr.remain() != 0) { | 103 while (tr.remain() != 0) { |
111 String name = tr.readString(); | 104 String name = tr.readString(); |
112 listener.read(name); | 105 listener.read(name); |
113 byte[] value = tr.readByteString(); | 106 byte[] value = tr.readByteString(); |
114 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value))); | 107 log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value))); |
115 } | 108 } |
143 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8. | 136 * @param charset the name of the charset to be used or <code>null</code> to use UTF-8. |
144 * @throws java.io.IOException | 137 * @throws java.io.IOException |
145 * @see #getCharset() | 138 * @see #getCharset() |
146 */ | 139 */ |
147 public void setCharset(String charset) throws IOException { | 140 public void setCharset(String charset) throws IOException { |
148 if(charset == null) { | 141 if (charset == null) { |
149 this.charset = null; | 142 this.charset = null; |
150 return; | 143 return; |
151 } | 144 } |
145 | |
152 try { | 146 try { |
153 Charset.forName(charset); | 147 Charset.forName(charset); |
154 } | 148 } |
155 catch(UnsupportedCharsetException e) { | 149 catch (UnsupportedCharsetException e) { |
156 throw new IOException("This charset is not supported", e); | 150 throw new IOException("This charset is not supported", e); |
157 } | 151 } |
152 | |
158 this.charset = charset; | 153 this.charset = charset; |
159 } | 154 } |
160 | 155 |
161 /** | 156 /** |
162 * The currently used charset for filename encoding/decoding. | 157 * The currently used charset for filename encoding/decoding. |
170 | 165 |
171 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException; | 166 public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException; |
172 | 167 |
173 public void mkdir(String dirName, int posixPermissions) throws IOException { | 168 public void mkdir(String dirName, int posixPermissions) throws IOException { |
174 int req_id = generateNextRequestID(); | 169 int req_id = generateNextRequestID(); |
175 | |
176 TypesWriter tw = new TypesWriter(); | 170 TypesWriter tw = new TypesWriter(); |
177 tw.writeString(dirName, this.getCharset()); | 171 tw.writeString(dirName, this.getCharset()); |
178 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS); | 172 tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS); |
179 tw.writeUINT32(posixPermissions); | 173 tw.writeUINT32(posixPermissions); |
180 | |
181 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes()); | 174 sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes()); |
182 | |
183 expectStatusOKMessage(req_id); | 175 expectStatusOKMessage(req_id); |
184 } | 176 } |
185 | 177 |
186 public void rm(String fileName) throws IOException { | 178 public void rm(String fileName) throws IOException { |
187 int req_id = generateNextRequestID(); | 179 int req_id = generateNextRequestID(); |
188 | |
189 TypesWriter tw = new TypesWriter(); | 180 TypesWriter tw = new TypesWriter(); |
190 tw.writeString(fileName, this.getCharset()); | 181 tw.writeString(fileName, this.getCharset()); |
191 | |
192 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes()); | 182 sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes()); |
193 | |
194 expectStatusOKMessage(req_id); | 183 expectStatusOKMessage(req_id); |
195 } | 184 } |
196 | 185 |
197 public void rmdir(String dirName) throws IOException { | 186 public void rmdir(String dirName) throws IOException { |
198 int req_id = generateNextRequestID(); | 187 int req_id = generateNextRequestID(); |
199 | |
200 TypesWriter tw = new TypesWriter(); | 188 TypesWriter tw = new TypesWriter(); |
201 tw.writeString(dirName, this.getCharset()); | 189 tw.writeString(dirName, this.getCharset()); |
202 | |
203 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes()); | 190 sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes()); |
204 | |
205 expectStatusOKMessage(req_id); | 191 expectStatusOKMessage(req_id); |
206 } | 192 } |
207 | 193 |
208 public void mv(String oldPath, String newPath) throws IOException { | 194 public void mv(String oldPath, String newPath) throws IOException { |
209 int req_id = generateNextRequestID(); | 195 int req_id = generateNextRequestID(); |
210 | |
211 TypesWriter tw = new TypesWriter(); | 196 TypesWriter tw = new TypesWriter(); |
212 tw.writeString(oldPath, this.getCharset()); | 197 tw.writeString(oldPath, this.getCharset()); |
213 tw.writeString(newPath, this.getCharset()); | 198 tw.writeString(newPath, this.getCharset()); |
214 | |
215 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes()); | 199 sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes()); |
216 | |
217 expectStatusOKMessage(req_id); | 200 expectStatusOKMessage(req_id); |
218 } | 201 } |
219 | 202 |
220 public String readLink(String path) throws IOException { | 203 public String readLink(String path) throws IOException { |
221 int req_id = generateNextRequestID(); | 204 int req_id = generateNextRequestID(); |
222 | |
223 TypesWriter tw = new TypesWriter(); | 205 TypesWriter tw = new TypesWriter(); |
224 tw.writeString(path, charset); | 206 tw.writeString(path, charset); |
225 | |
226 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes()); | 207 sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes()); |
227 | |
228 byte[] resp = receiveMessage(34000); | 208 byte[] resp = receiveMessage(34000); |
229 | |
230 TypesReader tr = new TypesReader(resp); | 209 TypesReader tr = new TypesReader(resp); |
231 | |
232 int t = tr.readByte(); | 210 int t = tr.readByte(); |
233 listener.read(Packet.forName(t)); | 211 listener.read(Packet.forName(t)); |
234 | |
235 int rep_id = tr.readUINT32(); | 212 int rep_id = tr.readUINT32(); |
236 if(rep_id != req_id) { | 213 |
214 if (rep_id != req_id) { | |
237 throw new RequestMismatchException(); | 215 throw new RequestMismatchException(); |
238 } | 216 } |
239 | 217 |
240 if(t == Packet.SSH_FXP_NAME) { | 218 if (t == Packet.SSH_FXP_NAME) { |
241 int count = tr.readUINT32(); | 219 int count = tr.readUINT32(); |
242 | 220 |
243 if(count != 1) { | 221 if (count != 1) { |
244 throw new PacketTypeException(t); | 222 throw new PacketTypeException(t); |
245 } | 223 } |
246 | 224 |
247 return tr.readString(charset); | 225 return tr.readString(charset); |
248 } | 226 } |
249 | 227 |
250 if(t != Packet.SSH_FXP_STATUS) { | 228 if (t != Packet.SSH_FXP_STATUS) { |
251 throw new PacketTypeException(t); | 229 throw new PacketTypeException(t); |
252 } | 230 } |
253 | 231 |
254 int errorCode = tr.readUINT32(); | 232 int errorCode = tr.readUINT32(); |
255 String errorMessage = tr.readString(); | 233 String errorMessage = tr.readString(); |
257 throw new SFTPException(errorMessage, errorCode); | 235 throw new SFTPException(errorMessage, errorCode); |
258 } | 236 } |
259 | 237 |
260 public void setstat(String path, SFTPFileAttributes attr) throws IOException { | 238 public void setstat(String path, SFTPFileAttributes attr) throws IOException { |
261 int req_id = generateNextRequestID(); | 239 int req_id = generateNextRequestID(); |
262 | |
263 TypesWriter tw = new TypesWriter(); | 240 TypesWriter tw = new TypesWriter(); |
264 tw.writeString(path, charset); | 241 tw.writeString(path, charset); |
265 tw.writeBytes(attr.toBytes()); | 242 tw.writeBytes(attr.toBytes()); |
266 | |
267 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes()); | 243 sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes()); |
268 | |
269 expectStatusOKMessage(req_id); | 244 expectStatusOKMessage(req_id); |
270 } | 245 } |
271 | 246 |
272 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException { | 247 public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException { |
273 int req_id = generateNextRequestID(); | 248 int req_id = generateNextRequestID(); |
274 | |
275 TypesWriter tw = new TypesWriter(); | 249 TypesWriter tw = new TypesWriter(); |
276 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | 250 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); |
277 tw.writeBytes(attr.toBytes()); | 251 tw.writeBytes(attr.toBytes()); |
278 | |
279 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes()); | 252 sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes()); |
280 | |
281 expectStatusOKMessage(req_id); | 253 expectStatusOKMessage(req_id); |
282 } | 254 } |
283 | 255 |
284 public void createSymlink(String src, String target) throws IOException { | 256 public void createSymlink(String src, String target) throws IOException { |
285 int req_id = generateNextRequestID(); | 257 int req_id = generateNextRequestID(); |
286 | |
287 TypesWriter tw = new TypesWriter(); | 258 TypesWriter tw = new TypesWriter(); |
288 tw.writeString(src, charset); | 259 tw.writeString(src, charset); |
289 tw.writeString(target, charset); | 260 tw.writeString(target, charset); |
290 | |
291 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes()); | 261 sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes()); |
292 | |
293 expectStatusOKMessage(req_id); | 262 expectStatusOKMessage(req_id); |
294 } | 263 } |
295 | 264 |
296 public void createHardlink(String src, String target) throws IOException { | 265 public void createHardlink(String src, String target) throws IOException { |
297 int req_id = generateNextRequestID(); | 266 int req_id = generateNextRequestID(); |
298 | |
299 TypesWriter tw = new TypesWriter(); | 267 TypesWriter tw = new TypesWriter(); |
300 tw.writeString("hardlink@openssh.com", charset); | 268 tw.writeString("hardlink@openssh.com", charset); |
301 tw.writeString(target, charset); | 269 tw.writeString(target, charset); |
302 tw.writeString(src, charset); | 270 tw.writeString(src, charset); |
303 | |
304 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes()); | 271 sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes()); |
305 | |
306 expectStatusOKMessage(req_id); | 272 expectStatusOKMessage(req_id); |
307 } | 273 } |
308 | 274 |
309 public String canonicalPath(String path) throws IOException { | 275 public String canonicalPath(String path) throws IOException { |
310 int req_id = generateNextRequestID(); | 276 int req_id = generateNextRequestID(); |
311 | |
312 TypesWriter tw = new TypesWriter(); | 277 TypesWriter tw = new TypesWriter(); |
313 tw.writeString(path, charset); | 278 tw.writeString(path, charset); |
314 | |
315 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes()); | 279 sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes()); |
316 | |
317 byte[] resp = receiveMessage(34000); | 280 byte[] resp = receiveMessage(34000); |
318 | |
319 TypesReader tr = new TypesReader(resp); | 281 TypesReader tr = new TypesReader(resp); |
320 | |
321 int t = tr.readByte(); | 282 int t = tr.readByte(); |
322 listener.read(Packet.forName(t)); | 283 listener.read(Packet.forName(t)); |
323 | |
324 int rep_id = tr.readUINT32(); | 284 int rep_id = tr.readUINT32(); |
325 if(rep_id != req_id) { | 285 |
286 if (rep_id != req_id) { | |
326 throw new RequestMismatchException(); | 287 throw new RequestMismatchException(); |
327 } | 288 } |
328 | 289 |
329 if(t == Packet.SSH_FXP_NAME) { | 290 if (t == Packet.SSH_FXP_NAME) { |
330 int count = tr.readUINT32(); | 291 int count = tr.readUINT32(); |
331 | 292 |
332 if(count != 1) { | 293 if (count != 1) { |
333 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet."); | 294 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet."); |
334 } | 295 } |
296 | |
335 final String name = tr.readString(charset); | 297 final String name = tr.readString(charset); |
336 listener.read(name); | 298 listener.read(name); |
337 return name; | 299 return name; |
338 } | 300 } |
339 | 301 |
340 if(t != Packet.SSH_FXP_STATUS) { | 302 if (t != Packet.SSH_FXP_STATUS) { |
341 throw new PacketTypeException(t); | 303 throw new PacketTypeException(t); |
342 } | 304 } |
343 | 305 |
344 int errorCode = tr.readUINT32(); | 306 int errorCode = tr.readUINT32(); |
345 String errorMessage = tr.readString(); | 307 String errorMessage = tr.readString(); |
346 listener.read(errorMessage); | 308 listener.read(errorMessage); |
347 throw new SFTPException(errorMessage, errorCode); | 309 throw new SFTPException(errorMessage, errorCode); |
348 } | 310 } |
349 | 311 |
350 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException { | 312 private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException { |
351 if(log.isDebugEnabled()) { | 313 if (log.isDebugEnabled()) { |
352 log.debug(String.format("Send message of type %d with request id %d", type, requestId)); | 314 log.debug(String.format("Send message of type %d with request id %d", type, requestId)); |
353 } | 315 } |
316 | |
354 listener.write(Packet.forName(type)); | 317 listener.write(Packet.forName(type)); |
355 | |
356 int msglen = len + 1; | 318 int msglen = len + 1; |
357 | 319 |
358 if(type != Packet.SSH_FXP_INIT) { | 320 if (type != Packet.SSH_FXP_INIT) { |
359 msglen += 4; | 321 msglen += 4; |
360 } | 322 } |
361 | 323 |
362 os.write(msglen >> 24); | 324 os.write(msglen >> 24); |
363 os.write(msglen >> 16); | 325 os.write(msglen >> 16); |
364 os.write(msglen >> 8); | 326 os.write(msglen >> 8); |
365 os.write(msglen); | 327 os.write(msglen); |
366 os.write(type); | 328 os.write(type); |
367 | 329 |
368 if(type != Packet.SSH_FXP_INIT) { | 330 if (type != Packet.SSH_FXP_INIT) { |
369 os.write(requestId >> 24); | 331 os.write(requestId >> 24); |
370 os.write(requestId >> 16); | 332 os.write(requestId >> 16); |
371 os.write(requestId >> 8); | 333 os.write(requestId >> 8); |
372 os.write(requestId); | 334 os.write(requestId); |
373 } | 335 } |
379 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException { | 341 protected void sendMessage(int type, int requestId, byte[] msg) throws IOException { |
380 sendMessage(type, requestId, msg, 0, msg.length); | 342 sendMessage(type, requestId, msg, 0, msg.length); |
381 } | 343 } |
382 | 344 |
383 private void readBytes(byte[] buff, int pos, int len) throws IOException { | 345 private void readBytes(byte[] buff, int pos, int len) throws IOException { |
384 while(len > 0) { | 346 while (len > 0) { |
385 int count = is.read(buff, pos, len); | 347 int count = is.read(buff, pos, len); |
386 if(count < 0) { | 348 |
349 if (count < 0) { | |
387 throw new SocketException("Unexpected end of stream."); | 350 throw new SocketException("Unexpected end of stream."); |
388 } | 351 } |
352 | |
389 len -= count; | 353 len -= count; |
390 pos += count; | 354 pos += count; |
391 } | 355 } |
392 } | 356 } |
393 | 357 |
402 * @return the message contents | 366 * @return the message contents |
403 * @throws IOException | 367 * @throws IOException |
404 */ | 368 */ |
405 protected byte[] receiveMessage(int maxlen) throws IOException { | 369 protected byte[] receiveMessage(int maxlen) throws IOException { |
406 byte[] msglen = new byte[4]; | 370 byte[] msglen = new byte[4]; |
407 | |
408 readBytes(msglen, 0, 4); | 371 readBytes(msglen, 0, 4); |
409 | |
410 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff)); | 372 int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff)); |
411 | 373 |
412 if((len > maxlen) || (len <= 0)) { | 374 if ((len > maxlen) || (len <= 0)) { |
413 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len)); | 375 throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len)); |
414 } | 376 } |
415 | 377 |
416 byte[] msg = new byte[len]; | 378 byte[] msg = new byte[len]; |
417 | |
418 readBytes(msg, 0, len); | 379 readBytes(msg, 0, len); |
419 | |
420 return msg; | 380 return msg; |
421 } | 381 } |
422 | 382 |
423 protected int generateNextRequestID() { | 383 protected int generateNextRequestID() { |
424 synchronized(this) { | 384 synchronized (this) { |
425 return next_request_id++; | 385 return next_request_id++; |
426 } | 386 } |
427 } | 387 } |
428 | 388 |
429 protected void closeHandle(byte[] handle) throws IOException { | 389 protected void closeHandle(byte[] handle) throws IOException { |
430 int req_id = generateNextRequestID(); | 390 int req_id = generateNextRequestID(); |
431 | |
432 TypesWriter tw = new TypesWriter(); | 391 TypesWriter tw = new TypesWriter(); |
433 tw.writeString(handle, 0, handle.length); | 392 tw.writeString(handle, 0, handle.length); |
434 | |
435 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes()); | 393 sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes()); |
436 | |
437 expectStatusOKMessage(req_id); | 394 expectStatusOKMessage(req_id); |
438 } | 395 } |
439 | 396 |
440 private void readStatus() throws IOException { | 397 private void readStatus() throws IOException { |
441 byte[] resp = receiveMessage(34000); | 398 byte[] resp = receiveMessage(34000); |
442 | |
443 TypesReader tr = new TypesReader(resp); | 399 TypesReader tr = new TypesReader(resp); |
444 int t = tr.readByte(); | 400 int t = tr.readByte(); |
445 listener.read(Packet.forName(t)); | 401 listener.read(Packet.forName(t)); |
446 | |
447 // Search the pending queue | 402 // Search the pending queue |
448 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32()); | 403 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32()); |
449 if(null == status) { | 404 |
405 if (null == status) { | |
450 throw new RequestMismatchException(); | 406 throw new RequestMismatchException(); |
451 } | 407 } |
452 | 408 |
453 // Evaluate the answer | 409 // Evaluate the answer |
454 if(t == Packet.SSH_FXP_STATUS) { | 410 if (t == Packet.SSH_FXP_STATUS) { |
455 // In any case, stop sending more packets | 411 // In any case, stop sending more packets |
456 int code = tr.readUINT32(); | 412 int code = tr.readUINT32(); |
457 if(log.isDebugEnabled()) { | 413 |
414 if (log.isDebugEnabled()) { | |
458 String[] desc = ErrorCodes.getDescription(code); | 415 String[] desc = ErrorCodes.getDescription(code); |
459 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | 416 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); |
460 } | 417 } |
461 if(code == ErrorCodes.SSH_FX_OK) { | 418 |
419 if (code == ErrorCodes.SSH_FX_OK) { | |
462 return; | 420 return; |
463 } | 421 } |
422 | |
464 String msg = tr.readString(); | 423 String msg = tr.readString(); |
465 listener.read(msg); | 424 listener.read(msg); |
466 throw new SFTPException(msg, code); | 425 throw new SFTPException(msg, code); |
467 } | 426 } |
427 | |
468 throw new PacketTypeException(t); | 428 throw new PacketTypeException(t); |
469 } | 429 } |
470 | 430 |
471 private void readPendingReadStatus() throws IOException { | 431 private void readPendingReadStatus() throws IOException { |
472 byte[] resp = receiveMessage(34000); | 432 byte[] resp = receiveMessage(34000); |
473 | |
474 TypesReader tr = new TypesReader(resp); | 433 TypesReader tr = new TypesReader(resp); |
475 int t = tr.readByte(); | 434 int t = tr.readByte(); |
476 listener.read(Packet.forName(t)); | 435 listener.read(Packet.forName(t)); |
477 | |
478 // Search the pending queue | 436 // Search the pending queue |
479 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32()); | 437 OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32()); |
480 if(null == status) { | 438 |
439 if (null == status) { | |
481 throw new RequestMismatchException(); | 440 throw new RequestMismatchException(); |
482 } | 441 } |
483 | 442 |
484 // Evaluate the answer | 443 // Evaluate the answer |
485 if(t == Packet.SSH_FXP_STATUS) { | 444 if (t == Packet.SSH_FXP_STATUS) { |
486 // In any case, stop sending more packets | 445 // In any case, stop sending more packets |
487 int code = tr.readUINT32(); | 446 int code = tr.readUINT32(); |
488 if(log.isDebugEnabled()) { | 447 |
448 if (log.isDebugEnabled()) { | |
489 String[] desc = ErrorCodes.getDescription(code); | 449 String[] desc = ErrorCodes.getDescription(code); |
490 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | 450 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); |
491 } | 451 } |
492 if(code == ErrorCodes.SSH_FX_OK) { | 452 |
453 if (code == ErrorCodes.SSH_FX_OK) { | |
493 return; | 454 return; |
494 } | 455 } |
495 if(code == ErrorCodes.SSH_FX_EOF) { | 456 |
457 if (code == ErrorCodes.SSH_FX_EOF) { | |
496 return; | 458 return; |
497 } | 459 } |
460 | |
498 String msg = tr.readString(); | 461 String msg = tr.readString(); |
499 listener.read(msg); | 462 listener.read(msg); |
500 throw new SFTPException(msg, code); | 463 throw new SFTPException(msg, code); |
501 } | 464 } |
465 | |
502 throw new PacketTypeException(t); | 466 throw new PacketTypeException(t); |
503 } | 467 } |
504 | 468 |
505 protected void expectStatusOKMessage(int id) throws IOException { | 469 protected void expectStatusOKMessage(int id) throws IOException { |
506 byte[] resp = receiveMessage(34000); | 470 byte[] resp = receiveMessage(34000); |
507 | |
508 TypesReader tr = new TypesReader(resp); | 471 TypesReader tr = new TypesReader(resp); |
509 | |
510 int t = tr.readByte(); | 472 int t = tr.readByte(); |
511 listener.read(Packet.forName(t)); | 473 listener.read(Packet.forName(t)); |
512 | |
513 int rep_id = tr.readUINT32(); | 474 int rep_id = tr.readUINT32(); |
514 if(rep_id != id) { | 475 |
476 if (rep_id != id) { | |
515 throw new RequestMismatchException(); | 477 throw new RequestMismatchException(); |
516 } | 478 } |
517 | 479 |
518 if(t != Packet.SSH_FXP_STATUS) { | 480 if (t != Packet.SSH_FXP_STATUS) { |
519 throw new PacketTypeException(t); | 481 throw new PacketTypeException(t); |
520 } | 482 } |
521 | 483 |
522 int errorCode = tr.readUINT32(); | 484 int errorCode = tr.readUINT32(); |
523 | 485 |
524 if(errorCode == ErrorCodes.SSH_FX_OK) { | 486 if (errorCode == ErrorCodes.SSH_FX_OK) { |
525 return; | 487 return; |
526 } | 488 } |
489 | |
527 String errorMessage = tr.readString(); | 490 String errorMessage = tr.readString(); |
528 listener.read(errorMessage); | 491 listener.read(errorMessage); |
529 throw new SFTPException(errorMessage, errorCode); | 492 throw new SFTPException(errorMessage, errorCode); |
530 } | 493 } |
531 | 494 |
532 public void closeFile(SFTPFileHandle handle) throws IOException { | 495 public void closeFile(SFTPFileHandle handle) throws IOException { |
533 while(!pendingReadQueue.isEmpty()) { | 496 while (!pendingReadQueue.isEmpty()) { |
534 this.readPendingReadStatus(); | 497 this.readPendingReadStatus(); |
535 } | 498 } |
536 while(!pendingStatusQueue.isEmpty()) { | 499 |
500 while (!pendingStatusQueue.isEmpty()) { | |
537 this.readStatus(); | 501 this.readStatus(); |
538 } | 502 } |
503 | |
539 closeHandle(handle.getHandle()); | 504 closeHandle(handle.getHandle()); |
540 } | 505 } |
541 | 506 |
542 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException { | 507 public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException { |
543 boolean errorOccured = false; | 508 boolean errorOccured = false; |
544 int remaining = len * parallelism; | 509 int remaining = len * parallelism; |
545 //int clientOffset = dstoff; | 510 //int clientOffset = dstoff; |
546 | |
547 long serverOffset = fileOffset; | 511 long serverOffset = fileOffset; |
548 for(OutstandingReadRequest r : pendingReadQueue.values()) { | 512 |
513 for (OutstandingReadRequest r : pendingReadQueue.values()) { | |
549 // Server offset should take pending requests into account. | 514 // Server offset should take pending requests into account. |
550 serverOffset += r.len; | 515 serverOffset += r.len; |
551 } | 516 } |
552 | 517 |
553 while(true) { | 518 while (true) { |
554 // Stop if there was an error and no outstanding request | 519 // Stop if there was an error and no outstanding request |
555 if((pendingReadQueue.size() == 0) && errorOccured) { | 520 if ((pendingReadQueue.size() == 0) && errorOccured) { |
556 break; | 521 break; |
557 } | 522 } |
558 | 523 |
559 // Send as many requests as we are allowed to | 524 // Send as many requests as we are allowed to |
560 while(pendingReadQueue.size() < parallelism) { | 525 while (pendingReadQueue.size() < parallelism) { |
561 if(errorOccured) { | 526 if (errorOccured) { |
562 break; | 527 break; |
563 } | 528 } |
529 | |
564 // Send the next read request | 530 // Send the next read request |
565 OutstandingReadRequest req = new OutstandingReadRequest(); | 531 OutstandingReadRequest req = new OutstandingReadRequest(); |
566 req.req_id = generateNextRequestID(); | 532 req.req_id = generateNextRequestID(); |
567 req.serverOffset = serverOffset; | 533 req.serverOffset = serverOffset; |
568 req.len = (remaining > len) ? len : remaining; | 534 req.len = (remaining > len) ? len : remaining; |
569 req.buffer = dst; | 535 req.buffer = dst; |
570 req.dstOffset = dstoff; | 536 req.dstOffset = dstoff; |
571 | |
572 serverOffset += req.len; | 537 serverOffset += req.len; |
573 //clientOffset += req.len; | 538 //clientOffset += req.len; |
574 remaining -= req.len; | 539 remaining -= req.len; |
575 | |
576 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); | 540 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); |
577 | |
578 pendingReadQueue.put(req.req_id, req); | 541 pendingReadQueue.put(req.req_id, req); |
579 } | 542 } |
580 if(pendingReadQueue.size() == 0) { | 543 |
544 if (pendingReadQueue.size() == 0) { | |
581 break; | 545 break; |
582 } | 546 } |
583 | 547 |
584 // Receive a single answer | 548 // Receive a single answer |
585 byte[] resp = receiveMessage(34000); | 549 byte[] resp = receiveMessage(34000); |
586 TypesReader tr = new TypesReader(resp); | 550 TypesReader tr = new TypesReader(resp); |
587 | |
588 int t = tr.readByte(); | 551 int t = tr.readByte(); |
589 listener.read(Packet.forName(t)); | 552 listener.read(Packet.forName(t)); |
590 | |
591 // Search the pending queue | 553 // Search the pending queue |
592 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32()); | 554 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32()); |
593 if(null == req) { | 555 |
556 if (null == req) { | |
594 throw new RequestMismatchException(); | 557 throw new RequestMismatchException(); |
595 } | 558 } |
559 | |
596 // Evaluate the answer | 560 // Evaluate the answer |
597 if(t == Packet.SSH_FXP_STATUS) { | 561 if (t == Packet.SSH_FXP_STATUS) { |
598 /* In any case, stop sending more packets */ | 562 /* In any case, stop sending more packets */ |
599 | |
600 int code = tr.readUINT32(); | 563 int code = tr.readUINT32(); |
601 String msg = tr.readString(); | 564 String msg = tr.readString(); |
602 listener.read(msg); | 565 listener.read(msg); |
603 | 566 |
604 if(log.isDebugEnabled()) { | 567 if (log.isDebugEnabled()) { |
605 String[] desc = ErrorCodes.getDescription(code); | 568 String[] desc = ErrorCodes.getDescription(code); |
606 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); | 569 log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); |
607 } | 570 } |
571 | |
608 // Flag to read all pending requests but don't send any more. | 572 // Flag to read all pending requests but don't send any more. |
609 errorOccured = true; | 573 errorOccured = true; |
610 if(pendingReadQueue.isEmpty()) { | 574 |
611 if(ErrorCodes.SSH_FX_EOF == code) { | 575 if (pendingReadQueue.isEmpty()) { |
576 if (ErrorCodes.SSH_FX_EOF == code) { | |
612 return -1; | 577 return -1; |
613 } | 578 } |
579 | |
614 throw new SFTPException(msg, code); | 580 throw new SFTPException(msg, code); |
615 } | 581 } |
616 } | 582 } |
617 else if(t == Packet.SSH_FXP_DATA) { | 583 else if (t == Packet.SSH_FXP_DATA) { |
618 // OK, collect data | 584 // OK, collect data |
619 int readLen = tr.readUINT32(); | 585 int readLen = tr.readUINT32(); |
620 | 586 |
621 if((readLen < 0) || (readLen > req.len)) { | 587 if ((readLen < 0) || (readLen > req.len)) { |
622 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet."); | 588 throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet."); |
623 } | 589 } |
624 | 590 |
625 if(log.isDebugEnabled()) { | 591 if (log.isDebugEnabled()) { |
626 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen | 592 log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen |
627 + " (requested: " + req.len + ")"); | 593 + " (requested: " + req.len + ")"); |
628 } | 594 } |
629 | 595 |
630 // Read bytes into buffer | 596 // Read bytes into buffer |
631 tr.readBytes(req.buffer, req.dstOffset, readLen); | 597 tr.readBytes(req.buffer, req.dstOffset, readLen); |
632 | 598 |
633 if(readLen < req.len) { | 599 if (readLen < req.len) { |
634 /* Send this request packet again to request the remaing data in this slot. */ | 600 /* Send this request packet again to request the remaing data in this slot. */ |
635 req.req_id = generateNextRequestID(); | 601 req.req_id = generateNextRequestID(); |
636 req.serverOffset += readLen; | 602 req.serverOffset += readLen; |
637 req.len -= readLen; | 603 req.len -= readLen; |
638 | |
639 log.debug("Requesting again: " + req.serverOffset + "/" + req.len); | 604 log.debug("Requesting again: " + req.serverOffset + "/" + req.len); |
640 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); | 605 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); |
641 | |
642 pendingReadQueue.put(req.req_id, req); | 606 pendingReadQueue.put(req.req_id, req); |
643 } | 607 } |
608 | |
644 return readLen; | 609 return readLen; |
645 } | 610 } |
646 else { | 611 else { |
647 throw new PacketTypeException(t); | 612 throw new PacketTypeException(t); |
648 } | 613 } |
649 } | 614 } |
615 | |
650 // Should never reach here. | 616 // Should never reach here. |
651 throw new SFTPException("No EOF reached", -1); | 617 throw new SFTPException("No EOF reached", -1); |
652 } | 618 } |
653 | 619 |
654 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException { | 620 private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException { |
655 TypesWriter tw = new TypesWriter(); | 621 TypesWriter tw = new TypesWriter(); |
656 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | 622 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); |
657 tw.writeUINT64(offset); | 623 tw.writeUINT64(offset); |
658 tw.writeUINT32(len); | 624 tw.writeUINT32(len); |
659 | |
660 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes()); | 625 sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes()); |
661 } | 626 } |
662 | 627 |
663 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException { | 628 public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException { |
664 while(len > 0) { | 629 while (len > 0) { |
665 int writeRequestLen = len; | 630 int writeRequestLen = len; |
666 | 631 |
667 if(writeRequestLen > 32768) { | 632 if (writeRequestLen > 32768) { |
668 writeRequestLen = 32768; | 633 writeRequestLen = 32768; |
669 } | 634 } |
670 | 635 |
671 // Send the next write request | 636 // Send the next write request |
672 OutstandingStatusRequest req = new OutstandingStatusRequest(); | 637 OutstandingStatusRequest req = new OutstandingStatusRequest(); |
673 req.req_id = generateNextRequestID(); | 638 req.req_id = generateNextRequestID(); |
674 | |
675 TypesWriter tw = new TypesWriter(); | 639 TypesWriter tw = new TypesWriter(); |
676 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); | 640 tw.writeString(handle.getHandle(), 0, handle.getHandle().length); |
677 tw.writeUINT64(fileOffset); | 641 tw.writeUINT64(fileOffset); |
678 tw.writeString(src, srcoff, writeRequestLen); | 642 tw.writeString(src, srcoff, writeRequestLen); |
679 | |
680 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes()); | 643 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes()); |
681 | |
682 pendingStatusQueue.put(req.req_id, req); | 644 pendingStatusQueue.put(req.req_id, req); |
683 | 645 |
684 // Only read next status if parallelism reached | 646 // Only read next status if parallelism reached |
685 while(pendingStatusQueue.size() >= parallelism) { | 647 while (pendingStatusQueue.size() >= parallelism) { |
686 this.readStatus(); | 648 this.readStatus(); |
687 } | 649 } |
650 | |
688 fileOffset += writeRequestLen; | 651 fileOffset += writeRequestLen; |
689 srcoff += writeRequestLen; | 652 srcoff += writeRequestLen; |
690 len -= writeRequestLen; | 653 len -= writeRequestLen; |
691 } | 654 } |
692 } | 655 } |