273
|
1 /*
|
|
2 * Copyright (c) 2006-2013 Christian Plattner. All rights reserved.
|
|
3 * Please refer to the LICENSE.txt for licensing details.
|
|
4 */
|
|
5
|
|
6 package ch.ethz.ssh2.transport;
|
|
7
|
|
8 import java.io.IOException;
|
|
9 import java.io.InterruptedIOException;
|
305
d2b303406d63
remove extra override annotations that generate eclipse compiler errors
Carl Byington <carl@five-ten-sg.com>
diff
changeset
|
10 import java.net.Socket;
|
280
|
11 import java.security.KeyPair;
|
273
|
12 import java.util.ArrayList;
|
|
13 import java.util.List;
|
|
14
|
|
15 import ch.ethz.ssh2.ConnectionInfo;
|
|
16 import ch.ethz.ssh2.ConnectionMonitor;
|
|
17 import ch.ethz.ssh2.DHGexParameters;
|
|
18 import ch.ethz.ssh2.PacketTypeException;
|
|
19 import ch.ethz.ssh2.compression.Compressor;
|
|
20 import ch.ethz.ssh2.crypto.CryptoWishList;
|
|
21 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
|
|
22 import ch.ethz.ssh2.crypto.digest.MAC;
|
|
23 import ch.ethz.ssh2.log.Logger;
|
|
24 import ch.ethz.ssh2.packets.PacketDisconnect;
|
|
25 import ch.ethz.ssh2.packets.Packets;
|
|
26 import ch.ethz.ssh2.packets.TypesReader;
|
|
27
|
|
28 /**
|
|
29 * Yes, the "standard" is a big mess. On one side, the say that arbitrary channel
|
|
30 * packets are allowed during kex exchange, on the other side we need to blindly
|
|
31 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that
|
|
32 * the next packet is not a channel data packet? Yes, we could check if it is in
|
|
33 * the KEX range. But the standard says nothing about this. The OpenSSH guys
|
|
34 * block local "normal" traffic during KEX. That's fine - however, they assume
|
|
35 * that the other side is doing the same. During re-key, if they receive traffic
|
|
36 * other than KEX, they become horribly irritated and kill the connection. Since
|
|
37 * we are very likely going to communicate with OpenSSH servers, we have to play
|
|
38 * the same game - even though we could do better.
|
|
39 *
|
|
40 * @author Christian Plattner
|
|
41 * @version $Id: TransportManager.java 161 2014-05-01 18:01:55Z dkocher@sudo.ch $
|
|
42 */
|
|
43 public abstract class TransportManager {
|
338
|
44 protected static final Logger log = Logger.getLogger(TransportManager.class);
|
273
|
45
|
|
46 private static final class HandlerEntry {
|
|
47 MessageHandler mh;
|
|
48 int low;
|
|
49 int high;
|
|
50 }
|
|
51
|
|
52 /**
|
|
53 * Advertised maximum SSH packet size that the other side can send to us.
|
|
54 */
|
|
55 public static final int MAX_PACKET_SIZE = 64 * 1024;
|
|
56
|
|
57 private final List<AsynchronousEntry> asynchronousQueue
|
307
|
58 = new ArrayList<AsynchronousEntry>();
|
273
|
59
|
|
60 private Thread asynchronousThread = null;
|
|
61 private boolean asynchronousPending = false;
|
|
62
|
|
63 private Socket socket;
|
|
64
|
|
65 protected TransportManager(final Socket socket) {
|
|
66 this.socket = socket;
|
|
67 }
|
|
68
|
|
69 private static final class AsynchronousEntry {
|
|
70 public byte[] message;
|
|
71
|
|
72 public AsynchronousEntry(byte[] message) {
|
|
73 this.message = message;
|
|
74 }
|
|
75 }
|
|
76
|
|
77 private final class AsynchronousWorker implements Runnable {
|
|
78 public void run() {
|
307
|
79 while (true) {
|
273
|
80 final AsynchronousEntry item;
|
307
|
81
|
|
82 synchronized (asynchronousQueue) {
|
|
83 if (asynchronousQueue.size() == 0) {
|
273
|
84 // Only now we may reset the flag, since we are sure that all queued items
|
|
85 // have been sent (there is a slight delay between de-queuing and sending,
|
|
86 // this is why we need this flag! See code below. Sending takes place outside
|
|
87 // of this lock, this is why a test for size()==0 (from another thread) does not ensure
|
|
88 // that all messages have been sent.
|
|
89 asynchronousPending = false;
|
|
90 // Notify any senders that they can proceed, all async messages have been delivered
|
|
91 asynchronousQueue.notifyAll();
|
|
92
|
|
93 // After the queue is empty for about 2 seconds, stop this thread
|
|
94 try {
|
|
95 asynchronousQueue.wait(2000);
|
|
96 }
|
307
|
97 catch (InterruptedException ignore) {
|
273
|
98 //
|
|
99 }
|
307
|
100
|
|
101 if (asynchronousQueue.size() == 0) {
|
273
|
102 asynchronousThread = null;
|
|
103 return;
|
|
104 }
|
|
105 }
|
307
|
106
|
273
|
107 item = asynchronousQueue.remove(0);
|
|
108 }
|
307
|
109
|
273
|
110 try {
|
|
111 sendMessageImmediate(item.message);
|
|
112 }
|
307
|
113 catch (IOException e) {
|
273
|
114 // There is no point in handling it - it simply means that the connection has a problem and we should stop
|
|
115 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null):
|
|
116 // further messages in the queue cannot be sent by this or any other thread.
|
|
117 // Other threads will sooner or later (when receiving or sending the next message) get the
|
|
118 // same IOException and get to the same conclusion.
|
|
119 log.warning(e.getMessage());
|
|
120 return;
|
|
121 }
|
|
122 }
|
|
123 }
|
|
124 }
|
|
125
|
|
126 private final Object connectionSemaphore = new Object();
|
|
127
|
|
128 private boolean flagKexOngoing;
|
|
129
|
320
|
130 private boolean connectionClosed;
|
|
131 private Throwable reasonClosedCause;
|
273
|
132
|
|
133 private TransportConnection tc;
|
|
134 private KexManager km;
|
|
135
|
322
|
136 private final List<HandlerEntry> messageHandlers = new ArrayList<HandlerEntry>();
|
273
|
137
|
322
|
138 private List<ConnectionMonitor> connectionMonitors = new ArrayList<ConnectionMonitor>();
|
|
139 boolean monitorsWereInformed = false;
|
273
|
140
|
|
141 protected void init(TransportConnection tc, KexManager km) {
|
|
142 this.tc = tc;
|
|
143 this.km = km;
|
|
144 }
|
|
145
|
|
146 public int getPacketOverheadEstimate() {
|
|
147 return tc.getPacketOverheadEstimate();
|
|
148 }
|
|
149
|
|
150 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException {
|
|
151 return km.getOrWaitForConnectionInfo(kexNumber);
|
|
152 }
|
|
153
|
322
|
154 public Throwable getReasonClosedCause() {
|
307
|
155 synchronized (connectionSemaphore) {
|
273
|
156 return reasonClosedCause;
|
|
157 }
|
|
158 }
|
|
159
|
|
160 public byte[] getSessionIdentifier() {
|
|
161 return km.sessionId;
|
|
162 }
|
|
163
|
319
|
164 public void close(Throwable cause, boolean useDisconnectPacket) {
|
|
165 if (useDisconnectPacket == false) {
|
|
166 // OK, hard shutdown - do not acquire the semaphore,
|
|
167 // perhaps somebody is inside (and waits until
|
|
168 // the remote side is ready to accept new data).
|
|
169 try {
|
|
170 socket.close();
|
|
171 }
|
|
172 catch (IOException ignore) {
|
|
173 }
|
330
|
174
|
319
|
175 // OK, whoever tried to send data, should now agree that
|
|
176 // there is no point in further waiting =)
|
|
177 // It is safe now to acquire the semaphore.
|
|
178 }
|
|
179
|
307
|
180 synchronized (connectionSemaphore) {
|
|
181 if (!connectionClosed) {
|
319
|
182 if (useDisconnectPacket == true) {
|
|
183 try {
|
|
184 if (tc != null)
|
|
185 tc.sendMessage(new PacketDisconnect(PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload());
|
|
186 }
|
|
187 catch (IOException ignore) {
|
|
188 }
|
307
|
189
|
319
|
190 try {
|
|
191 socket.close();
|
|
192 }
|
|
193 catch (IOException ignore) {
|
|
194 }
|
322
|
195 }
|
330
|
196
|
322
|
197 connectionClosed = true;
|
|
198 reasonClosedCause = cause;
|
|
199 }
|
330
|
200
|
322
|
201 connectionSemaphore.notifyAll();
|
|
202 }
|
307
|
203
|
322
|
204 // check if we need to inform the monitors
|
323
|
205 List<ConnectionMonitor> monitors = null;
|
307
|
206
|
322
|
207 synchronized (this) {
|
|
208 // Short term lock to protect "connectionMonitors"
|
|
209 // and "monitorsWereInformed"
|
|
210 // (they may be modified concurrently)
|
|
211 if (monitorsWereInformed == false) {
|
|
212 monitorsWereInformed = true;
|
323
|
213 monitors = new ArrayList<ConnectionMonitor>(connectionMonitors);
|
322
|
214 }
|
|
215 }
|
|
216
|
|
217 if (monitors != null) {
|
|
218 for (ConnectionMonitor cmon : monitors) {
|
|
219 try {
|
|
220 cmon.connectionLost(reasonClosedCause);
|
|
221 }
|
|
222 catch (Exception ignore) {
|
273
|
223 }
|
|
224 }
|
|
225 }
|
|
226 }
|
|
227
|
|
228 protected void startReceiver() throws IOException {
|
|
229 final Thread receiveThread = new Thread(new Runnable() {
|
|
230 public void run() {
|
|
231 try {
|
|
232 receiveLoop();
|
|
233 // Can only exit with exception
|
|
234 }
|
307
|
235 catch (IOException e) {
|
321
|
236 close(e, false);
|
273
|
237 log.warning(e.getMessage());
|
307
|
238
|
273
|
239 // Tell all handlers that it is time to say goodbye
|
307
|
240 if (km != null) {
|
273
|
241 km.handleFailure(e);
|
|
242 }
|
307
|
243
|
|
244 for (HandlerEntry he : messageHandlers) {
|
273
|
245 he.mh.handleFailure(e);
|
|
246 }
|
|
247 }
|
307
|
248
|
|
249 if (log.isDebugEnabled()) {
|
273
|
250 log.debug("Receive thread: back from receiveLoop");
|
|
251 }
|
|
252 }
|
|
253 });
|
|
254 receiveThread.setName("Transport Manager");
|
|
255 receiveThread.setDaemon(true);
|
|
256 receiveThread.start();
|
|
257 }
|
|
258
|
|
259 public void registerMessageHandler(MessageHandler mh, int low, int high) {
|
|
260 HandlerEntry he = new HandlerEntry();
|
|
261 he.mh = mh;
|
|
262 he.low = low;
|
|
263 he.high = high;
|
|
264
|
307
|
265 synchronized (messageHandlers) {
|
273
|
266 messageHandlers.add(he);
|
|
267 }
|
|
268 }
|
|
269
|
|
270 public void removeMessageHandler(MessageHandler handler) {
|
307
|
271 synchronized (messageHandlers) {
|
|
272 for (int i = 0; i < messageHandlers.size(); i++) {
|
273
|
273 HandlerEntry he = messageHandlers.get(i);
|
307
|
274
|
|
275 if (he.mh == handler) {
|
273
|
276 messageHandlers.remove(i);
|
|
277 break;
|
|
278 }
|
|
279 }
|
|
280 }
|
|
281 }
|
|
282
|
|
283 public void sendKexMessage(byte[] msg) throws IOException {
|
307
|
284 synchronized (connectionSemaphore) {
|
|
285 if (connectionClosed) {
|
324
|
286 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
|
273
|
287 }
|
307
|
288
|
273
|
289 flagKexOngoing = true;
|
307
|
290
|
273
|
291 try {
|
|
292 tc.sendMessage(msg);
|
|
293 }
|
307
|
294 catch (IOException e) {
|
321
|
295 close(e, false);
|
273
|
296 throw e;
|
|
297 }
|
|
298 }
|
|
299 }
|
|
300
|
|
301 public void kexFinished() throws IOException {
|
307
|
302 synchronized (connectionSemaphore) {
|
273
|
303 flagKexOngoing = false;
|
|
304 connectionSemaphore.notifyAll();
|
|
305 }
|
|
306 }
|
|
307
|
|
308 /**
|
|
309 * @param cwl Crypto wishlist
|
|
310 * @param dhgex Diffie-hellman group exchange
|
|
311 * @param dsa may be null if this is a client connection
|
|
312 * @param rsa may be null if this is a client connection
|
|
313 * @throws IOException
|
|
314 */
|
301
|
315 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, KeyPair dsa, KeyPair rsa, KeyPair ec)
|
307
|
316 throws IOException {
|
|
317 synchronized (connectionSemaphore) {
|
|
318 if (connectionClosed) {
|
325
|
319 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
|
273
|
320 }
|
|
321 }
|
307
|
322
|
301
|
323 km.initiateKEX(cwl, dhgex, dsa, rsa, ec);
|
273
|
324 }
|
|
325
|
|
326 public void changeRecvCipher(BlockCipher bc, MAC mac) {
|
|
327 tc.changeRecvCipher(bc, mac);
|
|
328 }
|
|
329
|
|
330 public void changeSendCipher(BlockCipher bc, MAC mac) {
|
|
331 tc.changeSendCipher(bc, mac);
|
|
332 }
|
|
333
|
|
334 public void changeRecvCompression(Compressor comp) {
|
|
335 tc.changeRecvCompression(comp);
|
|
336 }
|
|
337
|
|
338 public void changeSendCompression(Compressor comp) {
|
|
339 tc.changeSendCompression(comp);
|
|
340 }
|
|
341
|
|
342 public void sendAsynchronousMessage(byte[] msg) throws IOException {
|
307
|
343 synchronized (asynchronousQueue) {
|
273
|
344 asynchronousQueue.add(new AsynchronousEntry(msg));
|
|
345 asynchronousPending = true;
|
|
346
|
307
|
347 /* This limit should be flexible enough. We need this, otherwise the peer
|
273
|
348 * can flood us with global requests (and other stuff where we have to reply
|
307
|
349 * with an asynchronous message) and (if the server just sends data and does not
|
|
350 * read what we send) this will probably put us in a low memory situation
|
|
351 * (our send queue would grow and grow and...) */
|
273
|
352
|
307
|
353 if (asynchronousQueue.size() > 100) {
|
273
|
354 throw new IOException("The peer is not consuming our asynchronous replies.");
|
|
355 }
|
|
356
|
|
357 // Check if we have an asynchronous sending thread
|
307
|
358 if (asynchronousThread == null) {
|
273
|
359 asynchronousThread = new Thread(new AsynchronousWorker());
|
|
360 asynchronousThread.setDaemon(true);
|
|
361 asynchronousThread.start();
|
|
362 // The thread will stop after 2 seconds of inactivity (i.e., empty queue)
|
|
363 }
|
307
|
364
|
273
|
365 asynchronousQueue.notifyAll();
|
|
366 }
|
|
367 }
|
|
368
|
|
369 public void setConnectionMonitors(List<ConnectionMonitor> monitors) {
|
307
|
370 synchronized (this) {
|
323
|
371 connectionMonitors = new ArrayList<ConnectionMonitor>(monitors);
|
273
|
372 }
|
|
373 }
|
|
374
|
|
375 /**
|
|
376 * Send a message but ensure that all queued messages are being sent first.
|
|
377 *
|
|
378 * @param msg Message
|
|
379 * @throws IOException
|
|
380 */
|
|
381 public void sendMessage(byte[] msg) throws IOException {
|
307
|
382 synchronized (asynchronousQueue) {
|
|
383 while (asynchronousPending) {
|
273
|
384 try {
|
|
385 asynchronousQueue.wait();
|
|
386 }
|
307
|
387 catch (InterruptedException e) {
|
273
|
388 throw new InterruptedIOException(e.getMessage());
|
|
389 }
|
|
390 }
|
|
391 }
|
307
|
392
|
273
|
393 sendMessageImmediate(msg);
|
|
394 }
|
|
395
|
|
396 /**
|
|
397 * Send message, ignore queued async messages that have not been delivered yet.
|
|
398 * Will be called directly from the asynchronousThread thread.
|
|
399 *
|
|
400 * @param msg Message
|
|
401 * @throws IOException
|
|
402 */
|
|
403 public void sendMessageImmediate(byte[] msg) throws IOException {
|
307
|
404 synchronized (connectionSemaphore) {
|
|
405 while (true) {
|
|
406 if (connectionClosed) {
|
324
|
407 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
|
273
|
408 }
|
307
|
409
|
|
410 if (!flagKexOngoing) {
|
273
|
411 break;
|
|
412 }
|
307
|
413
|
273
|
414 try {
|
|
415 connectionSemaphore.wait();
|
|
416 }
|
307
|
417 catch (InterruptedException e) {
|
273
|
418 throw new InterruptedIOException(e.getMessage());
|
|
419 }
|
|
420 }
|
|
421
|
|
422 try {
|
|
423 tc.sendMessage(msg);
|
|
424 }
|
307
|
425 catch (IOException e) {
|
321
|
426 close(e, false);
|
273
|
427 throw e;
|
|
428 }
|
|
429 }
|
|
430 }
|
|
431
|
|
432 private void receiveLoop() throws IOException {
|
307
|
433 while (true) {
|
273
|
434 final byte[] buffer = new byte[MAX_PACKET_SIZE];
|
|
435 final int length = tc.receiveMessage(buffer, 0, buffer.length);
|
|
436 final byte[] packet = new byte[length];
|
|
437 System.arraycopy(buffer, 0, packet, 0, length);
|
|
438 final int type = packet[0] & 0xff;
|
333
|
439 log.debug(String.format("transport manager receive loop type %d", type));
|
307
|
440
|
|
441 switch (type) {
|
273
|
442 case Packets.SSH_MSG_IGNORE:
|
|
443 break;
|
307
|
444
|
273
|
445 case Packets.SSH_MSG_DEBUG: {
|
307
|
446 TypesReader tr = new TypesReader(packet);
|
|
447 tr.readByte();
|
|
448 // always_display
|
|
449 tr.readBoolean();
|
|
450 String message = tr.readString();
|
|
451
|
|
452 if (log.isDebugEnabled()) {
|
|
453 log.debug(String.format("Debug message from remote: '%s'", message));
|
|
454 }
|
|
455
|
|
456 break;
|
273
|
457 }
|
307
|
458
|
273
|
459 case Packets.SSH_MSG_UNIMPLEMENTED:
|
|
460 throw new PacketTypeException(type);
|
307
|
461
|
273
|
462 case Packets.SSH_MSG_DISCONNECT: {
|
307
|
463 final PacketDisconnect disconnect = new PacketDisconnect(packet);
|
|
464 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage());
|
|
465 }
|
|
466
|
273
|
467 case Packets.SSH_MSG_KEXINIT:
|
|
468 case Packets.SSH_MSG_NEWKEYS:
|
|
469 case Packets.SSH_MSG_KEXDH_INIT:
|
|
470 case Packets.SSH_MSG_KEXDH_REPLY:
|
|
471 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST:
|
|
472 case Packets.SSH_MSG_KEX_DH_GEX_INIT:
|
|
473 case Packets.SSH_MSG_KEX_DH_GEX_REPLY:
|
|
474 // Is it a KEX Packet
|
|
475 km.handleMessage(packet);
|
|
476 break;
|
307
|
477
|
273
|
478 case Packets.SSH_MSG_USERAUTH_SUCCESS:
|
|
479 tc.startCompression();
|
307
|
480
|
|
481 // Continue with message handlers
|
273
|
482 default:
|
|
483 boolean handled = false;
|
307
|
484
|
|
485 for (HandlerEntry handler : messageHandlers) {
|
|
486 if ((handler.low <= type) && (type <= handler.high)) {
|
273
|
487 handler.mh.handleMessage(packet);
|
|
488 handled = true;
|
|
489 break;
|
|
490 }
|
|
491 }
|
307
|
492
|
|
493 if (!handled) {
|
273
|
494 throw new PacketTypeException(type);
|
|
495 }
|
307
|
496
|
273
|
497 break;
|
|
498 }
|
307
|
499
|
|
500 if (log.isDebugEnabled()) {
|
273
|
501 log.debug(String.format("Handled packet %d", type));
|
|
502 }
|
|
503 }
|
|
504 }
|
|
505 }
|