comparison app/src/main/java/ch/ethz/ssh2/transport/TransportManager.java @ 438:d29cce60f393

migrate from Eclipse to Android Studio
author Carl Byington <carl@five-ten-sg.com>
date Thu, 03 Dec 2015 11:23:55 -0800
parents src/ch/ethz/ssh2/transport/TransportManager.java@126af684034e
children
comparison
equal deleted inserted replaced
437:208b31032318 438:d29cce60f393
1 /*
2 * Copyright (c) 2006-2013 Christian Plattner. All rights reserved.
3 * Please refer to the LICENSE.txt for licensing details.
4 */
5
6 package ch.ethz.ssh2.transport;
7
8 import java.io.IOException;
9 import java.io.InterruptedIOException;
10 import java.net.Socket;
11 import java.security.KeyPair;
12 import java.util.ArrayList;
13 import java.util.List;
14
15 import ch.ethz.ssh2.ConnectionInfo;
16 import ch.ethz.ssh2.ConnectionMonitor;
17 import ch.ethz.ssh2.DHGexParameters;
18 import ch.ethz.ssh2.PacketTypeException;
19 import ch.ethz.ssh2.compression.Compressor;
20 import ch.ethz.ssh2.crypto.CryptoWishList;
21 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
22 import ch.ethz.ssh2.crypto.digest.MAC;
23 import ch.ethz.ssh2.log.Logger;
24 import ch.ethz.ssh2.packets.PacketDisconnect;
25 import ch.ethz.ssh2.packets.Packets;
26 import ch.ethz.ssh2.packets.TypesReader;
27
28 /**
29 * Yes, the "standard" is a big mess. On one side, the say that arbitrary channel
30 * packets are allowed during kex exchange, on the other side we need to blindly
31 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that
32 * the next packet is not a channel data packet? Yes, we could check if it is in
33 * the KEX range. But the standard says nothing about this. The OpenSSH guys
34 * block local "normal" traffic during KEX. That's fine - however, they assume
35 * that the other side is doing the same. During re-key, if they receive traffic
36 * other than KEX, they become horribly irritated and kill the connection. Since
37 * we are very likely going to communicate with OpenSSH servers, we have to play
38 * the same game - even though we could do better.
39 *
40 * @author Christian Plattner
41 * @version $Id: TransportManager.java 161 2014-05-01 18:01:55Z dkocher@sudo.ch $
42 */
43 public abstract class TransportManager {
44 protected static final Logger log = Logger.getLogger(TransportManager.class);
45
46 private static final class HandlerEntry {
47 MessageHandler mh;
48 int low;
49 int high;
50 }
51
52 /**
53 * Advertised maximum SSH packet size that the other side can send to us.
54 */
55 public static final int MAX_PACKET_SIZE = 64 * 1024;
56
57 private final List<AsynchronousEntry> asynchronousQueue
58 = new ArrayList<AsynchronousEntry>();
59
60 private Thread asynchronousThread = null;
61 private boolean asynchronousPending = false;
62
63 private Socket socket;
64
65 protected TransportManager(final Socket socket) {
66 this.socket = socket;
67 }
68
69 private static final class AsynchronousEntry {
70 public byte[] message;
71
72 public AsynchronousEntry(byte[] message) {
73 this.message = message;
74 }
75 }
76
77 private final class AsynchronousWorker implements Runnable {
78 public void run() {
79 while (true) {
80 final AsynchronousEntry item;
81
82 synchronized (asynchronousQueue) {
83 if (asynchronousQueue.size() == 0) {
84 // Only now we may reset the flag, since we are sure that all queued items
85 // have been sent (there is a slight delay between de-queuing and sending,
86 // this is why we need this flag! See code below. Sending takes place outside
87 // of this lock, this is why a test for size()==0 (from another thread) does not ensure
88 // that all messages have been sent.
89 asynchronousPending = false;
90 // Notify any senders that they can proceed, all async messages have been delivered
91 asynchronousQueue.notifyAll();
92
93 // After the queue is empty for about 2 seconds, stop this thread
94 try {
95 asynchronousQueue.wait(2000);
96 }
97 catch (InterruptedException ignore) {
98 //
99 }
100
101 if (asynchronousQueue.size() == 0) {
102 asynchronousThread = null;
103 return;
104 }
105 }
106
107 item = asynchronousQueue.remove(0);
108 }
109
110 try {
111 sendMessageImmediate(item.message);
112 }
113 catch (IOException e) {
114 // There is no point in handling it - it simply means that the connection has a problem and we should stop
115 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null):
116 // further messages in the queue cannot be sent by this or any other thread.
117 // Other threads will sooner or later (when receiving or sending the next message) get the
118 // same IOException and get to the same conclusion.
119 log.warning(e.getMessage());
120 return;
121 }
122 }
123 }
124 }
125
126 private final Object connectionSemaphore = new Object();
127
128 private boolean flagKexOngoing;
129
130 private boolean connectionClosed;
131 private Throwable reasonClosedCause;
132
133 private TransportConnection tc;
134 private KexManager km;
135
136 private final List<HandlerEntry> messageHandlers = new ArrayList<HandlerEntry>();
137
138 private List<ConnectionMonitor> connectionMonitors = new ArrayList<ConnectionMonitor>();
139 boolean monitorsWereInformed = false;
140
141 protected void init(TransportConnection tc, KexManager km) {
142 this.tc = tc;
143 this.km = km;
144 }
145
146 public int getPacketOverheadEstimate() {
147 return tc.getPacketOverheadEstimate();
148 }
149
150 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException {
151 return km.getOrWaitForConnectionInfo(kexNumber);
152 }
153
154 public Throwable getReasonClosedCause() {
155 synchronized (connectionSemaphore) {
156 return reasonClosedCause;
157 }
158 }
159
160 public byte[] getSessionIdentifier() {
161 return km.sessionId;
162 }
163
164 public void close(Throwable cause, boolean useDisconnectPacket) {
165 if (useDisconnectPacket == false) {
166 // OK, hard shutdown - do not acquire the semaphore,
167 // perhaps somebody is inside (and waits until
168 // the remote side is ready to accept new data).
169 try {
170 socket.close();
171 }
172 catch (IOException ignore) {
173 }
174
175 // OK, whoever tried to send data, should now agree that
176 // there is no point in further waiting =)
177 // It is safe now to acquire the semaphore.
178 }
179
180 synchronized (connectionSemaphore) {
181 if (!connectionClosed) {
182 if (useDisconnectPacket == true) {
183 try {
184 if (tc != null)
185 tc.sendMessage(new PacketDisconnect(PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload());
186 }
187 catch (IOException ignore) {
188 }
189
190 try {
191 socket.close();
192 }
193 catch (IOException ignore) {
194 }
195 }
196
197 connectionClosed = true;
198 reasonClosedCause = cause;
199 }
200
201 connectionSemaphore.notifyAll();
202 }
203
204 // check if we need to inform the monitors
205 List<ConnectionMonitor> monitors = null;
206
207 synchronized (this) {
208 // Short term lock to protect "connectionMonitors"
209 // and "monitorsWereInformed"
210 // (they may be modified concurrently)
211 if (monitorsWereInformed == false) {
212 monitorsWereInformed = true;
213 monitors = new ArrayList<ConnectionMonitor>(connectionMonitors);
214 }
215 }
216
217 if (monitors != null) {
218 for (ConnectionMonitor cmon : monitors) {
219 try {
220 cmon.connectionLost(reasonClosedCause);
221 }
222 catch (Exception ignore) {
223 }
224 }
225 }
226 }
227
228 protected void startReceiver() throws IOException {
229 final Thread receiveThread = new Thread(new Runnable() {
230 public void run() {
231 try {
232 receiveLoop();
233 // Can only exit with exception
234 }
235 catch (IOException e) {
236 close(e, false);
237 log.warning(e.getMessage());
238
239 // Tell all handlers that it is time to say goodbye
240 if (km != null) {
241 km.handleFailure(e);
242 }
243
244 for (HandlerEntry he : messageHandlers) {
245 he.mh.handleFailure(e);
246 }
247 }
248
249 if (log.isDebugEnabled()) {
250 log.debug("Receive thread: back from receiveLoop");
251 }
252 }
253 });
254 receiveThread.setName("Transport Manager");
255 receiveThread.setDaemon(true);
256 receiveThread.start();
257 }
258
259 public void registerMessageHandler(MessageHandler mh, int low, int high) {
260 HandlerEntry he = new HandlerEntry();
261 he.mh = mh;
262 he.low = low;
263 he.high = high;
264
265 synchronized (messageHandlers) {
266 messageHandlers.add(he);
267 }
268 }
269
270 public void removeMessageHandler(MessageHandler handler) {
271 synchronized (messageHandlers) {
272 for (int i = 0; i < messageHandlers.size(); i++) {
273 HandlerEntry he = messageHandlers.get(i);
274
275 if (he.mh == handler) {
276 messageHandlers.remove(i);
277 break;
278 }
279 }
280 }
281 }
282
283 public void sendKexMessage(byte[] msg) throws IOException {
284 synchronized (connectionSemaphore) {
285 if (connectionClosed) {
286 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
287 }
288
289 flagKexOngoing = true;
290
291 try {
292 tc.sendMessage(msg);
293 }
294 catch (IOException e) {
295 close(e, false);
296 throw e;
297 }
298 }
299 }
300
301 public void kexFinished() throws IOException {
302 synchronized (connectionSemaphore) {
303 flagKexOngoing = false;
304 connectionSemaphore.notifyAll();
305 }
306 }
307
308 /**
309 * @param cwl Crypto wishlist
310 * @param dhgex Diffie-hellman group exchange
311 * @param dsa may be null if this is a client connection
312 * @param rsa may be null if this is a client connection
313 * @throws IOException
314 */
315 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, KeyPair dsa, KeyPair rsa, KeyPair ec)
316 throws IOException {
317 synchronized (connectionSemaphore) {
318 if (connectionClosed) {
319 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
320 }
321 }
322
323 km.initiateKEX(cwl, dhgex, dsa, rsa, ec);
324 }
325
326 public void changeRecvCipher(BlockCipher bc, MAC mac) {
327 tc.changeRecvCipher(bc, mac);
328 }
329
330 public void changeSendCipher(BlockCipher bc, MAC mac) {
331 tc.changeSendCipher(bc, mac);
332 }
333
334 public void changeRecvCompression(Compressor comp) {
335 tc.changeRecvCompression(comp);
336 }
337
338 public void changeSendCompression(Compressor comp) {
339 tc.changeSendCompression(comp);
340 }
341
342 public void sendAsynchronousMessage(byte[] msg) throws IOException {
343 synchronized (asynchronousQueue) {
344 asynchronousQueue.add(new AsynchronousEntry(msg));
345 asynchronousPending = true;
346
347 /* This limit should be flexible enough. We need this, otherwise the peer
348 * can flood us with global requests (and other stuff where we have to reply
349 * with an asynchronous message) and (if the server just sends data and does not
350 * read what we send) this will probably put us in a low memory situation
351 * (our send queue would grow and grow and...) */
352
353 if (asynchronousQueue.size() > 100) {
354 throw new IOException("The peer is not consuming our asynchronous replies.");
355 }
356
357 // Check if we have an asynchronous sending thread
358 if (asynchronousThread == null) {
359 asynchronousThread = new Thread(new AsynchronousWorker());
360 asynchronousThread.setDaemon(true);
361 asynchronousThread.start();
362 // The thread will stop after 2 seconds of inactivity (i.e., empty queue)
363 }
364
365 asynchronousQueue.notifyAll();
366 }
367 }
368
369 public void setConnectionMonitors(List<ConnectionMonitor> monitors) {
370 synchronized (this) {
371 connectionMonitors = new ArrayList<ConnectionMonitor>(monitors);
372 }
373 }
374
375 /**
376 * Send a message but ensure that all queued messages are being sent first.
377 *
378 * @param msg Message
379 * @throws IOException
380 */
381 public void sendMessage(byte[] msg) throws IOException {
382 synchronized (asynchronousQueue) {
383 while (asynchronousPending) {
384 try {
385 asynchronousQueue.wait();
386 }
387 catch (InterruptedException e) {
388 throw new InterruptedIOException(e.getMessage());
389 }
390 }
391 }
392
393 sendMessageImmediate(msg);
394 }
395
396 /**
397 * Send message, ignore queued async messages that have not been delivered yet.
398 * Will be called directly from the asynchronousThread thread.
399 *
400 * @param msg Message
401 * @throws IOException
402 */
403 public void sendMessageImmediate(byte[] msg) throws IOException {
404 synchronized (connectionSemaphore) {
405 while (true) {
406 if (connectionClosed) {
407 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
408 }
409
410 if (!flagKexOngoing) {
411 break;
412 }
413
414 try {
415 connectionSemaphore.wait();
416 }
417 catch (InterruptedException e) {
418 throw new InterruptedIOException(e.getMessage());
419 }
420 }
421
422 try {
423 tc.sendMessage(msg);
424 }
425 catch (IOException e) {
426 close(e, false);
427 throw e;
428 }
429 }
430 }
431
432 private void receiveLoop() throws IOException {
433 while (true) {
434 final byte[] buffer = new byte[MAX_PACKET_SIZE];
435 final int length = tc.receiveMessage(buffer, 0, buffer.length);
436 final byte[] packet = new byte[length];
437 System.arraycopy(buffer, 0, packet, 0, length);
438 final int type = packet[0] & 0xff;
439 log.debug(String.format("transport manager receive loop type %d", type));
440
441 switch (type) {
442 case Packets.SSH_MSG_IGNORE:
443 break;
444
445 case Packets.SSH_MSG_DEBUG: {
446 TypesReader tr = new TypesReader(packet);
447 tr.readByte();
448 // always_display
449 tr.readBoolean();
450 String message = tr.readString();
451
452 if (log.isDebugEnabled()) {
453 log.debug(String.format("Debug message from remote: '%s'", message));
454 }
455
456 break;
457 }
458
459 case Packets.SSH_MSG_UNIMPLEMENTED:
460 throw new PacketTypeException(type);
461
462 case Packets.SSH_MSG_DISCONNECT: {
463 final PacketDisconnect disconnect = new PacketDisconnect(packet);
464 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage());
465 }
466
467 case Packets.SSH_MSG_KEXINIT:
468 case Packets.SSH_MSG_NEWKEYS:
469 case Packets.SSH_MSG_KEXDH_INIT:
470 case Packets.SSH_MSG_KEXDH_REPLY:
471 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST:
472 case Packets.SSH_MSG_KEX_DH_GEX_INIT:
473 case Packets.SSH_MSG_KEX_DH_GEX_REPLY:
474 // Is it a KEX Packet
475 km.handleMessage(packet);
476 break;
477
478 case Packets.SSH_MSG_USERAUTH_SUCCESS:
479 tc.startCompression();
480
481 // Continue with message handlers
482 default:
483 boolean handled = false;
484
485 for (HandlerEntry handler : messageHandlers) {
486 if ((handler.low <= type) && (type <= handler.high)) {
487 handler.mh.handleMessage(packet);
488 handled = true;
489 break;
490 }
491 }
492
493 if (!handled) {
494 throw new PacketTypeException(type);
495 }
496
497 break;
498 }
499
500 if (log.isDebugEnabled()) {
501 log.debug(String.format("Handled packet %d", type));
502 }
503 }
504 }
505 }