diff src/ch/ethz/ssh2/AbstractSFTPClient.java @ 342:175c7d68f3c4

merge ganymed into mainline
author Carl Byington <carl@five-ten-sg.com>
date Thu, 31 Jul 2014 16:33:38 -0700
parents 071eccdff8ea
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/ch/ethz/ssh2/AbstractSFTPClient.java	Thu Jul 31 16:33:38 2014 -0700
@@ -0,0 +1,690 @@
+package ch.ethz.ssh2;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.charset.Charset;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import ch.ethz.ssh2.channel.Channel;
+import ch.ethz.ssh2.log.Logger;
+import ch.ethz.ssh2.packets.TypesReader;
+import ch.ethz.ssh2.packets.TypesWriter;
+import ch.ethz.ssh2.sftp.AttribFlags;
+import ch.ethz.ssh2.sftp.ErrorCodes;
+import ch.ethz.ssh2.sftp.Packet;
+import ch.ethz.ssh2.util.StringEncoder;
+
+/**
+ * @version $Id: AbstractSFTPClient.java 151 2014-04-28 10:03:39Z dkocher@sudo.ch $
+ */
+public abstract class AbstractSFTPClient implements SFTPClient {
+
+    private static final Logger log = Logger.getLogger(SFTPv3Client.class);
+
+    private Session sess;
+
+    private InputStream is;
+    private OutputStream os;
+
+    private int next_request_id = 1000;
+
+    private String charset;
+
+    /**
+     * Parallel read requests maximum size.
+     */
+    private static final int DEFAULT_MAX_PARALLELISM = 64;
+
+    /**
+     * Parallel read requests.
+     */
+    private int parallelism = DEFAULT_MAX_PARALLELISM;
+
+    public void setRequestParallelism(int parallelism) {
+        this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM);
+    }
+
+    /**
+     * Mapping request ID to request.
+     */
+    private Map<Integer, OutstandingReadRequest> pendingReadQueue
+        = new HashMap<Integer, OutstandingReadRequest>();
+
+    /**
+     * Mapping request ID to request.
+     */
+    private Map<Integer, OutstandingStatusRequest> pendingStatusQueue
+        = new HashMap<Integer, OutstandingStatusRequest>();
+
+    private PacketListener listener;
+
+    protected AbstractSFTPClient(final Connection conn, final int version, final PacketListener listener) throws IOException {
+        this.listener = listener;
+        log.debug("Opening session and starting SFTP subsystem.");
+        sess = conn.openSession();
+        sess.startSubSystem("sftp");
+        is = sess.getStdout();
+        os = new BufferedOutputStream(sess.getStdin(), 2048);
+        init(version);
+    }
+
+    private void init(final int client_version) throws IOException {
+        // Send SSH_FXP_INIT with client version
+        TypesWriter tw = new TypesWriter();
+        tw.writeUINT32(client_version);
+        sendMessage(Packet.SSH_FXP_INIT, 0, tw.getBytes());
+        /* Receive SSH_FXP_VERSION */
+        log.debug("Waiting for SSH_FXP_VERSION...");
+        TypesReader tr = new TypesReader(receiveMessage(34000)); /* Should be enough for any reasonable server */
+        int t = tr.readByte();
+        listener.read(Packet.forName(t));
+
+        if (t != Packet.SSH_FXP_VERSION) {
+            log.warning(String.format("The server did not send a SSH_FXP_VERSION but %d", t));
+            throw new PacketTypeException(t);
+        }
+
+        final int protocol_version = tr.readUINT32();
+        log.debug("SSH_FXP_VERSION: protocol_version = " + protocol_version);
+
+        if (protocol_version != client_version) {
+            throw new IOException(String.format("Server protocol version %d does not match %d",
+                                                protocol_version, client_version));
+        }
+
+        // Both parties should from then on adhere to particular version of the protocol
+
+        // Read and save extensions (if any) for later use
+        while (tr.remain() != 0) {
+            String name = tr.readString();
+            listener.read(name);
+            byte[] value = tr.readByteString();
+            log.debug(String.format("SSH_FXP_VERSION: extension: %s = '%s'", name, StringEncoder.GetString(value)));
+        }
+    }
+
+    /**
+     * Queries the channel state
+     *
+     * @return True if the underlying session is in open state
+     */
+    public boolean isConnected() {
+        return sess.getState() == Channel.STATE_OPEN;
+    }
+
+    /**
+     * Close this SFTP session. NEVER forget to call this method to free up
+     * resources - even if you got an exception from one of the other methods.
+     * Sometimes these other methods may throw an exception, saying that the
+     * underlying channel is closed (this can happen, e.g., if the other server
+     * sent a close message.) However, as long as you have not called the
+     * <code>close()</code> method, you are likely wasting resources.
+     */
+    public void close() {
+        sess.close();
+    }
+
+    /**
+     * Set the charset used to convert between Java Unicode Strings and byte encodings
+     * used by the server for paths and file names.
+     *
+     * @param charset the name of the charset to be used or <code>null</code> to use UTF-8.
+     * @throws java.io.IOException
+     * @see #getCharset()
+     */
+    public void setCharset(String charset) throws IOException {
+        if (charset == null) {
+            this.charset = null;
+            return;
+        }
+
+        try {
+            Charset.forName(charset);
+        }
+        catch (UnsupportedCharsetException e) {
+            throw new IOException("This charset is not supported", e);
+        }
+
+        this.charset = charset;
+    }
+
+    /**
+     * The currently used charset for filename encoding/decoding.
+     *
+     * @return The name of the charset (<code>null</code> if UTF-8 is used).
+     * @see #setCharset(String)
+     */
+    public String getCharset() {
+        return charset;
+    }
+
+    public abstract SFTPFileHandle openFile(String fileName, int flags, SFTPFileAttributes attr) throws IOException;
+
+    public void mkdir(String dirName, int posixPermissions) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(dirName, this.getCharset());
+        tw.writeUINT32(AttribFlags.SSH_FILEXFER_ATTR_PERMISSIONS);
+        tw.writeUINT32(posixPermissions);
+        sendMessage(Packet.SSH_FXP_MKDIR, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public void rm(String fileName) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(fileName, this.getCharset());
+        sendMessage(Packet.SSH_FXP_REMOVE, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public void rmdir(String dirName) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(dirName, this.getCharset());
+        sendMessage(Packet.SSH_FXP_RMDIR, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public void mv(String oldPath, String newPath) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(oldPath, this.getCharset());
+        tw.writeString(newPath, this.getCharset());
+        sendMessage(Packet.SSH_FXP_RENAME, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public String readLink(String path) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(path, charset);
+        sendMessage(Packet.SSH_FXP_READLINK, req_id, tw.getBytes());
+        byte[] resp = receiveMessage(34000);
+        TypesReader tr = new TypesReader(resp);
+        int t = tr.readByte();
+        listener.read(Packet.forName(t));
+        int rep_id = tr.readUINT32();
+
+        if (rep_id != req_id) {
+            throw new RequestMismatchException();
+        }
+
+        if (t == Packet.SSH_FXP_NAME) {
+            int count = tr.readUINT32();
+
+            if (count != 1) {
+                throw new PacketTypeException(t);
+            }
+
+            return tr.readString(charset);
+        }
+
+        if (t != Packet.SSH_FXP_STATUS) {
+            throw new PacketTypeException(t);
+        }
+
+        int errorCode = tr.readUINT32();
+        String errorMessage = tr.readString();
+        listener.read(errorMessage);
+        throw new SFTPException(errorMessage, errorCode);
+    }
+
+    public void setstat(String path, SFTPFileAttributes attr) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(path, charset);
+        tw.writeBytes(attr.toBytes());
+        sendMessage(Packet.SSH_FXP_SETSTAT, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public void fsetstat(SFTPFileHandle handle, SFTPFileAttributes attr) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
+        tw.writeBytes(attr.toBytes());
+        sendMessage(Packet.SSH_FXP_FSETSTAT, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public void createSymlink(String src, String target) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(src, charset);
+        tw.writeString(target, charset);
+        sendMessage(Packet.SSH_FXP_SYMLINK, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public void createHardlink(String src, String target) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString("hardlink@openssh.com", charset);
+        tw.writeString(target, charset);
+        tw.writeString(src, charset);
+        sendMessage(Packet.SSH_FXP_EXTENDED, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    public String canonicalPath(String path) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(path, charset);
+        sendMessage(Packet.SSH_FXP_REALPATH, req_id, tw.getBytes());
+        byte[] resp = receiveMessage(34000);
+        TypesReader tr = new TypesReader(resp);
+        int t = tr.readByte();
+        listener.read(Packet.forName(t));
+        int rep_id = tr.readUINT32();
+
+        if (rep_id != req_id) {
+            throw new RequestMismatchException();
+        }
+
+        if (t == Packet.SSH_FXP_NAME) {
+            int count = tr.readUINT32();
+
+            if (count != 1) {
+                throw new PacketFormatException("The server sent an invalid SSH_FXP_NAME packet.");
+            }
+
+            final String name = tr.readString(charset);
+            listener.read(name);
+            return name;
+        }
+
+        if (t != Packet.SSH_FXP_STATUS) {
+            throw new PacketTypeException(t);
+        }
+
+        int errorCode = tr.readUINT32();
+        String errorMessage = tr.readString();
+        listener.read(errorMessage);
+        throw new SFTPException(errorMessage, errorCode);
+    }
+
+    private void sendMessage(int type, int requestId, byte[] msg, int off, int len) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Send message of type %d with request id %d", type, requestId));
+        }
+
+        listener.write(Packet.forName(type));
+        int msglen = len + 1;
+
+        if (type != Packet.SSH_FXP_INIT) {
+            msglen += 4;
+        }
+
+        os.write(msglen >> 24);
+        os.write(msglen >> 16);
+        os.write(msglen >> 8);
+        os.write(msglen);
+        os.write(type);
+
+        if (type != Packet.SSH_FXP_INIT) {
+            os.write(requestId >> 24);
+            os.write(requestId >> 16);
+            os.write(requestId >> 8);
+            os.write(requestId);
+        }
+
+        os.write(msg, off, len);
+        os.flush();
+    }
+
+    protected void sendMessage(int type, int requestId, byte[] msg) throws IOException {
+        sendMessage(type, requestId, msg, 0, msg.length);
+    }
+
+    private void readBytes(byte[] buff, int pos, int len) throws IOException {
+        while (len > 0) {
+            int count = is.read(buff, pos, len);
+
+            if (count < 0) {
+                throw new SocketException("Unexpected end of stream.");
+            }
+
+            len -= count;
+            pos += count;
+        }
+    }
+
+    /**
+     * Read a message and guarantee that the <b>contents</b> is not larger than
+     * <code>maxlen</code> bytes.
+     * <p/>
+     * Note: receiveMessage(34000) actually means that the message may be up to 34004
+     * bytes (the length attribute preceding the contents is 4 bytes).
+     *
+     * @param maxlen
+     * @return the message contents
+     * @throws IOException
+     */
+    protected byte[] receiveMessage(int maxlen) throws IOException {
+        byte[] msglen = new byte[4];
+        readBytes(msglen, 0, 4);
+        int len = (((msglen[0] & 0xff) << 24) | ((msglen[1] & 0xff) << 16) | ((msglen[2] & 0xff) << 8) | (msglen[3] & 0xff));
+
+        if ((len > maxlen) || (len <= 0)) {
+            throw new PacketFormatException(String.format("Illegal SFTP packet length %d", len));
+        }
+
+        byte[] msg = new byte[len];
+        readBytes(msg, 0, len);
+        return msg;
+    }
+
+    protected int generateNextRequestID() {
+        synchronized (this) {
+            return next_request_id++;
+        }
+    }
+
+    protected void closeHandle(byte[] handle) throws IOException {
+        int req_id = generateNextRequestID();
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(handle, 0, handle.length);
+        sendMessage(Packet.SSH_FXP_CLOSE, req_id, tw.getBytes());
+        expectStatusOKMessage(req_id);
+    }
+
+    private void readStatus() throws IOException {
+        byte[] resp = receiveMessage(34000);
+        TypesReader tr = new TypesReader(resp);
+        int t = tr.readByte();
+        listener.read(Packet.forName(t));
+        // Search the pending queue
+        OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32());
+
+        if (null == status) {
+            throw new RequestMismatchException();
+        }
+
+        // Evaluate the answer
+        if (t == Packet.SSH_FXP_STATUS) {
+            // In any case, stop sending more packets
+            int code = tr.readUINT32();
+
+            if (log.isDebugEnabled()) {
+                String[] desc = ErrorCodes.getDescription(code);
+                log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
+            }
+
+            if (code == ErrorCodes.SSH_FX_OK) {
+                return;
+            }
+
+            String msg = tr.readString();
+            listener.read(msg);
+            throw new SFTPException(msg, code);
+        }
+
+        throw new PacketTypeException(t);
+    }
+
+    private void readPendingReadStatus() throws IOException {
+        byte[] resp = receiveMessage(34000);
+        TypesReader tr = new TypesReader(resp);
+        int t = tr.readByte();
+        listener.read(Packet.forName(t));
+        // Search the pending queue
+        OutstandingReadRequest status = pendingReadQueue.remove(tr.readUINT32());
+
+        if (null == status) {
+            throw new RequestMismatchException();
+        }
+
+        // Evaluate the answer
+        if (t == Packet.SSH_FXP_STATUS) {
+            // In any case, stop sending more packets
+            int code = tr.readUINT32();
+
+            if (log.isDebugEnabled()) {
+                String[] desc = ErrorCodes.getDescription(code);
+                log.debug("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
+            }
+
+            if (code == ErrorCodes.SSH_FX_OK) {
+                return;
+            }
+
+            if (code == ErrorCodes.SSH_FX_EOF) {
+                return;
+            }
+
+            String msg = tr.readString();
+            listener.read(msg);
+            throw new SFTPException(msg, code);
+        }
+
+        throw new PacketTypeException(t);
+    }
+
+    protected void expectStatusOKMessage(int id) throws IOException {
+        byte[] resp = receiveMessage(34000);
+        TypesReader tr = new TypesReader(resp);
+        int t = tr.readByte();
+        listener.read(Packet.forName(t));
+        int rep_id = tr.readUINT32();
+
+        if (rep_id != id) {
+            throw new RequestMismatchException();
+        }
+
+        if (t != Packet.SSH_FXP_STATUS) {
+            throw new PacketTypeException(t);
+        }
+
+        int errorCode = tr.readUINT32();
+
+        if (errorCode == ErrorCodes.SSH_FX_OK) {
+            return;
+        }
+
+        String errorMessage = tr.readString();
+        listener.read(errorMessage);
+        throw new SFTPException(errorMessage, errorCode);
+    }
+
+    public void closeFile(SFTPFileHandle handle) throws IOException {
+        while (!pendingReadQueue.isEmpty()) {
+            this.readPendingReadStatus();
+        }
+
+        while (!pendingStatusQueue.isEmpty()) {
+            this.readStatus();
+        }
+
+        closeHandle(handle.getHandle());
+    }
+
+    public int read(SFTPFileHandle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException {
+        boolean errorOccured = false;
+        int remaining = len * parallelism;
+        //int clientOffset = dstoff;
+        long serverOffset = fileOffset;
+
+        for (OutstandingReadRequest r : pendingReadQueue.values()) {
+            // Server offset should take pending requests into account.
+            serverOffset += r.len;
+        }
+
+        while (true) {
+            // Stop if there was an error and no outstanding request
+            if ((pendingReadQueue.size() == 0) && errorOccured) {
+                break;
+            }
+
+            // Send as many requests as we are allowed to
+            while (pendingReadQueue.size() < parallelism) {
+                if (errorOccured) {
+                    break;
+                }
+
+                // Send the next read request
+                OutstandingReadRequest req = new OutstandingReadRequest();
+                req.req_id = generateNextRequestID();
+                req.serverOffset = serverOffset;
+                req.len = (remaining > len) ? len : remaining;
+                req.buffer = dst;
+                req.dstOffset = dstoff;
+                serverOffset += req.len;
+                //clientOffset += req.len;
+                remaining -= req.len;
+                sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
+                pendingReadQueue.put(req.req_id, req);
+            }
+
+            if (pendingReadQueue.size() == 0) {
+                break;
+            }
+
+            // Receive a single answer
+            byte[] resp = receiveMessage(34000);
+            TypesReader tr = new TypesReader(resp);
+            int t = tr.readByte();
+            listener.read(Packet.forName(t));
+            // Search the pending queue
+            OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32());
+
+            if (null == req) {
+                throw new RequestMismatchException();
+            }
+
+            // Evaluate the answer
+            if (t == Packet.SSH_FXP_STATUS) {
+                /* In any case, stop sending more packets */
+                int code = tr.readUINT32();
+                String msg = tr.readString();
+                listener.read(msg);
+
+                if (log.isDebugEnabled()) {
+                    String[] desc = ErrorCodes.getDescription(code);
+                    log.debug("Got SSH_FXP_STATUS (" + req.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
+                }
+
+                // Flag to read all pending requests but don't send any more.
+                errorOccured = true;
+
+                if (pendingReadQueue.isEmpty()) {
+                    if (ErrorCodes.SSH_FX_EOF == code) {
+                        return -1;
+                    }
+
+                    throw new SFTPException(msg, code);
+                }
+            }
+            else if (t == Packet.SSH_FXP_DATA) {
+                // OK, collect data
+                int readLen = tr.readUINT32();
+
+                if ((readLen < 0) || (readLen > req.len)) {
+                    throw new PacketFormatException("The server sent an invalid length field in a SSH_FXP_DATA packet.");
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Got SSH_FXP_DATA (" + req.req_id + ") " + req.serverOffset + "/" + readLen
+                              + " (requested: " + req.len + ")");
+                }
+
+                // Read bytes into buffer
+                tr.readBytes(req.buffer, req.dstOffset, readLen);
+
+                if (readLen < req.len) {
+                    /* Send this request packet again to request the remaing data in this slot. */
+                    req.req_id = generateNextRequestID();
+                    req.serverOffset += readLen;
+                    req.len -= readLen;
+                    log.debug("Requesting again: " + req.serverOffset + "/" + req.len);
+                    sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
+                    pendingReadQueue.put(req.req_id, req);
+                }
+
+                return readLen;
+            }
+            else {
+                throw new PacketTypeException(t);
+            }
+        }
+
+        // Should never reach here.
+        throw new SFTPException("No EOF reached", -1);
+    }
+
+    private void sendReadRequest(int id, SFTPFileHandle handle, long offset, int len) throws IOException {
+        TypesWriter tw = new TypesWriter();
+        tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
+        tw.writeUINT64(offset);
+        tw.writeUINT32(len);
+        sendMessage(Packet.SSH_FXP_READ, id, tw.getBytes());
+    }
+
+    public void write(SFTPFileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
+        while (len > 0) {
+            int writeRequestLen = len;
+
+            if (writeRequestLen > 32768) {
+                writeRequestLen = 32768;
+            }
+
+            // Send the next write request
+            OutstandingStatusRequest req = new OutstandingStatusRequest();
+            req.req_id = generateNextRequestID();
+            TypesWriter tw = new TypesWriter();
+            tw.writeString(handle.getHandle(), 0, handle.getHandle().length);
+            tw.writeUINT64(fileOffset);
+            tw.writeString(src, srcoff, writeRequestLen);
+            sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes());
+            pendingStatusQueue.put(req.req_id, req);
+
+            // Only read next status if parallelism reached
+            while (pendingStatusQueue.size() >= parallelism) {
+                this.readStatus();
+            }
+
+            fileOffset += writeRequestLen;
+            srcoff += writeRequestLen;
+            len -= writeRequestLen;
+        }
+    }
+
+
+    /**
+     * A read  is divided into multiple requests sent sequentially before
+     * reading any status from the server
+     */
+    private static class OutstandingReadRequest {
+        int req_id;
+        /**
+         * Read offset to request on server starting at the file offset for the first request.
+         */
+        long serverOffset;
+        /**
+         * Length of requested data
+         */
+        int len;
+        /**
+         * Offset in destination buffer
+         */
+        int dstOffset;
+        /**
+         * Temporary buffer
+         */
+        byte[] buffer;
+    }
+
+    /**
+     * A read  is divided into multiple requests sent sequentially before
+     * reading any status from the server
+     */
+    private static final class OutstandingStatusRequest {
+        int req_id;
+    }
+
+}