comparison src/ch/ethz/ssh2/transport/TransportManager.java @ 273:91a31873c42a ganymed

start conversion from trilead to ganymed
author Carl Byington <carl@five-ten-sg.com>
date Fri, 18 Jul 2014 11:21:46 -0700
parents
children d7e088fa2123
comparison
equal deleted inserted replaced
272:ce2f4e397703 273:91a31873c42a
1 /*
2 * Copyright (c) 2006-2013 Christian Plattner. All rights reserved.
3 * Please refer to the LICENSE.txt for licensing details.
4 */
5
6 package ch.ethz.ssh2.transport;
7
8 import java.io.IOException;
9 import java.io.InterruptedIOException;
10 import java.net.Socket;
11 import java.util.ArrayList;
12 import java.util.List;
13
14 import ch.ethz.ssh2.ConnectionInfo;
15 import ch.ethz.ssh2.ConnectionMonitor;
16 import ch.ethz.ssh2.DHGexParameters;
17 import ch.ethz.ssh2.PacketTypeException;
18 import ch.ethz.ssh2.compression.Compressor;
19 import ch.ethz.ssh2.crypto.CryptoWishList;
20 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
21 import ch.ethz.ssh2.crypto.digest.MAC;
22 import ch.ethz.ssh2.log.Logger;
23 import ch.ethz.ssh2.packets.PacketDisconnect;
24 import ch.ethz.ssh2.packets.Packets;
25 import ch.ethz.ssh2.packets.TypesReader;
26 import ch.ethz.ssh2.signature.DSAPrivateKey;
27 import ch.ethz.ssh2.signature.RSAPrivateKey;
28
29 /**
30 * Yes, the "standard" is a big mess. On one side, the say that arbitrary channel
31 * packets are allowed during kex exchange, on the other side we need to blindly
32 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that
33 * the next packet is not a channel data packet? Yes, we could check if it is in
34 * the KEX range. But the standard says nothing about this. The OpenSSH guys
35 * block local "normal" traffic during KEX. That's fine - however, they assume
36 * that the other side is doing the same. During re-key, if they receive traffic
37 * other than KEX, they become horribly irritated and kill the connection. Since
38 * we are very likely going to communicate with OpenSSH servers, we have to play
39 * the same game - even though we could do better.
40 *
41 * @author Christian Plattner
42 * @version $Id: TransportManager.java 161 2014-05-01 18:01:55Z dkocher@sudo.ch $
43 */
44 public abstract class TransportManager {
45 private static final Logger log = Logger.getLogger(TransportManager.class);
46
47 private static final class HandlerEntry {
48 MessageHandler mh;
49 int low;
50 int high;
51 }
52
53 /**
54 * Advertised maximum SSH packet size that the other side can send to us.
55 */
56 public static final int MAX_PACKET_SIZE = 64 * 1024;
57
58 private final List<AsynchronousEntry> asynchronousQueue
59 = new ArrayList<AsynchronousEntry>();
60
61 private Thread asynchronousThread = null;
62 private boolean asynchronousPending = false;
63
64 private Socket socket;
65
66 protected TransportManager(final Socket socket) {
67 this.socket = socket;
68 }
69
70 private static final class AsynchronousEntry {
71 public byte[] message;
72
73 public AsynchronousEntry(byte[] message) {
74 this.message = message;
75 }
76 }
77
78 private final class AsynchronousWorker implements Runnable {
79 @Override
80 public void run() {
81 while(true) {
82 final AsynchronousEntry item;
83 synchronized(asynchronousQueue) {
84 if(asynchronousQueue.size() == 0) {
85 // Only now we may reset the flag, since we are sure that all queued items
86 // have been sent (there is a slight delay between de-queuing and sending,
87 // this is why we need this flag! See code below. Sending takes place outside
88 // of this lock, this is why a test for size()==0 (from another thread) does not ensure
89 // that all messages have been sent.
90
91 asynchronousPending = false;
92
93 // Notify any senders that they can proceed, all async messages have been delivered
94
95 asynchronousQueue.notifyAll();
96
97 // After the queue is empty for about 2 seconds, stop this thread
98 try {
99 asynchronousQueue.wait(2000);
100 }
101 catch(InterruptedException ignore) {
102 //
103 }
104 if(asynchronousQueue.size() == 0) {
105 asynchronousThread = null;
106 return;
107 }
108 }
109 item = asynchronousQueue.remove(0);
110 }
111 try {
112 sendMessageImmediate(item.message);
113 }
114 catch(IOException e) {
115 // There is no point in handling it - it simply means that the connection has a problem and we should stop
116 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null):
117 // further messages in the queue cannot be sent by this or any other thread.
118 // Other threads will sooner or later (when receiving or sending the next message) get the
119 // same IOException and get to the same conclusion.
120 log.warning(e.getMessage());
121 return;
122 }
123 }
124 }
125 }
126
127 private final Object connectionSemaphore = new Object();
128
129 private boolean flagKexOngoing;
130
131 private boolean connectionClosed;
132 private IOException reasonClosedCause;
133
134 private TransportConnection tc;
135 private KexManager km;
136
137 private final List<HandlerEntry> messageHandlers
138 = new ArrayList<HandlerEntry>();
139
140 private List<ConnectionMonitor> connectionMonitors
141 = new ArrayList<ConnectionMonitor>();
142
143 protected void init(TransportConnection tc, KexManager km) {
144 this.tc = tc;
145 this.km = km;
146 }
147
148 public int getPacketOverheadEstimate() {
149 return tc.getPacketOverheadEstimate();
150 }
151
152 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException {
153 return km.getOrWaitForConnectionInfo(kexNumber);
154 }
155
156 public IOException getReasonClosedCause() {
157 synchronized(connectionSemaphore) {
158 return reasonClosedCause;
159 }
160 }
161
162 public byte[] getSessionIdentifier() {
163 return km.sessionId;
164 }
165
166 public void close() {
167 // It is safe now to acquire the semaphore.
168 synchronized(connectionSemaphore) {
169 if(!connectionClosed) {
170 try {
171 tc.sendMessage(new PacketDisconnect(
172 PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload());
173 }
174 catch(IOException ignore) {
175 //
176 }
177 try {
178 socket.close();
179 }
180 catch(IOException ignore) {
181 //
182 }
183 connectionClosed = true;
184 synchronized(this) {
185 for(ConnectionMonitor cmon : connectionMonitors) {
186 cmon.connectionLost(reasonClosedCause);
187 }
188 }
189 }
190 connectionSemaphore.notifyAll();
191 }
192 }
193
194 public void close(IOException cause) {
195 // Do not acquire the semaphore, perhaps somebody is inside (and waits until
196 // the remote side is ready to accept new data
197 try {
198 socket.close();
199 }
200 catch(IOException ignore) {
201 }
202 // It is safe now to acquire the semaphore.
203 synchronized(connectionSemaphore) {
204 connectionClosed = true;
205 reasonClosedCause = cause;
206 connectionSemaphore.notifyAll();
207 }
208 synchronized(this) {
209 for(ConnectionMonitor cmon : connectionMonitors) {
210 cmon.connectionLost(reasonClosedCause);
211 }
212 }
213 }
214
215 protected void startReceiver() throws IOException {
216 final Thread receiveThread = new Thread(new Runnable() {
217 public void run() {
218 try {
219 receiveLoop();
220 // Can only exit with exception
221 }
222 catch(IOException e) {
223 close(e);
224 log.warning(e.getMessage());
225 // Tell all handlers that it is time to say goodbye
226 if(km != null) {
227 km.handleFailure(e);
228 }
229 for(HandlerEntry he : messageHandlers) {
230 he.mh.handleFailure(e);
231 }
232 }
233 if(log.isDebugEnabled()) {
234 log.debug("Receive thread: back from receiveLoop");
235 }
236 }
237 });
238 receiveThread.setName("Transport Manager");
239 receiveThread.setDaemon(true);
240 receiveThread.start();
241 }
242
243 public void registerMessageHandler(MessageHandler mh, int low, int high) {
244 HandlerEntry he = new HandlerEntry();
245 he.mh = mh;
246 he.low = low;
247 he.high = high;
248
249 synchronized(messageHandlers) {
250 messageHandlers.add(he);
251 }
252 }
253
254 public void removeMessageHandler(MessageHandler handler) {
255 synchronized(messageHandlers) {
256 for(int i = 0; i < messageHandlers.size(); i++) {
257 HandlerEntry he = messageHandlers.get(i);
258 if(he.mh == handler) {
259 messageHandlers.remove(i);
260 break;
261 }
262 }
263 }
264 }
265
266 public void sendKexMessage(byte[] msg) throws IOException {
267 synchronized(connectionSemaphore) {
268 if(connectionClosed) {
269 throw reasonClosedCause;
270 }
271 flagKexOngoing = true;
272 try {
273 tc.sendMessage(msg);
274 }
275 catch(IOException e) {
276 close(e);
277 throw e;
278 }
279 }
280 }
281
282 public void kexFinished() throws IOException {
283 synchronized(connectionSemaphore) {
284 flagKexOngoing = false;
285 connectionSemaphore.notifyAll();
286 }
287 }
288
289 /**
290 * @param cwl Crypto wishlist
291 * @param dhgex Diffie-hellman group exchange
292 * @param dsa may be null if this is a client connection
293 * @param rsa may be null if this is a client connection
294 * @throws IOException
295 */
296 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, DSAPrivateKey dsa, RSAPrivateKey rsa)
297 throws IOException {
298 synchronized(connectionSemaphore) {
299 if(connectionClosed) {
300 // Inform the caller that there is no point in triggering a new kex
301 throw reasonClosedCause;
302 }
303 }
304 km.initiateKEX(cwl, dhgex, dsa, rsa);
305 }
306
307 public void changeRecvCipher(BlockCipher bc, MAC mac) {
308 tc.changeRecvCipher(bc, mac);
309 }
310
311 public void changeSendCipher(BlockCipher bc, MAC mac) {
312 tc.changeSendCipher(bc, mac);
313 }
314
315 public void changeRecvCompression(Compressor comp) {
316 tc.changeRecvCompression(comp);
317 }
318
319 public void changeSendCompression(Compressor comp) {
320 tc.changeSendCompression(comp);
321 }
322
323 public void sendAsynchronousMessage(byte[] msg) throws IOException {
324 synchronized(asynchronousQueue) {
325 asynchronousQueue.add(new AsynchronousEntry(msg));
326 asynchronousPending = true;
327
328 /* This limit should be flexible enough. We need this, otherwise the peer
329 * can flood us with global requests (and other stuff where we have to reply
330 * with an asynchronous message) and (if the server just sends data and does not
331 * read what we send) this will probably put us in a low memory situation
332 * (our send queue would grow and grow and...) */
333
334 if(asynchronousQueue.size() > 100) {
335 throw new IOException("The peer is not consuming our asynchronous replies.");
336 }
337
338 // Check if we have an asynchronous sending thread
339 if(asynchronousThread == null) {
340 asynchronousThread = new Thread(new AsynchronousWorker());
341 asynchronousThread.setDaemon(true);
342 asynchronousThread.start();
343 // The thread will stop after 2 seconds of inactivity (i.e., empty queue)
344 }
345 asynchronousQueue.notifyAll();
346 }
347 }
348
349 public void setConnectionMonitors(List<ConnectionMonitor> monitors) {
350 synchronized(this) {
351 connectionMonitors = new ArrayList<ConnectionMonitor>();
352 connectionMonitors.addAll(monitors);
353 }
354 }
355
356 /**
357 * Send a message but ensure that all queued messages are being sent first.
358 *
359 * @param msg Message
360 * @throws IOException
361 */
362 public void sendMessage(byte[] msg) throws IOException {
363 synchronized(asynchronousQueue) {
364 while(asynchronousPending) {
365 try {
366 asynchronousQueue.wait();
367 }
368 catch(InterruptedException e) {
369 throw new InterruptedIOException(e.getMessage());
370 }
371 }
372 }
373 sendMessageImmediate(msg);
374 }
375
376 /**
377 * Send message, ignore queued async messages that have not been delivered yet.
378 * Will be called directly from the asynchronousThread thread.
379 *
380 * @param msg Message
381 * @throws IOException
382 */
383 public void sendMessageImmediate(byte[] msg) throws IOException {
384 synchronized(connectionSemaphore) {
385 while(true) {
386 if(connectionClosed) {
387 throw reasonClosedCause;
388 }
389 if(!flagKexOngoing) {
390 break;
391 }
392 try {
393 connectionSemaphore.wait();
394 }
395 catch(InterruptedException e) {
396 throw new InterruptedIOException(e.getMessage());
397 }
398 }
399
400 try {
401 tc.sendMessage(msg);
402 }
403 catch(IOException e) {
404 close(e);
405 throw e;
406 }
407 }
408 }
409
410 private void receiveLoop() throws IOException {
411 while(true) {
412 final byte[] buffer = new byte[MAX_PACKET_SIZE];
413 final int length = tc.receiveMessage(buffer, 0, buffer.length);
414 final byte[] packet = new byte[length];
415 System.arraycopy(buffer, 0, packet, 0, length);
416 final int type = packet[0] & 0xff;
417 switch(type) {
418 case Packets.SSH_MSG_IGNORE:
419 break;
420 case Packets.SSH_MSG_DEBUG: {
421 TypesReader tr = new TypesReader(packet);
422 tr.readByte();
423 // always_display
424 tr.readBoolean();
425 String message = tr.readString();
426 if(log.isDebugEnabled()) {
427 log.debug(String.format("Debug message from remote: '%s'", message));
428 }
429 break;
430 }
431 case Packets.SSH_MSG_UNIMPLEMENTED:
432 throw new PacketTypeException(type);
433 case Packets.SSH_MSG_DISCONNECT: {
434 final PacketDisconnect disconnect = new PacketDisconnect(packet);
435 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage());
436 }
437 case Packets.SSH_MSG_KEXINIT:
438 case Packets.SSH_MSG_NEWKEYS:
439 case Packets.SSH_MSG_KEXDH_INIT:
440 case Packets.SSH_MSG_KEXDH_REPLY:
441 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST:
442 case Packets.SSH_MSG_KEX_DH_GEX_INIT:
443 case Packets.SSH_MSG_KEX_DH_GEX_REPLY:
444 // Is it a KEX Packet
445 km.handleMessage(packet);
446 break;
447 case Packets.SSH_MSG_USERAUTH_SUCCESS:
448 tc.startCompression();
449 // Continue with message handlers
450 default:
451 boolean handled = false;
452 for(HandlerEntry handler : messageHandlers) {
453 if((handler.low <= type) && (type <= handler.high)) {
454 handler.mh.handleMessage(packet);
455 handled = true;
456 break;
457 }
458 }
459 if(!handled) {
460 throw new PacketTypeException(type);
461 }
462 break;
463 }
464 if(log.isDebugEnabled()) {
465 log.debug(String.format("Handled packet %d", type));
466 }
467 }
468 }
469 }