diff src/ch/ethz/ssh2/AbstractSFTPClient.java @ 308:42b15aaa7ac7 ganymed

merge
author Carl Byington <carl@five-ten-sg.com>
date Wed, 30 Jul 2014 14:21:50 -0700
parents 071eccdff8ea
children
line wrap: on
line diff
--- a/src/ch/ethz/ssh2/AbstractSFTPClient.java	Wed Jul 30 13:38:04 2014 -0700
+++ b/src/ch/ethz/ssh2/AbstractSFTPClient.java	Wed Jul 30 14:21:50 2014 -0700
@@ -53,61 +53,54 @@
      * Mapping request ID to request.
      */
     private Map<Integer, OutstandingReadRequest> pendingReadQueue
-            = new HashMap<Integer, OutstandingReadRequest>();
+        = new HashMap<Integer, OutstandingReadRequest>();
 
     /**
      * Mapping request ID to request.
      */
     private Map<Integer, OutstandingStatusRequest> pendingStatusQueue
-            = new HashMap<Integer, OutstandingStatusRequest>();
+        = new HashMap<Integer, OutstandingStatusRequest>();
 
     private PacketListener listener;
 
     protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException {
         this.listener = listener;
-
         log.debug("Opening session and starting SFTP subsystem.");
         sess = conn.openSession();
         sess.startSubSystem("sftp");
-
         is = sess.getStdout();
         os = new BufferedOutputStream(sess.getStdin(), 2048);
-
         init(version);
-
     }
 
     private void init(final int client_version) throws IOException {
         // Send SSH_FXP_INIT with client version
-
         TypesWriter tw = new TypesWriter();
         tw.writeUINT32(client_version);
         sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes());
-
-		/* Receive SSH_FXP_VERSION */
-
+        /* Receive SSH_FXP_VERSION */
         log.debug("Waiting for SSH_FXP_VERSION...");
         TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */
-
         int t = tr.readByte();
         listener.read(Packet.forName(t));
 
-        if(t != Packet.SSH_FXP_VERSION) {
+        if (t != Packet.SSH_FXP_VERSION) {
             log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t));
             throw new PacketTypeException(t);
         }
 
         final int protocol_version = tr.readUINT32();
+        log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version);
 
-        log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version);
-        if(protocol_version != client_version) {
+        if (protocol_version != client_version) {
             throw new IOException(String.format("Server protocol version %d does not match %d",
-                    protocol_version, client_version));
+                                                protocol_version, client_version));
         }
+
         // Both parties should from then on adhere to particular version of the protocol
 
         // Read and save extensions (if any) for later use
-        while(tr.remain() != 0) {
+        while (tr.remain() != 0) {
             String name = tr.readString();
             listener.read(name);
             byte[] value = tr.readByteString();
@@ -145,16 +138,18 @@
      * @see #getCharset()
      */
     public void setCharset(String charset) throws IOException {
-        if(charset == null) {
+        if (charset == null) {
             this.charset = null;
             return;
         }
+
         try {
             Charset.forName(charset);
         }
-        catch(UnsupportedCharsetException e) {
+        catch (UnsupportedCharsetException e) {
             throw new IOException("This charset is not supported", e);
         }
+
         this.charset = charset;
     }
 
@@ -172,82 +167,65 @@
 
     public void mkdir(String dirName, int posixPermissions) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(dirName, this.getCharset());
         tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS);
         tw.writeUINT32(posixPermissions);
-
         sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public void rm(String fileName) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(fileName, this.getCharset());
-
         sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public void rmdir(String dirName) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(dirName, this.getCharset());
-
         sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public void mv(String oldPath, String newPath) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(oldPath, this.getCharset());
         tw.writeString(newPath, this.getCharset());
-
         sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public String readLink(String path) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(path, charset);
-
         sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes());
-
         byte[] resp = receiveMessage(34000);
-
         TypesReader tr = new TypesReader(resp);
-
         int t = tr.readByte();
         listener.read(Packet.forName(t));
+        int rep_id = tr.readUINT32();
 
-        int rep_id = tr.readUINT32();
-        if(rep_id != req_id) {
+        if (rep_id != req_id) {
             throw new RequestMismatchException();
         }
 
-        if(t == Packet.SSH_FXP_NAME) {
+        if (t == Packet.SSH_FXP_NAME) {
             int count = tr.readUINT32();
 
-            if(count != 1) {
+            if (count != 1) {
                 throw new PacketTypeException(t);
             }
 
             return tr.readString(charset);
         }
 
-        if(t != Packet.SSH_FXP_STATUS) {
+        if (t != Packet.SSH_FXP_STATUS) {
             throw new PacketTypeException(t);
         }
 
@@ -259,85 +237,69 @@
 
     public void setstat(String path, SFTPFileAttributes attr) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(path, charset);
         tw.writeBytes(attr.toBytes());
-
         sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
         tw.writeBytes(attr.toBytes());
-
         sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public void createSymlink(String src, String target) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(src, charset);
         tw.writeString(target, charset);
-
         sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public void createHardlink(String src, String target) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString("hardlink@openssh.com", charset);
         tw.writeString(target, charset);
         tw.writeString(src, charset);
-
         sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     public String canonicalPath(String path) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(path, charset);
-
         sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes());
-
         byte[] resp = receiveMessage(34000);
-
         TypesReader tr = new TypesReader(resp);
-
         int t = tr.readByte();
         listener.read(Packet.forName(t));
+        int rep_id = tr.readUINT32();
 
-        int rep_id = tr.readUINT32();
-        if(rep_id != req_id) {
+        if (rep_id != req_id) {
             throw new RequestMismatchException();
         }
 
-        if(t == Packet.SSH_FXP_NAME) {
+        if (t == Packet.SSH_FXP_NAME) {
             int count = tr.readUINT32();
 
-            if(count != 1) {
+            if (count != 1) {
                 throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet.");
             }
+
             final String name = tr.readString(charset);
             listener.read(name);
             return name;
         }
 
-        if(t != Packet.SSH_FXP_STATUS) {
+        if (t != Packet.SSH_FXP_STATUS) {
             throw new PacketTypeException(t);
         }
 
@@ -348,14 +310,14 @@
     }
 
     private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException {
-        if(log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             log.debug(String.format("Send message of type %d with request id %d", type, requestId));
         }
+
         listener.write(Packet.forName(type));
-
         int msglen = len + 1;
 
-        if(type != Packet.SSH_FXP_INIT) {
+        if (type != Packet.SSH_FXP_INIT) {
             msglen += 4;
         }
 
@@ -365,7 +327,7 @@
         os.write(msglen);
         os.write(type);
 
-        if(type != Packet.SSH_FXP_INIT) {
+        if (type != Packet.SSH_FXP_INIT) {
             os.write(requestId >> 24);
             os.write(requestId >> 16);
             os.write(requestId >> 8);
@@ -381,11 +343,13 @@
     }
 
     private void readBytes(byte[] buff, int pos, int len) throws IOException {
-        while(len > 0) {
+        while (len > 0) {
             int count = is.read(buff, pos, len);
-            if(count < 0) {
+
+            if (count < 0) {
                 throw new SocketException("Unexpected end of stream.");
             }
+
             len -= count;
             pos += count;
         }
@@ -404,138 +368,139 @@
      */
     protected byte[] receiveMessage(int maxlen) throws IOException {
         byte[] msglen = new byte[4];
-
         readBytes(msglen, 0, 4);
-
         int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff));
 
-        if((len > maxlen) || (len <= 0)) {
+        if ((len > maxlen) || (len <= 0)) {
             throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len));
         }
 
         byte[] msg = new byte[len];
-
         readBytes(msg, 0, len);
-
         return msg;
     }
 
     protected int generateNextRequestID() {
-        synchronized(this) {
+        synchronized (this) {
             return next_request_id++;
         }
     }
 
     protected void closeHandle(byte[] handle) throws IOException {
         int req_id = generateNextRequestID();
-
         TypesWriter tw = new TypesWriter();
         tw.writeString(handle, 0, handle.length);
-
         sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes());
-
         expectStatusOKMessage(req_id);
     }
 
     private void readStatus() throws IOException {
         byte[] resp = receiveMessage(34000);
-
         TypesReader tr = new TypesReader(resp);
         int t = tr.readByte();
         listener.read(Packet.forName(t));
-
         // Search the pending queue
         OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32());
-        if(null == status) {
+
+        if (null == status) {
             throw new RequestMismatchException();
         }
 
         // Evaluate the answer
-        if(t == Packet.SSH_FXP_STATUS) {
+        if (t == Packet.SSH_FXP_STATUS) {
             // In any case, stop sending more packets
             int code = tr.readUINT32();
-            if(log.isDebugEnabled()) {
+
+            if (log.isDebugEnabled()) {
                 String[] desc = ErrorCodes.getDescription(code);
                 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
             }
-            if(code == ErrorCodes.SSH_FX_OK) {
+
+            if (code == ErrorCodes.SSH_FX_OK) {
                 return;
             }
+
             String msg = tr.readString();
             listener.read(msg);
             throw new SFTPException(msg, code);
         }
+
         throw new PacketTypeException(t);
     }
 
     private void readPendingReadStatus() throws IOException {
         byte[] resp = receiveMessage(34000);
-
         TypesReader tr = new TypesReader(resp);
         int t = tr.readByte();
         listener.read(Packet.forName(t));
-
         // Search the pending queue
         OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32());
-        if(null == status) {
+
+        if (null == status) {
             throw new RequestMismatchException();
         }
 
         // Evaluate the answer
-        if(t == Packet.SSH_FXP_STATUS) {
+        if (t == Packet.SSH_FXP_STATUS) {
             // In any case, stop sending more packets
             int code = tr.readUINT32();
-            if(log.isDebugEnabled()) {
+
+            if (log.isDebugEnabled()) {
                 String[] desc = ErrorCodes.getDescription(code);
                 log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
             }
-            if(code == ErrorCodes.SSH_FX_OK) {
+
+            if (code == ErrorCodes.SSH_FX_OK) {
                 return;
             }
-            if(code == ErrorCodes.SSH_FX_EOF) {
+
+            if (code == ErrorCodes.SSH_FX_EOF) {
                 return;
             }
+
             String msg = tr.readString();
             listener.read(msg);
             throw new SFTPException(msg, code);
         }
+
         throw new PacketTypeException(t);
     }
 
     protected void expectStatusOKMessage(int id) throws IOException {
         byte[] resp = receiveMessage(34000);
-
         TypesReader tr = new TypesReader(resp);
-
         int t = tr.readByte();
         listener.read(Packet.forName(t));
+        int rep_id = tr.readUINT32();
 
-        int rep_id = tr.readUINT32();
-        if(rep_id != id) {
+        if (rep_id != id) {
             throw new RequestMismatchException();
         }
 
-        if(t != Packet.SSH_FXP_STATUS) {
+        if (t != Packet.SSH_FXP_STATUS) {
             throw new PacketTypeException(t);
         }
 
         int errorCode = tr.readUINT32();
 
-        if(errorCode == ErrorCodes.SSH_FX_OK) {
+        if (errorCode == ErrorCodes.SSH_FX_OK) {
             return;
         }
+
         String errorMessage = tr.readString();
         listener.read(errorMessage);
         throw new SFTPException(errorMessage, errorCode);
     }
 
     public void closeFile(SFTPFileHandle handle) throws IOException {
-        while(!pendingReadQueue.isEmpty()) {
+        while (!pendingReadQueue.isEmpty()) {
             this.readPendingReadStatus();
         }
-        while(!pendingStatusQueue.isEmpty()) {
+
+        while (!pendingStatusQueue.isEmpty()) {
             this.readStatus();
         }
+
         closeHandle(handle.getHandle());
     }
 
@@ -543,24 +508,25 @@
         boolean errorOccured = false;
         int remaining = len * parallelism;
         //int clientOffset = dstoff;
+        long serverOffset = fileOffset;
 
-        long serverOffset = fileOffset;
-        for(OutstandingReadRequest r : pendingReadQueue.values()) {
+        for (OutstandingReadRequest r : pendingReadQueue.values()) {
             // Server offset should take pending requests into account.
             serverOffset += r.len;
         }
 
-        while(true) {
+        while (true) {
             // Stop if there was an error and no outstanding request
-            if((pendingReadQueue.size() == 0) && errorOccured) {
+            if ((pendingReadQueue.size() == 0) && errorOccured) {
                 break;
             }
 
             // Send as many requests as we are allowed to
-            while(pendingReadQueue.size() < parallelism) {
-                if(errorOccured) {
+            while (pendingReadQueue.size() < parallelism) {
+                if (errorOccured) {
                     break;
                 }
+
                 // Send the next read request
                 OutstandingReadRequest req = new OutstandingReadRequest();
                 req.req_id = generateNextRequestID();
@@ -568,85 +534,85 @@
                 req.len = (remaining > len) ? len : remaining;
                 req.buffer = dst;
                 req.dstOffset = dstoff;
-
                 serverOffset += req.len;
                 //clientOffset += req.len;
                 remaining -= req.len;
-
                 sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
-
                 pendingReadQueue.put(req.req_id, req);
             }
-            if(pendingReadQueue.size() == 0) {
+
+            if (pendingReadQueue.size() == 0) {
                 break;
             }
 
             // Receive a single answer
             byte[] resp = receiveMessage(34000);
             TypesReader tr = new TypesReader(resp);
-
             int t = tr.readByte();
             listener.read(Packet.forName(t));
-
             // Search the pending queue
             OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32());
-            if(null == req) {
+
+            if (null == req) {
                 throw new RequestMismatchException();
             }
+
             // Evaluate the answer
-            if(t == Packet.SSH_FXP_STATUS) {
+            if (t == Packet.SSH_FXP_STATUS) {
                 /* In any case, stop sending more packets */
-
                 int code = tr.readUINT32();
                 String msg = tr.readString();
                 listener.read(msg);
 
-                if(log.isDebugEnabled()) {
+                if (log.isDebugEnabled()) {
                     String[] desc = ErrorCodes.getDescription(code);
                     log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
                 }
+
                 // Flag to read all pending requests but don't send any more.
                 errorOccured = true;
-                if(pendingReadQueue.isEmpty()) {
-                    if(ErrorCodes.SSH_FX_EOF == code) {
+
+                if (pendingReadQueue.isEmpty()) {
+                    if (ErrorCodes.SSH_FX_EOF == code) {
                         return -1;
                     }
+
                     throw new SFTPException(msg, code);
                 }
             }
-            else if(t == Packet.SSH_FXP_DATA) {
+            else if (t == Packet.SSH_FXP_DATA) {
                 // OK, collect data
                 int readLen = tr.readUINT32();
 
-                if((readLen < 0) || (readLen > req.len)) {
+                if ((readLen < 0) || (readLen > req.len)) {
                     throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet.");
                 }
 
-                if(log.isDebugEnabled()) {
+                if (log.isDebugEnabled()) {
                     log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen
-                            + " (requested: " + req.len + ")");
+                              + " (requested: " + req.len + ")");
                 }
 
                 // Read bytes into buffer
                 tr.readBytes(req.buffer, req.dstOffset, readLen);
 
-                if(readLen < req.len) {
+                if (readLen < req.len) {
                     /* Send this request packet again to request the remaing data in this slot. */
                     req.req_id = generateNextRequestID();
                     req.serverOffset += readLen;
                     req.len -= readLen;
-
                     log.debug("Requesting again: " + req.serverOffset + "/" + req.len);
                     sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
-
                     pendingReadQueue.put(req.req_id, req);
                 }
+
                 return readLen;
             }
             else {
                 throw new PacketTypeException(t);
             }
         }
+
         // Should never reach here.
         throw new SFTPException("No EOF reached", -1);
     }
@@ -656,35 +622,32 @@
         tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
         tw.writeUINT64(offset);
         tw.writeUINT32(len);
-
         sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes());
     }
 
     public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
-        while(len > 0) {
+        while (len > 0) {
             int writeRequestLen = len;
 
-            if(writeRequestLen > 32768) {
+            if (writeRequestLen > 32768) {
                 writeRequestLen = 32768;
             }
 
             // Send the next write request
             OutstandingStatusRequest req = new OutstandingStatusRequest();
             req.req_id = generateNextRequestID();
-
             TypesWriter tw = new TypesWriter();
             tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
             tw.writeUINT64(fileOffset);
             tw.writeString(src, srcoff, writeRequestLen);
-
             sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes());
-
             pendingStatusQueue.put(req.req_id, req);
 
             // Only read next status if parallelism reached
-            while(pendingStatusQueue.size() >= parallelism) {
+            while (pendingStatusQueue.size() >= parallelism) {
                 this.readStatus();
             }
+
             fileOffset += writeRequestLen;
             srcoff += writeRequestLen;
             len -= writeRequestLen;