Mercurial > 510Connectbot
comparison src/ch/ethz/ssh2/transport/TransportManager.java @ 273:91a31873c42a ganymed
start conversion from trilead to ganymed
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Fri, 18 Jul 2014 11:21:46 -0700 |
parents | |
children | d7e088fa2123 |
comparison
equal
deleted
inserted
replaced
272:ce2f4e397703 | 273:91a31873c42a |
---|---|
1 /* | |
2 * Copyright (c) 2006-2013 Christian Plattner. All rights reserved. | |
3 * Please refer to the LICENSE.txt for licensing details. | |
4 */ | |
5 | |
6 package ch.ethz.ssh2.transport; | |
7 | |
8 import java.io.IOException; | |
9 import java.io.InterruptedIOException; | |
10 import java.net.Socket; | |
11 import java.util.ArrayList; | |
12 import java.util.List; | |
13 | |
14 import ch.ethz.ssh2.ConnectionInfo; | |
15 import ch.ethz.ssh2.ConnectionMonitor; | |
16 import ch.ethz.ssh2.DHGexParameters; | |
17 import ch.ethz.ssh2.PacketTypeException; | |
18 import ch.ethz.ssh2.compression.Compressor; | |
19 import ch.ethz.ssh2.crypto.CryptoWishList; | |
20 import ch.ethz.ssh2.crypto.cipher.BlockCipher; | |
21 import ch.ethz.ssh2.crypto.digest.MAC; | |
22 import ch.ethz.ssh2.log.Logger; | |
23 import ch.ethz.ssh2.packets.PacketDisconnect; | |
24 import ch.ethz.ssh2.packets.Packets; | |
25 import ch.ethz.ssh2.packets.TypesReader; | |
26 import ch.ethz.ssh2.signature.DSAPrivateKey; | |
27 import ch.ethz.ssh2.signature.RSAPrivateKey; | |
28 | |
29 /** | |
30 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel | |
31 * packets are allowed during kex exchange, on the other side we need to blindly | |
32 * ignore the next _packet_ if the KEX guess was wrong. How do we know that | |
33 * the next packet is not a channel data packet? Yes, we could check if it is in | |
34 * the KEX range. But the standard says nothing about this. The OpenSSH guys | |
35 * block local "normal" traffic during KEX. That's fine - however, they assume | |
36 * that the other side is doing the same. During re-key, if they receive traffic | |
37 * other than KEX, they become horribly irritated and kill the connection. Since | |
38 * we are very likely going to communicate with OpenSSH servers, we have to play | |
39 * the same game - even though we could do better. | |
40 * | |
41 * @author Christian Plattner | |
42 * @version $Id: TransportManager.java 161 2014-05-01 18:01:55Z dkocher@sudo.ch $ | |
43 */ | |
44 public abstract class TransportManager { | |
45 private static final Logger log = Logger.getLogger(TransportManager.class); | |
46 | |
47 private static final class HandlerEntry { | |
48 MessageHandler mh; | |
49 int low; | |
50 int high; | |
51 } | |
52 | |
53 /** | |
54 * Advertised maximum SSH packet size that the other side can send to us. | |
55 */ | |
56 public static final int MAX_PACKET_SIZE = 64 * 1024; | |
57 | |
58 private final List<AsynchronousEntry> asynchronousQueue | |
59 = new ArrayList<AsynchronousEntry>(); | |
60 | |
61 private Thread asynchronousThread = null; | |
62 private boolean asynchronousPending = false; | |
63 | |
64 private Socket socket; | |
65 | |
/**
 * Creates a transport manager on top of the given socket.
 *
 * @param socket socket that both close() variants of this class will close
 */
protected TransportManager(final Socket socket) {
    this.socket = socket;
}
69 | |
70 private static final class AsynchronousEntry { | |
71 public byte[] message; | |
72 | |
73 public AsynchronousEntry(byte[] message) { | |
74 this.message = message; | |
75 } | |
76 } | |
77 | |
78 private final class AsynchronousWorker implements Runnable { | |
79 @Override | |
80 public void run() { | |
81 while(true) { | |
82 final AsynchronousEntry item; | |
83 synchronized(asynchronousQueue) { | |
84 if(asynchronousQueue.size() == 0) { | |
85 // Only now we may reset the flag, since we are sure that all queued items | |
86 // have been sent (there is a slight delay between de-queuing and sending, | |
87 // this is why we need this flag! See code below. Sending takes place outside | |
88 // of this lock, this is why a test for size()==0 (from another thread) does not ensure | |
89 // that all messages have been sent. | |
90 | |
91 asynchronousPending = false; | |
92 | |
93 // Notify any senders that they can proceed, all async messages have been delivered | |
94 | |
95 asynchronousQueue.notifyAll(); | |
96 | |
97 // After the queue is empty for about 2 seconds, stop this thread | |
98 try { | |
99 asynchronousQueue.wait(2000); | |
100 } | |
101 catch(InterruptedException ignore) { | |
102 // | |
103 } | |
104 if(asynchronousQueue.size() == 0) { | |
105 asynchronousThread = null; | |
106 return; | |
107 } | |
108 } | |
109 item = asynchronousQueue.remove(0); | |
110 } | |
111 try { | |
112 sendMessageImmediate(item.message); | |
113 } | |
114 catch(IOException e) { | |
115 // There is no point in handling it - it simply means that the connection has a problem and we should stop | |
116 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null): | |
117 // further messages in the queue cannot be sent by this or any other thread. | |
118 // Other threads will sooner or later (when receiving or sending the next message) get the | |
119 // same IOException and get to the same conclusion. | |
120 log.warning(e.getMessage()); | |
121 return; | |
122 } | |
123 } | |
124 } | |
125 } | |
126 | |
127 private final Object connectionSemaphore = new Object(); | |
128 | |
129 private boolean flagKexOngoing; | |
130 | |
131 private boolean connectionClosed; | |
132 private IOException reasonClosedCause; | |
133 | |
134 private TransportConnection tc; | |
135 private KexManager km; | |
136 | |
137 private final List<HandlerEntry> messageHandlers | |
138 = new ArrayList<HandlerEntry>(); | |
139 | |
140 private List<ConnectionMonitor> connectionMonitors | |
141 = new ArrayList<ConnectionMonitor>(); | |
142 | |
143 protected void init(TransportConnection tc, KexManager km) { | |
144 this.tc = tc; | |
145 this.km = km; | |
146 } | |
147 | |
148 public int getPacketOverheadEstimate() { | |
149 return tc.getPacketOverheadEstimate(); | |
150 } | |
151 | |
152 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException { | |
153 return km.getOrWaitForConnectionInfo(kexNumber); | |
154 } | |
155 | |
156 public IOException getReasonClosedCause() { | |
157 synchronized(connectionSemaphore) { | |
158 return reasonClosedCause; | |
159 } | |
160 } | |
161 | |
162 public byte[] getSessionIdentifier() { | |
163 return km.sessionId; | |
164 } | |
165 | |
166 public void close() { | |
167 // It is safe now to acquire the semaphore. | |
168 synchronized(connectionSemaphore) { | |
169 if(!connectionClosed) { | |
170 try { | |
171 tc.sendMessage(new PacketDisconnect( | |
172 PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload()); | |
173 } | |
174 catch(IOException ignore) { | |
175 // | |
176 } | |
177 try { | |
178 socket.close(); | |
179 } | |
180 catch(IOException ignore) { | |
181 // | |
182 } | |
183 connectionClosed = true; | |
184 synchronized(this) { | |
185 for(ConnectionMonitor cmon : connectionMonitors) { | |
186 cmon.connectionLost(reasonClosedCause); | |
187 } | |
188 } | |
189 } | |
190 connectionSemaphore.notifyAll(); | |
191 } | |
192 } | |
193 | |
194 public void close(IOException cause) { | |
195 // Do not acquire the semaphore, perhaps somebody is inside (and waits until | |
196 // the remote side is ready to accept new data | |
197 try { | |
198 socket.close(); | |
199 } | |
200 catch(IOException ignore) { | |
201 } | |
202 // It is safe now to acquire the semaphore. | |
203 synchronized(connectionSemaphore) { | |
204 connectionClosed = true; | |
205 reasonClosedCause = cause; | |
206 connectionSemaphore.notifyAll(); | |
207 } | |
208 synchronized(this) { | |
209 for(ConnectionMonitor cmon : connectionMonitors) { | |
210 cmon.connectionLost(reasonClosedCause); | |
211 } | |
212 } | |
213 } | |
214 | |
215 protected void startReceiver() throws IOException { | |
216 final Thread receiveThread = new Thread(new Runnable() { | |
217 public void run() { | |
218 try { | |
219 receiveLoop(); | |
220 // Can only exit with exception | |
221 } | |
222 catch(IOException e) { | |
223 close(e); | |
224 log.warning(e.getMessage()); | |
225 // Tell all handlers that it is time to say goodbye | |
226 if(km != null) { | |
227 km.handleFailure(e); | |
228 } | |
229 for(HandlerEntry he : messageHandlers) { | |
230 he.mh.handleFailure(e); | |
231 } | |
232 } | |
233 if(log.isDebugEnabled()) { | |
234 log.debug("Receive thread: back from receiveLoop"); | |
235 } | |
236 } | |
237 }); | |
238 receiveThread.setName("Transport Manager"); | |
239 receiveThread.setDaemon(true); | |
240 receiveThread.start(); | |
241 } | |
242 | |
243 public void registerMessageHandler(MessageHandler mh, int low, int high) { | |
244 HandlerEntry he = new HandlerEntry(); | |
245 he.mh = mh; | |
246 he.low = low; | |
247 he.high = high; | |
248 | |
249 synchronized(messageHandlers) { | |
250 messageHandlers.add(he); | |
251 } | |
252 } | |
253 | |
254 public void removeMessageHandler(MessageHandler handler) { | |
255 synchronized(messageHandlers) { | |
256 for(int i = 0; i < messageHandlers.size(); i++) { | |
257 HandlerEntry he = messageHandlers.get(i); | |
258 if(he.mh == handler) { | |
259 messageHandlers.remove(i); | |
260 break; | |
261 } | |
262 } | |
263 } | |
264 } | |
265 | |
266 public void sendKexMessage(byte[] msg) throws IOException { | |
267 synchronized(connectionSemaphore) { | |
268 if(connectionClosed) { | |
269 throw reasonClosedCause; | |
270 } | |
271 flagKexOngoing = true; | |
272 try { | |
273 tc.sendMessage(msg); | |
274 } | |
275 catch(IOException e) { | |
276 close(e); | |
277 throw e; | |
278 } | |
279 } | |
280 } | |
281 | |
282 public void kexFinished() throws IOException { | |
283 synchronized(connectionSemaphore) { | |
284 flagKexOngoing = false; | |
285 connectionSemaphore.notifyAll(); | |
286 } | |
287 } | |
288 | |
289 /** | |
290 * @param cwl Crypto wishlist | |
291 * @param dhgex Diffie-hellman group exchange | |
292 * @param dsa may be null if this is a client connection | |
293 * @param rsa may be null if this is a client connection | |
294 * @throws IOException | |
295 */ | |
296 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, DSAPrivateKey dsa, RSAPrivateKey rsa) | |
297 throws IOException { | |
298 synchronized(connectionSemaphore) { | |
299 if(connectionClosed) { | |
300 // Inform the caller that there is no point in triggering a new kex | |
301 throw reasonClosedCause; | |
302 } | |
303 } | |
304 km.initiateKEX(cwl, dhgex, dsa, rsa); | |
305 } | |
306 | |
307 public void changeRecvCipher(BlockCipher bc, MAC mac) { | |
308 tc.changeRecvCipher(bc, mac); | |
309 } | |
310 | |
311 public void changeSendCipher(BlockCipher bc, MAC mac) { | |
312 tc.changeSendCipher(bc, mac); | |
313 } | |
314 | |
315 public void changeRecvCompression(Compressor comp) { | |
316 tc.changeRecvCompression(comp); | |
317 } | |
318 | |
319 public void changeSendCompression(Compressor comp) { | |
320 tc.changeSendCompression(comp); | |
321 } | |
322 | |
323 public void sendAsynchronousMessage(byte[] msg) throws IOException { | |
324 synchronized(asynchronousQueue) { | |
325 asynchronousQueue.add(new AsynchronousEntry(msg)); | |
326 asynchronousPending = true; | |
327 | |
328 /* This limit should be flexible enough. We need this, otherwise the peer | |
329 * can flood us with global requests (and other stuff where we have to reply | |
330 * with an asynchronous message) and (if the server just sends data and does not | |
331 * read what we send) this will probably put us in a low memory situation | |
332 * (our send queue would grow and grow and...) */ | |
333 | |
334 if(asynchronousQueue.size() > 100) { | |
335 throw new IOException("The peer is not consuming our asynchronous replies."); | |
336 } | |
337 | |
338 // Check if we have an asynchronous sending thread | |
339 if(asynchronousThread == null) { | |
340 asynchronousThread = new Thread(new AsynchronousWorker()); | |
341 asynchronousThread.setDaemon(true); | |
342 asynchronousThread.start(); | |
343 // The thread will stop after 2 seconds of inactivity (i.e., empty queue) | |
344 } | |
345 asynchronousQueue.notifyAll(); | |
346 } | |
347 } | |
348 | |
349 public void setConnectionMonitors(List<ConnectionMonitor> monitors) { | |
350 synchronized(this) { | |
351 connectionMonitors = new ArrayList<ConnectionMonitor>(); | |
352 connectionMonitors.addAll(monitors); | |
353 } | |
354 } | |
355 | |
356 /** | |
357 * Send a message but ensure that all queued messages are being sent first. | |
358 * | |
359 * @param msg Message | |
360 * @throws IOException | |
361 */ | |
362 public void sendMessage(byte[] msg) throws IOException { | |
363 synchronized(asynchronousQueue) { | |
364 while(asynchronousPending) { | |
365 try { | |
366 asynchronousQueue.wait(); | |
367 } | |
368 catch(InterruptedException e) { | |
369 throw new InterruptedIOException(e.getMessage()); | |
370 } | |
371 } | |
372 } | |
373 sendMessageImmediate(msg); | |
374 } | |
375 | |
376 /** | |
377 * Send message, ignore queued async messages that have not been delivered yet. | |
378 * Will be called directly from the asynchronousThread thread. | |
379 * | |
380 * @param msg Message | |
381 * @throws IOException | |
382 */ | |
383 public void sendMessageImmediate(byte[] msg) throws IOException { | |
384 synchronized(connectionSemaphore) { | |
385 while(true) { | |
386 if(connectionClosed) { | |
387 throw reasonClosedCause; | |
388 } | |
389 if(!flagKexOngoing) { | |
390 break; | |
391 } | |
392 try { | |
393 connectionSemaphore.wait(); | |
394 } | |
395 catch(InterruptedException e) { | |
396 throw new InterruptedIOException(e.getMessage()); | |
397 } | |
398 } | |
399 | |
400 try { | |
401 tc.sendMessage(msg); | |
402 } | |
403 catch(IOException e) { | |
404 close(e); | |
405 throw e; | |
406 } | |
407 } | |
408 } | |
409 | |
410 private void receiveLoop() throws IOException { | |
411 while(true) { | |
412 final byte[] buffer = new byte[MAX_PACKET_SIZE]; | |
413 final int length = tc.receiveMessage(buffer, 0, buffer.length); | |
414 final byte[] packet = new byte[length]; | |
415 System.arraycopy(buffer, 0, packet, 0, length); | |
416 final int type = packet[0] & 0xff; | |
417 switch(type) { | |
418 case Packets.SSH_MSG_IGNORE: | |
419 break; | |
420 case Packets.SSH_MSG_DEBUG: { | |
421 TypesReader tr = new TypesReader(packet); | |
422 tr.readByte(); | |
423 // always_display | |
424 tr.readBoolean(); | |
425 String message = tr.readString(); | |
426 if(log.isDebugEnabled()) { | |
427 log.debug(String.format("Debug message from remote: '%s'", message)); | |
428 } | |
429 break; | |
430 } | |
431 case Packets.SSH_MSG_UNIMPLEMENTED: | |
432 throw new PacketTypeException(type); | |
433 case Packets.SSH_MSG_DISCONNECT: { | |
434 final PacketDisconnect disconnect = new PacketDisconnect(packet); | |
435 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage()); | |
436 } | |
437 case Packets.SSH_MSG_KEXINIT: | |
438 case Packets.SSH_MSG_NEWKEYS: | |
439 case Packets.SSH_MSG_KEXDH_INIT: | |
440 case Packets.SSH_MSG_KEXDH_REPLY: | |
441 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST: | |
442 case Packets.SSH_MSG_KEX_DH_GEX_INIT: | |
443 case Packets.SSH_MSG_KEX_DH_GEX_REPLY: | |
444 // Is it a KEX Packet | |
445 km.handleMessage(packet); | |
446 break; | |
447 case Packets.SSH_MSG_USERAUTH_SUCCESS: | |
448 tc.startCompression(); | |
449 // Continue with message handlers | |
450 default: | |
451 boolean handled = false; | |
452 for(HandlerEntry handler : messageHandlers) { | |
453 if((handler.low <= type) && (type <= handler.high)) { | |
454 handler.mh.handleMessage(packet); | |
455 handled = true; | |
456 break; | |
457 } | |
458 } | |
459 if(!handled) { | |
460 throw new PacketTypeException(type); | |
461 } | |
462 break; | |
463 } | |
464 if(log.isDebugEnabled()) { | |
465 log.debug(String.format("Handled packet %d", type)); | |
466 } | |
467 } | |
468 } | |
469 } |