Mercurial > 510Connectbot
comparison app/src/main/java/ch/ethz/ssh2/transport/TransportManager.java @ 438:d29cce60f393
migrate from Eclipse to Android Studio
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Thu, 03 Dec 2015 11:23:55 -0800 |
parents | src/ch/ethz/ssh2/transport/TransportManager.java@126af684034e |
children |
comparison
equal
deleted
inserted
replaced
437:208b31032318 | 438:d29cce60f393 |
---|---|
1 /* | |
2 * Copyright (c) 2006-2013 Christian Plattner. All rights reserved. | |
3 * Please refer to the LICENSE.txt for licensing details. | |
4 */ | |
5 | |
6 package ch.ethz.ssh2.transport; | |
7 | |
8 import java.io.IOException; | |
9 import java.io.InterruptedIOException; | |
10 import java.net.Socket; | |
11 import java.security.KeyPair; | |
12 import java.util.ArrayList; | |
13 import java.util.List; | |
14 | |
15 import ch.ethz.ssh2.ConnectionInfo; | |
16 import ch.ethz.ssh2.ConnectionMonitor; | |
17 import ch.ethz.ssh2.DHGexParameters; | |
18 import ch.ethz.ssh2.PacketTypeException; | |
19 import ch.ethz.ssh2.compression.Compressor; | |
20 import ch.ethz.ssh2.crypto.CryptoWishList; | |
21 import ch.ethz.ssh2.crypto.cipher.BlockCipher; | |
22 import ch.ethz.ssh2.crypto.digest.MAC; | |
23 import ch.ethz.ssh2.log.Logger; | |
24 import ch.ethz.ssh2.packets.PacketDisconnect; | |
25 import ch.ethz.ssh2.packets.Packets; | |
26 import ch.ethz.ssh2.packets.TypesReader; | |
27 | |
28 /** | |
29 * Yes, the "standard" is a big mess. On one side, the say that arbitrary channel | |
30 * packets are allowed during kex exchange, on the other side we need to blindly | |
31 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that | |
32 * the next packet is not a channel data packet? Yes, we could check if it is in | |
33 * the KEX range. But the standard says nothing about this. The OpenSSH guys | |
34 * block local "normal" traffic during KEX. That's fine - however, they assume | |
35 * that the other side is doing the same. During re-key, if they receive traffic | |
36 * other than KEX, they become horribly irritated and kill the connection. Since | |
37 * we are very likely going to communicate with OpenSSH servers, we have to play | |
38 * the same game - even though we could do better. | |
39 * | |
40 * @author Christian Plattner | |
41 * @version $Id: TransportManager.java 161 2014-05-01 18:01:55Z dkocher@sudo.ch $ | |
42 */ | |
43 public abstract class TransportManager { | |
44 protected static final Logger log = Logger.getLogger(TransportManager.class); | |
45 | |
46 private static final class HandlerEntry { | |
47 MessageHandler mh; | |
48 int low; | |
49 int high; | |
50 } | |
51 | |
52 /** | |
53 * Advertised maximum SSH packet size that the other side can send to us. | |
54 */ | |
55 public static final int MAX_PACKET_SIZE = 64 * 1024; | |
56 | |
57 private final List<AsynchronousEntry> asynchronousQueue | |
58 = new ArrayList<AsynchronousEntry>(); | |
59 | |
60 private Thread asynchronousThread = null; | |
61 private boolean asynchronousPending = false; | |
62 | |
63 private Socket socket; | |
64 | |
65 protected TransportManager(final Socket socket) { | |
66 this.socket = socket; | |
67 } | |
68 | |
69 private static final class AsynchronousEntry { | |
70 public byte[] message; | |
71 | |
72 public AsynchronousEntry(byte[] message) { | |
73 this.message = message; | |
74 } | |
75 } | |
76 | |
77 private final class AsynchronousWorker implements Runnable { | |
78 public void run() { | |
79 while (true) { | |
80 final AsynchronousEntry item; | |
81 | |
82 synchronized (asynchronousQueue) { | |
83 if (asynchronousQueue.size() == 0) { | |
84 // Only now we may reset the flag, since we are sure that all queued items | |
85 // have been sent (there is a slight delay between de-queuing and sending, | |
86 // this is why we need this flag! See code below. Sending takes place outside | |
87 // of this lock, this is why a test for size()==0 (from another thread) does not ensure | |
88 // that all messages have been sent. | |
89 asynchronousPending = false; | |
90 // Notify any senders that they can proceed, all async messages have been delivered | |
91 asynchronousQueue.notifyAll(); | |
92 | |
93 // After the queue is empty for about 2 seconds, stop this thread | |
94 try { | |
95 asynchronousQueue.wait(2000); | |
96 } | |
97 catch (InterruptedException ignore) { | |
98 // | |
99 } | |
100 | |
101 if (asynchronousQueue.size() == 0) { | |
102 asynchronousThread = null; | |
103 return; | |
104 } | |
105 } | |
106 | |
107 item = asynchronousQueue.remove(0); | |
108 } | |
109 | |
110 try { | |
111 sendMessageImmediate(item.message); | |
112 } | |
113 catch (IOException e) { | |
114 // There is no point in handling it - it simply means that the connection has a problem and we should stop | |
115 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null): | |
116 // further messages in the queue cannot be sent by this or any other thread. | |
117 // Other threads will sooner or later (when receiving or sending the next message) get the | |
118 // same IOException and get to the same conclusion. | |
119 log.warning(e.getMessage()); | |
120 return; | |
121 } | |
122 } | |
123 } | |
124 } | |
125 | |
126 private final Object connectionSemaphore = new Object(); | |
127 | |
128 private boolean flagKexOngoing; | |
129 | |
130 private boolean connectionClosed; | |
131 private Throwable reasonClosedCause; | |
132 | |
133 private TransportConnection tc; | |
134 private KexManager km; | |
135 | |
136 private final List<HandlerEntry> messageHandlers = new ArrayList<HandlerEntry>(); | |
137 | |
138 private List<ConnectionMonitor> connectionMonitors = new ArrayList<ConnectionMonitor>(); | |
139 boolean monitorsWereInformed = false; | |
140 | |
141 protected void init(TransportConnection tc, KexManager km) { | |
142 this.tc = tc; | |
143 this.km = km; | |
144 } | |
145 | |
146 public int getPacketOverheadEstimate() { | |
147 return tc.getPacketOverheadEstimate(); | |
148 } | |
149 | |
150 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException { | |
151 return km.getOrWaitForConnectionInfo(kexNumber); | |
152 } | |
153 | |
154 public Throwable getReasonClosedCause() { | |
155 synchronized (connectionSemaphore) { | |
156 return reasonClosedCause; | |
157 } | |
158 } | |
159 | |
160 public byte[] getSessionIdentifier() { | |
161 return km.sessionId; | |
162 } | |
163 | |
164 public void close(Throwable cause, boolean useDisconnectPacket) { | |
165 if (useDisconnectPacket == false) { | |
166 // OK, hard shutdown - do not acquire the semaphore, | |
167 // perhaps somebody is inside (and waits until | |
168 // the remote side is ready to accept new data). | |
169 try { | |
170 socket.close(); | |
171 } | |
172 catch (IOException ignore) { | |
173 } | |
174 | |
175 // OK, whoever tried to send data, should now agree that | |
176 // there is no point in further waiting =) | |
177 // It is safe now to acquire the semaphore. | |
178 } | |
179 | |
180 synchronized (connectionSemaphore) { | |
181 if (!connectionClosed) { | |
182 if (useDisconnectPacket == true) { | |
183 try { | |
184 if (tc != null) | |
185 tc.sendMessage(new PacketDisconnect(PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload()); | |
186 } | |
187 catch (IOException ignore) { | |
188 } | |
189 | |
190 try { | |
191 socket.close(); | |
192 } | |
193 catch (IOException ignore) { | |
194 } | |
195 } | |
196 | |
197 connectionClosed = true; | |
198 reasonClosedCause = cause; | |
199 } | |
200 | |
201 connectionSemaphore.notifyAll(); | |
202 } | |
203 | |
204 // check if we need to inform the monitors | |
205 List<ConnectionMonitor> monitors = null; | |
206 | |
207 synchronized (this) { | |
208 // Short term lock to protect "connectionMonitors" | |
209 // and "monitorsWereInformed" | |
210 // (they may be modified concurrently) | |
211 if (monitorsWereInformed == false) { | |
212 monitorsWereInformed = true; | |
213 monitors = new ArrayList<ConnectionMonitor>(connectionMonitors); | |
214 } | |
215 } | |
216 | |
217 if (monitors != null) { | |
218 for (ConnectionMonitor cmon : monitors) { | |
219 try { | |
220 cmon.connectionLost(reasonClosedCause); | |
221 } | |
222 catch (Exception ignore) { | |
223 } | |
224 } | |
225 } | |
226 } | |
227 | |
228 protected void startReceiver() throws IOException { | |
229 final Thread receiveThread = new Thread(new Runnable() { | |
230 public void run() { | |
231 try { | |
232 receiveLoop(); | |
233 // Can only exit with exception | |
234 } | |
235 catch (IOException e) { | |
236 close(e, false); | |
237 log.warning(e.getMessage()); | |
238 | |
239 // Tell all handlers that it is time to say goodbye | |
240 if (km != null) { | |
241 km.handleFailure(e); | |
242 } | |
243 | |
244 for (HandlerEntry he : messageHandlers) { | |
245 he.mh.handleFailure(e); | |
246 } | |
247 } | |
248 | |
249 if (log.isDebugEnabled()) { | |
250 log.debug("Receive thread: back from receiveLoop"); | |
251 } | |
252 } | |
253 }); | |
254 receiveThread.setName("Transport Manager"); | |
255 receiveThread.setDaemon(true); | |
256 receiveThread.start(); | |
257 } | |
258 | |
259 public void registerMessageHandler(MessageHandler mh, int low, int high) { | |
260 HandlerEntry he = new HandlerEntry(); | |
261 he.mh = mh; | |
262 he.low = low; | |
263 he.high = high; | |
264 | |
265 synchronized (messageHandlers) { | |
266 messageHandlers.add(he); | |
267 } | |
268 } | |
269 | |
270 public void removeMessageHandler(MessageHandler handler) { | |
271 synchronized (messageHandlers) { | |
272 for (int i = 0; i < messageHandlers.size(); i++) { | |
273 HandlerEntry he = messageHandlers.get(i); | |
274 | |
275 if (he.mh == handler) { | |
276 messageHandlers.remove(i); | |
277 break; | |
278 } | |
279 } | |
280 } | |
281 } | |
282 | |
283 public void sendKexMessage(byte[] msg) throws IOException { | |
284 synchronized (connectionSemaphore) { | |
285 if (connectionClosed) { | |
286 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause); | |
287 } | |
288 | |
289 flagKexOngoing = true; | |
290 | |
291 try { | |
292 tc.sendMessage(msg); | |
293 } | |
294 catch (IOException e) { | |
295 close(e, false); | |
296 throw e; | |
297 } | |
298 } | |
299 } | |
300 | |
301 public void kexFinished() throws IOException { | |
302 synchronized (connectionSemaphore) { | |
303 flagKexOngoing = false; | |
304 connectionSemaphore.notifyAll(); | |
305 } | |
306 } | |
307 | |
308 /** | |
309 * @param cwl Crypto wishlist | |
310 * @param dhgex Diffie-hellman group exchange | |
311 * @param dsa may be null if this is a client connection | |
312 * @param rsa may be null if this is a client connection | |
313 * @throws IOException | |
314 */ | |
315 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, KeyPair dsa, KeyPair rsa, KeyPair ec) | |
316 throws IOException { | |
317 synchronized (connectionSemaphore) { | |
318 if (connectionClosed) { | |
319 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause); | |
320 } | |
321 } | |
322 | |
323 km.initiateKEX(cwl, dhgex, dsa, rsa, ec); | |
324 } | |
325 | |
326 public void changeRecvCipher(BlockCipher bc, MAC mac) { | |
327 tc.changeRecvCipher(bc, mac); | |
328 } | |
329 | |
330 public void changeSendCipher(BlockCipher bc, MAC mac) { | |
331 tc.changeSendCipher(bc, mac); | |
332 } | |
333 | |
334 public void changeRecvCompression(Compressor comp) { | |
335 tc.changeRecvCompression(comp); | |
336 } | |
337 | |
338 public void changeSendCompression(Compressor comp) { | |
339 tc.changeSendCompression(comp); | |
340 } | |
341 | |
342 public void sendAsynchronousMessage(byte[] msg) throws IOException { | |
343 synchronized (asynchronousQueue) { | |
344 asynchronousQueue.add(new AsynchronousEntry(msg)); | |
345 asynchronousPending = true; | |
346 | |
347 /* This limit should be flexible enough. We need this, otherwise the peer | |
348 * can flood us with global requests (and other stuff where we have to reply | |
349 * with an asynchronous message) and (if the server just sends data and does not | |
350 * read what we send) this will probably put us in a low memory situation | |
351 * (our send queue would grow and grow and...) */ | |
352 | |
353 if (asynchronousQueue.size() > 100) { | |
354 throw new IOException("The peer is not consuming our asynchronous replies."); | |
355 } | |
356 | |
357 // Check if we have an asynchronous sending thread | |
358 if (asynchronousThread == null) { | |
359 asynchronousThread = new Thread(new AsynchronousWorker()); | |
360 asynchronousThread.setDaemon(true); | |
361 asynchronousThread.start(); | |
362 // The thread will stop after 2 seconds of inactivity (i.e., empty queue) | |
363 } | |
364 | |
365 asynchronousQueue.notifyAll(); | |
366 } | |
367 } | |
368 | |
369 public void setConnectionMonitors(List<ConnectionMonitor> monitors) { | |
370 synchronized (this) { | |
371 connectionMonitors = new ArrayList<ConnectionMonitor>(monitors); | |
372 } | |
373 } | |
374 | |
375 /** | |
376 * Send a message but ensure that all queued messages are being sent first. | |
377 * | |
378 * @param msg Message | |
379 * @throws IOException | |
380 */ | |
381 public void sendMessage(byte[] msg) throws IOException { | |
382 synchronized (asynchronousQueue) { | |
383 while (asynchronousPending) { | |
384 try { | |
385 asynchronousQueue.wait(); | |
386 } | |
387 catch (InterruptedException e) { | |
388 throw new InterruptedIOException(e.getMessage()); | |
389 } | |
390 } | |
391 } | |
392 | |
393 sendMessageImmediate(msg); | |
394 } | |
395 | |
396 /** | |
397 * Send message, ignore queued async messages that have not been delivered yet. | |
398 * Will be called directly from the asynchronousThread thread. | |
399 * | |
400 * @param msg Message | |
401 * @throws IOException | |
402 */ | |
403 public void sendMessageImmediate(byte[] msg) throws IOException { | |
404 synchronized (connectionSemaphore) { | |
405 while (true) { | |
406 if (connectionClosed) { | |
407 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause); | |
408 } | |
409 | |
410 if (!flagKexOngoing) { | |
411 break; | |
412 } | |
413 | |
414 try { | |
415 connectionSemaphore.wait(); | |
416 } | |
417 catch (InterruptedException e) { | |
418 throw new InterruptedIOException(e.getMessage()); | |
419 } | |
420 } | |
421 | |
422 try { | |
423 tc.sendMessage(msg); | |
424 } | |
425 catch (IOException e) { | |
426 close(e, false); | |
427 throw e; | |
428 } | |
429 } | |
430 } | |
431 | |
432 private void receiveLoop() throws IOException { | |
433 while (true) { | |
434 final byte[] buffer = new byte[MAX_PACKET_SIZE]; | |
435 final int length = tc.receiveMessage(buffer, 0, buffer.length); | |
436 final byte[] packet = new byte[length]; | |
437 System.arraycopy(buffer, 0, packet, 0, length); | |
438 final int type = packet[0] & 0xff; | |
439 log.debug(String.format("transport manager receive loop type %d", type)); | |
440 | |
441 switch (type) { | |
442 case Packets.SSH_MSG_IGNORE: | |
443 break; | |
444 | |
445 case Packets.SSH_MSG_DEBUG: { | |
446 TypesReader tr = new TypesReader(packet); | |
447 tr.readByte(); | |
448 // always_display | |
449 tr.readBoolean(); | |
450 String message = tr.readString(); | |
451 | |
452 if (log.isDebugEnabled()) { | |
453 log.debug(String.format("Debug message from remote: '%s'", message)); | |
454 } | |
455 | |
456 break; | |
457 } | |
458 | |
459 case Packets.SSH_MSG_UNIMPLEMENTED: | |
460 throw new PacketTypeException(type); | |
461 | |
462 case Packets.SSH_MSG_DISCONNECT: { | |
463 final PacketDisconnect disconnect = new PacketDisconnect(packet); | |
464 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage()); | |
465 } | |
466 | |
467 case Packets.SSH_MSG_KEXINIT: | |
468 case Packets.SSH_MSG_NEWKEYS: | |
469 case Packets.SSH_MSG_KEXDH_INIT: | |
470 case Packets.SSH_MSG_KEXDH_REPLY: | |
471 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST: | |
472 case Packets.SSH_MSG_KEX_DH_GEX_INIT: | |
473 case Packets.SSH_MSG_KEX_DH_GEX_REPLY: | |
474 // Is it a KEX Packet | |
475 km.handleMessage(packet); | |
476 break; | |
477 | |
478 case Packets.SSH_MSG_USERAUTH_SUCCESS: | |
479 tc.startCompression(); | |
480 | |
481 // Continue with message handlers | |
482 default: | |
483 boolean handled = false; | |
484 | |
485 for (HandlerEntry handler : messageHandlers) { | |
486 if ((handler.low <= type) && (type <= handler.high)) { | |
487 handler.mh.handleMessage(packet); | |
488 handled = true; | |
489 break; | |
490 } | |
491 } | |
492 | |
493 if (!handled) { | |
494 throw new PacketTypeException(type); | |
495 } | |
496 | |
497 break; | |
498 } | |
499 | |
500 if (log.isDebugEnabled()) { | |
501 log.debug(String.format("Handled packet %d", type)); | |
502 } | |
503 } | |
504 } | |
505 } |