Mercurial > 510Connectbot
diff src/ch/ethz/ssh2/transport/TransportManager.java @ 307:071eccdff8ea ganymed
fix java formatting
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Wed, 30 Jul 2014 14:16:58 -0700 |
parents | d2b303406d63 |
children | 776a220dbcc6 |
line wrap: on
line diff
--- a/src/ch/ethz/ssh2/transport/TransportManager.java Wed Jul 30 12:09:51 2014 -0700 +++ b/src/ch/ethz/ssh2/transport/TransportManager.java Wed Jul 30 14:16:58 2014 -0700 @@ -55,7 +55,7 @@ public static final int MAX_PACKET_SIZE = 64 * 1024; private final List<AsynchronousEntry> asynchronousQueue - = new ArrayList<AsynchronousEntry>(); + = new ArrayList<AsynchronousEntry>(); private Thread asynchronousThread = null; private boolean asynchronousPending = false; @@ -76,40 +76,41 @@ private final class AsynchronousWorker implements Runnable { public void run() { - while(true) { + while (true) { final AsynchronousEntry item; - synchronized(asynchronousQueue) { - if(asynchronousQueue.size() == 0) { + + synchronized (asynchronousQueue) { + if (asynchronousQueue.size() == 0) { // Only now we may reset the flag, since we are sure that all queued items // have been sent (there is a slight delay between de-queuing and sending, // this is why we need this flag! See code below. Sending takes place outside // of this lock, this is why a test for size()==0 (from another thread) does not ensure // that all messages have been sent. - asynchronousPending = false; - // Notify any senders that they can proceed, all async messages have been delivered - asynchronousQueue.notifyAll(); // After the queue is empty for about 2 seconds, stop this thread try { asynchronousQueue.wait(2000); } - catch(InterruptedException ignore) { + catch (InterruptedException ignore) { // } - if(asynchronousQueue.size() == 0) { + + if (asynchronousQueue.size() == 0) { asynchronousThread = null; return; } } + item = asynchronousQueue.remove(0); } + try { sendMessageImmediate(item.message); } - catch(IOException e) { + catch (IOException e) { // There is no point in handling it - it simply means that the connection has a problem and we should stop // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null): // further messages in the queue cannot be sent by this or any other thread. @@ -133,10 +134,10 @@ private KexManager km; private final List<HandlerEntry> messageHandlers - = new ArrayList<HandlerEntry>(); + = new ArrayList<HandlerEntry>(); private List<ConnectionMonitor> connectionMonitors - = new ArrayList<ConnectionMonitor>(); + = new ArrayList<ConnectionMonitor>(); protected void init(TransportConnection tc, KexManager km) { this.tc = tc; @@ -152,7 +153,7 @@ } public IOException getReasonClosedCause() { - synchronized(connectionSemaphore) { + synchronized (connectionSemaphore) { return reasonClosedCause; } } @@ -163,28 +164,32 @@ public void close() { // It is safe now to acquire the semaphore. - synchronized(connectionSemaphore) { - if(!connectionClosed) { + synchronized (connectionSemaphore) { + if (!connectionClosed) { try { tc.sendMessage(new PacketDisconnect( - PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload()); + PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload()); } - catch(IOException ignore) { + catch (IOException ignore) { // } + try { socket.close(); } - catch(IOException ignore) { + catch (IOException ignore) { // } + connectionClosed = true; - synchronized(this) { - for(ConnectionMonitor cmon : connectionMonitors) { + + synchronized (this) { + for (ConnectionMonitor cmon : connectionMonitors) { cmon.connectionLost(reasonClosedCause); } } } + connectionSemaphore.notifyAll(); } } @@ -195,16 +200,18 @@ try { socket.close(); } - catch(IOException ignore) { + catch (IOException ignore) { } + // It is safe now to acquire the semaphore. - synchronized(connectionSemaphore) { + synchronized (connectionSemaphore) { connectionClosed = true; reasonClosedCause = cause; connectionSemaphore.notifyAll(); } - synchronized(this) { - for(ConnectionMonitor cmon : connectionMonitors) { + + synchronized (this) { + for (ConnectionMonitor cmon : connectionMonitors) { cmon.connectionLost(reasonClosedCause); } } @@ -217,18 +224,21 @@ receiveLoop(); // Can only exit with exception } - catch(IOException e) { + catch (IOException e) { close(e); log.warning(e.getMessage()); + // Tell all handlers that it is time to say goodbye - if(km != null) { + if (km != null) { km.handleFailure(e); } - for(HandlerEntry he : messageHandlers) { + + for (HandlerEntry he : messageHandlers) { he.mh.handleFailure(e); } } - if(log.isDebugEnabled()) { + + if (log.isDebugEnabled()) { log.debug("Receive thread: back from receiveLoop"); } } @@ -244,16 +254,17 @@ he.low = low; he.high = high; - synchronized(messageHandlers) { + synchronized (messageHandlers) { messageHandlers.add(he); } } public void removeMessageHandler(MessageHandler handler) { - synchronized(messageHandlers) { - for(int i = 0; i < messageHandlers.size(); i++) { + synchronized (messageHandlers) { + for (int i = 0; i < messageHandlers.size(); i++) { HandlerEntry he = messageHandlers.get(i); - if(he.mh == handler) { + + if (he.mh == handler) { messageHandlers.remove(i); break; } @@ -262,15 +273,17 @@ } public void sendKexMessage(byte[] msg) throws IOException { - synchronized(connectionSemaphore) { - if(connectionClosed) { + synchronized (connectionSemaphore) { + if (connectionClosed) { throw reasonClosedCause; } + flagKexOngoing = true; + try { tc.sendMessage(msg); } - catch(IOException e) { + catch (IOException e) { close(e); throw e; } @@ -278,7 +291,7 @@ } public void kexFinished() throws IOException { - synchronized(connectionSemaphore) { + synchronized (connectionSemaphore) { flagKexOngoing = false; connectionSemaphore.notifyAll(); } @@ -292,13 +305,14 @@ * @throws IOException */ public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, KeyPair dsa, KeyPair rsa, KeyPair ec) - throws IOException { - synchronized(connectionSemaphore) { - if(connectionClosed) { + throws IOException { + synchronized (connectionSemaphore) { + if (connectionClosed) { // Inform the caller that there is no point in triggering a new kex throw reasonClosedCause; } } + km.initiateKEX(cwl, dhgex, dsa, rsa, ec); } @@ -319,33 +333,34 @@ } public void sendAsynchronousMessage(byte[] msg) throws IOException { - synchronized(asynchronousQueue) { + synchronized (asynchronousQueue) { asynchronousQueue.add(new AsynchronousEntry(msg)); asynchronousPending = true; - /* This limit should be flexible enough. We need this, otherwise the peer + /* This limit should be flexible enough. We need this, otherwise the peer * can flood us with global requests (and other stuff where we have to reply - * with an asynchronous message) and (if the server just sends data and does not - * read what we send) this will probably put us in a low memory situation - * (our send queue would grow and grow and...) */ + * with an asynchronous message) and (if the server just sends data and does not + * read what we send) this will probably put us in a low memory situation + * (our send queue would grow and grow and...) */ - if(asynchronousQueue.size() > 100) { + if (asynchronousQueue.size() > 100) { throw new IOException("The peer is not consuming our asynchronous replies."); } // Check if we have an asynchronous sending thread - if(asynchronousThread == null) { + if (asynchronousThread == null) { asynchronousThread = new Thread(new AsynchronousWorker()); asynchronousThread.setDaemon(true); asynchronousThread.start(); // The thread will stop after 2 seconds of inactivity (i.e., empty queue) } + asynchronousQueue.notifyAll(); } } public void setConnectionMonitors(List<ConnectionMonitor> monitors) { - synchronized(this) { + synchronized (this) { connectionMonitors = new ArrayList<ConnectionMonitor>(); connectionMonitors.addAll(monitors); } @@ -358,16 +373,17 @@ * @throws IOException */ public void sendMessage(byte[] msg) throws IOException { - synchronized(asynchronousQueue) { - while(asynchronousPending) { + synchronized (asynchronousQueue) { + while (asynchronousPending) { try { asynchronousQueue.wait(); } - catch(InterruptedException e) { + catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } } } + sendMessageImmediate(msg); } @@ -379,18 +395,20 @@ * @throws IOException */ public void sendMessageImmediate(byte[] msg) throws IOException { - synchronized(connectionSemaphore) { - while(true) { - if(connectionClosed) { + synchronized (connectionSemaphore) { + while (true) { + if (connectionClosed) { throw reasonClosedCause; } - if(!flagKexOngoing) { + + if (!flagKexOngoing) { break; } + try { connectionSemaphore.wait(); } - catch(InterruptedException e) { + catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } } @@ -398,7 +416,7 @@ try { tc.sendMessage(msg); } - catch(IOException e) { + catch (IOException e) { close(e); throw e; } @@ -406,32 +424,39 @@ } private void receiveLoop() throws IOException { - while(true) { + while (true) { final byte[] buffer = new byte[MAX_PACKET_SIZE]; final int length = tc.receiveMessage(buffer, 0, buffer.length); final byte[] packet = new byte[length]; System.arraycopy(buffer, 0, packet, 0, length); final int type = packet[0] & 0xff; - switch(type) { + + switch (type) { case Packets.SSH_MSG_IGNORE: break; + case Packets.SSH_MSG_DEBUG: { - TypesReader tr = new TypesReader(packet); - tr.readByte(); - // always_display - tr.readBoolean(); - String message = tr.readString(); - if(log.isDebugEnabled()) { - log.debug(String.format("Debug message from remote: '%s'", message)); + TypesReader tr = new TypesReader(packet); + tr.readByte(); + // always_display + tr.readBoolean(); + String message = tr.readString(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Debug message from remote: '%s'", message)); + } + + break; } - break; - } + case Packets.SSH_MSG_UNIMPLEMENTED: throw new PacketTypeException(type); + case Packets.SSH_MSG_DISCONNECT: { - final PacketDisconnect disconnect = new PacketDisconnect(packet); - throw new DisconnectException(disconnect.getReason(), disconnect.getMessage()); - } + final PacketDisconnect disconnect = new PacketDisconnect(packet); + throw new DisconnectException(disconnect.getReason(), disconnect.getMessage()); + } + case Packets.SSH_MSG_KEXINIT: case Packets.SSH_MSG_NEWKEYS: case Packets.SSH_MSG_KEXDH_INIT: @@ -442,24 +467,30 @@ // Is it a KEX Packet km.handleMessage(packet); break; + case Packets.SSH_MSG_USERAUTH_SUCCESS: tc.startCompression(); - // Continue with message handlers + + // Continue with message handlers default: boolean handled = false; - for(HandlerEntry handler : messageHandlers) { - if((handler.low <= type) && (type <= handler.high)) { + + for (HandlerEntry handler : messageHandlers) { + if ((handler.low <= type) && (type <= handler.high)) { handler.mh.handleMessage(packet); handled = true; break; } } - if(!handled) { + + if (!handled) { throw new PacketTypeException(type); } + break; } - if(log.isDebugEnabled()) { + + if (log.isDebugEnabled()) { log.debug(String.format("Handled packet %d", type)); } }