comparison src/com/trilead/ssh2/transport/TransportManager.java @ 0:0ce5cc452d02

initial version
author Carl Byington <carl@five-ten-sg.com>
date Thu, 22 May 2014 10:41:19 -0700
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:0ce5cc452d02
1
2 package com.trilead.ssh2.transport;
3
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.net.InetAddress;
8 import java.net.InetSocketAddress;
9 import java.net.Socket;
10 import java.net.UnknownHostException;
11 import java.security.SecureRandom;
12 import java.util.Vector;
13
14 import com.trilead.ssh2.ConnectionInfo;
15 import com.trilead.ssh2.ConnectionMonitor;
16 import com.trilead.ssh2.DHGexParameters;
17 import com.trilead.ssh2.HTTPProxyData;
18 import com.trilead.ssh2.HTTPProxyException;
19 import com.trilead.ssh2.ProxyData;
20 import com.trilead.ssh2.ServerHostKeyVerifier;
21 import com.trilead.ssh2.compression.ICompressor;
22 import com.trilead.ssh2.crypto.Base64;
23 import com.trilead.ssh2.crypto.CryptoWishList;
24 import com.trilead.ssh2.crypto.cipher.BlockCipher;
25 import com.trilead.ssh2.crypto.digest.MAC;
26 import com.trilead.ssh2.log.Logger;
27 import com.trilead.ssh2.packets.PacketDisconnect;
28 import com.trilead.ssh2.packets.Packets;
29 import com.trilead.ssh2.packets.TypesReader;
30 import com.trilead.ssh2.util.Tokenizer;
31
32
33 /*
34 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
35 * packets are allowed during kex exchange, on the other side we need to blindly
36 * ignore the next _packet_ if the KEX guess was wrong. How do we know that
37 * the next packet is not a channel data packet? Yes, we could check if it is in
38 * the KEX range. But the standard says nothing about this. The OpenSSH guys
39 * block local "normal" traffic during KEX. That's fine - however, they assume
40 * that the other side is doing the same. During re-key, if they receive traffic
41 * other than KEX, they become horribly irritated and kill the connection. Since
42 * we are very likely going to communicate with OpenSSH servers, we have to play
43 * the same game - even though we could do better.
44 *
45 * btw: having stdout and stderr on the same channel, with a shared window, is
46 * also a VERY good idea... =(
47 */
48
49 /**
50 * TransportManager.
51 *
52 * @author Christian Plattner, plattner@trilead.com
53 * @version $Id: TransportManager.java,v 1.2 2008/04/01 12:38:09 cplattne Exp $
54 */
55 public class TransportManager {
56 private static final Logger log = Logger.getLogger(TransportManager.class);
57
58 class HandlerEntry {
59 MessageHandler mh;
60 int low;
61 int high;
62 }
63
64 private final Vector<byte[]> asynchronousQueue = new Vector<byte[]>();
65 private Thread asynchronousThread = null;
66
67 class AsynchronousWorker extends Thread {
68 public void run() {
69 while (true) {
70 byte[] msg = null;
71
72 synchronized (asynchronousQueue) {
73 if (asynchronousQueue.size() == 0) {
74 /* After the queue is empty for about 2 seconds, stop this thread */
75 try {
76 asynchronousQueue.wait(2000);
77 }
78 catch (InterruptedException e) {
79 /* OKOK, if somebody interrupts us, then we may die earlier. */
80 }
81
82 if (asynchronousQueue.size() == 0) {
83 asynchronousThread = null;
84 return;
85 }
86 }
87
88 msg = asynchronousQueue.remove(0);
89 }
90
91 /* The following invocation may throw an IOException.
92 * There is no point in handling it - it simply means
93 * that the connection has a problem and we should stop
94 * sending asynchronously messages. We do not need to signal that
95 * we have exited (asynchronousThread = null): further
96 * messages in the queue cannot be sent by this or any
97 * other thread.
98 * Other threads will sooner or later (when receiving or
99 * sending the next message) get the same IOException and
100 * get to the same conclusion.
101 */
102
103 try {
104 sendMessage(msg);
105 }
106 catch (IOException e) {
107 return;
108 }
109 }
110 }
111 }
112
113 String hostname;
114 int port;
115 final Socket sock = new Socket();
116
117 Object connectionSemaphore = new Object();
118
119 boolean flagKexOngoing = false;
120 boolean connectionClosed = false;
121
122 Throwable reasonClosedCause = null;
123
124 TransportConnection tc;
125 KexManager km;
126
127 Vector<HandlerEntry> messageHandlers = new Vector<HandlerEntry>();
128
129 Thread receiveThread;
130
131 Vector connectionMonitors = new Vector();
132 boolean monitorsWereInformed = false;
133
134 /**
135 * There were reports that there are JDKs which use
136 * the resolver even though one supplies a dotted IP
137 * address in the Socket constructor. That is why we
138 * try to generate the InetAdress "by hand".
139 *
140 * @param host
141 * @return the InetAddress
142 * @throws UnknownHostException
143 */
144 private InetAddress createInetAddress(String host) throws UnknownHostException {
145 /* Check if it is a dotted IP4 address */
146 InetAddress addr = parseIPv4Address(host);
147
148 if (addr != null)
149 return addr;
150
151 return InetAddress.getByName(host);
152 }
153
154 private InetAddress parseIPv4Address(String host) throws UnknownHostException {
155 if (host == null)
156 return null;
157
158 String[] quad = Tokenizer.parseTokens(host, '.');
159
160 if ((quad == null) || (quad.length != 4))
161 return null;
162
163 byte[] addr = new byte[4];
164
165 for (int i = 0; i < 4; i++) {
166 int part = 0;
167
168 if ((quad[i].length() == 0) || (quad[i].length() > 3))
169 return null;
170
171 for (int k = 0; k < quad[i].length(); k++) {
172 char c = quad[i].charAt(k);
173
174 /* No, Character.isDigit is not the same */
175 if ((c < '0') || (c > '9'))
176 return null;
177
178 part = part * 10 + (c - '0');
179 }
180
181 if (part > 255) /* 300.1.2.3 is invalid =) */
182 return null;
183
184 addr[i] = (byte) part;
185 }
186
187 return InetAddress.getByAddress(host, addr);
188 }
189
/**
 * Remembers the server to talk to. The constructor body itself performs no
 * network activity; the connection is set up by {@code initialize()}.
 *
 * @param host host name or address of the SSH server
 * @param port TCP port of the SSH server
 * @throws IOException declared for callers; not thrown by this body
 */
public TransportManager(String host, int port) throws IOException {
    hostname = host;
    this.port = port;
}
194
195 public int getPacketOverheadEstimate() {
196 return tc.getPacketOverheadEstimate();
197 }
198
199 public void setTcpNoDelay(boolean state) throws IOException {
200 sock.setTcpNoDelay(state);
201 }
202
203 public void setSoTimeout(int timeout) throws IOException {
204 sock.setSoTimeout(timeout);
205 }
206
207 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException {
208 return km.getOrWaitForConnectionInfo(kexNumber);
209 }
210
211 public Throwable getReasonClosedCause() {
212 synchronized (connectionSemaphore) {
213 return reasonClosedCause;
214 }
215 }
216
217 public byte[] getSessionIdentifier() {
218 return km.sessionId;
219 }
220
221 public void close(Throwable cause, boolean useDisconnectPacket) {
222 if (useDisconnectPacket == false) {
223 /* OK, hard shutdown - do not aquire the semaphore,
224 * perhaps somebody is inside (and waits until the remote
225 * side is ready to accept new data). */
226 try {
227 sock.close();
228 }
229 catch (IOException ignore) {
230 }
231
232 /* OK, whoever tried to send data, should now agree that
233 * there is no point in further waiting =)
234 * It is safe now to aquire the semaphore.
235 */
236 }
237
238 synchronized (connectionSemaphore) {
239 if (connectionClosed == false) {
240 if (useDisconnectPacket == true) {
241 try {
242 byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "")
243 .getPayload();
244
245 if (tc != null)
246 tc.sendMessage(msg);
247 }
248 catch (IOException ignore) {
249 }
250
251 try {
252 sock.close();
253 }
254 catch (IOException ignore) {
255 }
256 }
257
258 connectionClosed = true;
259 reasonClosedCause = cause; /* may be null */
260 }
261
262 connectionSemaphore.notifyAll();
263 }
264
265 /* No check if we need to inform the monitors */
266 Vector monitors = null;
267
268 synchronized (this) {
269 /* Short term lock to protect "connectionMonitors"
270 * and "monitorsWereInformed"
271 * (they may be modified concurrently)
272 */
273 if (monitorsWereInformed == false) {
274 monitorsWereInformed = true;
275 monitors = (Vector) connectionMonitors.clone();
276 }
277 }
278
279 if (monitors != null) {
280 for (int i = 0; i < monitors.size(); i++) {
281 try {
282 ConnectionMonitor cmon = (ConnectionMonitor) monitors.elementAt(i);
283 cmon.connectionLost(reasonClosedCause);
284 }
285 catch (Exception ignore) {
286 }
287 }
288 }
289 }
290
291 private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException {
292 /* See the comment for createInetAddress() */
293 if (proxyData == null) {
294 InetAddress addr = createInetAddress(hostname);
295 sock.connect(new InetSocketAddress(addr, port), connectTimeout);
296 sock.setSoTimeout(0);
297 return;
298 }
299
300 if (proxyData instanceof HTTPProxyData) {
301 HTTPProxyData pd = (HTTPProxyData) proxyData;
302 /* At the moment, we only support HTTP proxies */
303 InetAddress addr = createInetAddress(pd.proxyHost);
304 sock.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout);
305 sock.setSoTimeout(0);
306 /* OK, now tell the proxy where we actually want to connect to */
307 StringBuffer sb = new StringBuffer();
308 sb.append("CONNECT ");
309 sb.append(hostname);
310 sb.append(':');
311 sb.append(port);
312 sb.append(" HTTP/1.0\r\n");
313
314 if ((pd.proxyUser != null) && (pd.proxyPass != null)) {
315 String credentials = pd.proxyUser + ":" + pd.proxyPass;
316 char[] encoded = Base64.encode(credentials.getBytes("ISO-8859-1"));
317 sb.append("Proxy-Authorization: Basic ");
318 sb.append(encoded);
319 sb.append("\r\n");
320 }
321
322 if (pd.requestHeaderLines != null) {
323 for (int i = 0; i < pd.requestHeaderLines.length; i++) {
324 if (pd.requestHeaderLines[i] != null) {
325 sb.append(pd.requestHeaderLines[i]);
326 sb.append("\r\n");
327 }
328 }
329 }
330
331 sb.append("\r\n");
332 OutputStream out = sock.getOutputStream();
333 out.write(sb.toString().getBytes("ISO-8859-1"));
334 out.flush();
335 /* Now parse the HTTP response */
336 byte[] buffer = new byte[1024];
337 InputStream in = sock.getInputStream();
338 int len = ClientServerHello.readLineRN(in, buffer);
339 String httpReponse = new String(buffer, 0, len, "ISO-8859-1");
340
341 if (httpReponse.startsWith("HTTP/") == false)
342 throw new IOException("The proxy did not send back a valid HTTP response.");
343
344 /* "HTTP/1.X XYZ X" => 14 characters minimum */
345
346 if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' '))
347 throw new IOException("The proxy did not send back a valid HTTP response.");
348
349 int errorCode = 0;
350
351 try {
352 errorCode = Integer.parseInt(httpReponse.substring(9, 12));
353 }
354 catch (NumberFormatException ignore) {
355 throw new IOException("The proxy did not send back a valid HTTP response.");
356 }
357
358 if ((errorCode < 0) || (errorCode > 999))
359 throw new IOException("The proxy did not send back a valid HTTP response.");
360
361 if (errorCode != 200) {
362 throw new HTTPProxyException(httpReponse.substring(13), errorCode);
363 }
364
365 /* OK, read until empty line */
366
367 while (true) {
368 len = ClientServerHello.readLineRN(in, buffer);
369
370 if (len == 0)
371 break;
372 }
373
374 return;
375 }
376
377 throw new IOException("Unsupported ProxyData");
378 }
379
380 public void initialize(CryptoWishList cwl, ServerHostKeyVerifier verifier, DHGexParameters dhgex,
381 int connectTimeout, SecureRandom rnd, ProxyData proxyData) throws IOException {
382 /* First, establish the TCP connection to the SSH-2 server */
383 establishConnection(proxyData, connectTimeout);
384 /* Parse the server line and say hello - important: this information is later needed for the
385 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
386 * for later use.
387 */
388 ClientServerHello csh = new ClientServerHello(sock.getInputStream(), sock.getOutputStream());
389 tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
390 km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd);
391 km.initiateKEX(cwl, dhgex);
392 receiveThread = new Thread(new Runnable() {
393 public void run() {
394 try {
395 receiveLoop();
396 }
397 catch (IOException e) {
398 close(e, false);
399
400 if (log.isEnabled())
401 log.log(10, "Receive thread: error in receiveLoop: " + e.getMessage());
402 }
403
404 if (log.isEnabled())
405 log.log(50, "Receive thread: back from receiveLoop");
406
407 /* Tell all handlers that it is time to say goodbye */
408
409 if (km != null) {
410 try {
411 km.handleMessage(null, 0);
412 }
413 catch (IOException e) {
414 }
415 }
416
417 for (int i = 0; i < messageHandlers.size(); i++) {
418 HandlerEntry he = messageHandlers.elementAt(i);
419
420 try {
421 he.mh.handleMessage(null, 0);
422 }
423 catch (Exception ignore) {
424 }
425 }
426 }
427 });
428 receiveThread.setDaemon(true);
429 receiveThread.start();
430 }
431
432 public void registerMessageHandler(MessageHandler mh, int low, int high) {
433 HandlerEntry he = new HandlerEntry();
434 he.mh = mh;
435 he.low = low;
436 he.high = high;
437
438 synchronized (messageHandlers) {
439 messageHandlers.addElement(he);
440 }
441 }
442
443 public void removeMessageHandler(MessageHandler mh, int low, int high) {
444 synchronized (messageHandlers) {
445 for (int i = 0; i < messageHandlers.size(); i++) {
446 HandlerEntry he = messageHandlers.elementAt(i);
447
448 if ((he.mh == mh) && (he.low == low) && (he.high == high)) {
449 messageHandlers.removeElementAt(i);
450 break;
451 }
452 }
453 }
454 }
455
456 public void sendKexMessage(byte[] msg) throws IOException {
457 synchronized (connectionSemaphore) {
458 if (connectionClosed) {
459 throw(IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
460 }
461
462 flagKexOngoing = true;
463
464 try {
465 tc.sendMessage(msg);
466 }
467 catch (IOException e) {
468 close(e, false);
469 throw e;
470 }
471 }
472 }
473
474 public void kexFinished() throws IOException {
475 synchronized (connectionSemaphore) {
476 flagKexOngoing = false;
477 connectionSemaphore.notifyAll();
478 }
479 }
480
481 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException {
482 km.initiateKEX(cwl, dhgex);
483 }
484
485 public void changeRecvCipher(BlockCipher bc, MAC mac) {
486 tc.changeRecvCipher(bc, mac);
487 }
488
489 public void changeSendCipher(BlockCipher bc, MAC mac) {
490 tc.changeSendCipher(bc, mac);
491 }
492
493 /**
494 * @param comp
495 */
496 public void changeRecvCompression(ICompressor comp) {
497 tc.changeRecvCompression(comp);
498 }
499
500 /**
501 * @param comp
502 */
503 public void changeSendCompression(ICompressor comp) {
504 tc.changeSendCompression(comp);
505 }
506
507 /**
508 *
509 */
510 public void startCompression() {
511 tc.startCompression();
512 }
513
514 public void sendAsynchronousMessage(byte[] msg) throws IOException {
515 synchronized (asynchronousQueue) {
516 asynchronousQueue.addElement(msg);
517
518 /* This limit should be flexible enough. We need this, otherwise the peer
519 * can flood us with global requests (and other stuff where we have to reply
520 * with an asynchronous message) and (if the server just sends data and does not
521 * read what we send) this will probably put us in a low memory situation
522 * (our send queue would grow and grow and...) */
523
524 if (asynchronousQueue.size() > 100)
525 throw new IOException("Error: the peer is not consuming our asynchronous replies.");
526
527 /* Check if we have an asynchronous sending thread */
528
529 if (asynchronousThread == null) {
530 asynchronousThread = new AsynchronousWorker();
531 asynchronousThread.setDaemon(true);
532 asynchronousThread.start();
533 /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
534 }
535 }
536 }
537
538 public void setConnectionMonitors(Vector monitors) {
539 synchronized (this) {
540 connectionMonitors = (Vector) monitors.clone();
541 }
542 }
543
544 public void sendMessage(byte[] msg) throws IOException {
545 if (Thread.currentThread() == receiveThread)
546 throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!");
547
548 synchronized (connectionSemaphore) {
549 while (true) {
550 if (connectionClosed) {
551 throw(IOException) new IOException("Sorry, this connection is closed.")
552 .initCause(reasonClosedCause);
553 }
554
555 if (flagKexOngoing == false)
556 break;
557
558 try {
559 connectionSemaphore.wait();
560 }
561 catch (InterruptedException e) {
562 }
563 }
564
565 try {
566 tc.sendMessage(msg);
567 }
568 catch (IOException e) {
569 close(e, false);
570 throw e;
571 }
572 }
573 }
574
575 public void receiveLoop() throws IOException {
576 byte[] msg = new byte[35000];
577
578 while (true) {
579 int msglen = tc.receiveMessage(msg, 0, msg.length);
580 int type = msg[0] & 0xff;
581
582 if (type == Packets.SSH_MSG_IGNORE)
583 continue;
584
585 if (type == Packets.SSH_MSG_DEBUG) {
586 if (log.isEnabled()) {
587 TypesReader tr = new TypesReader(msg, 0, msglen);
588 tr.readByte();
589 tr.readBoolean();
590 StringBuffer debugMessageBuffer = new StringBuffer();
591 debugMessageBuffer.append(tr.readString("UTF-8"));
592
593 for (int i = 0; i < debugMessageBuffer.length(); i++) {
594 char c = debugMessageBuffer.charAt(i);
595
596 if ((c >= 32) && (c <= 126))
597 continue;
598
599 debugMessageBuffer.setCharAt(i, '\uFFFD');
600 }
601
602 log.log(50, "DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'");
603 }
604
605 continue;
606 }
607
608 if (type == Packets.SSH_MSG_UNIMPLEMENTED) {
609 throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen.");
610 }
611
612 if (type == Packets.SSH_MSG_DISCONNECT) {
613 TypesReader tr = new TypesReader(msg, 0, msglen);
614 tr.readByte();
615 int reason_code = tr.readUINT32();
616 StringBuffer reasonBuffer = new StringBuffer();
617 reasonBuffer.append(tr.readString("UTF-8"));
618
619 /*
620 * Do not get fooled by servers that send abnormal long error
621 * messages
622 */
623
624 if (reasonBuffer.length() > 255) {
625 reasonBuffer.setLength(255);
626 reasonBuffer.setCharAt(254, '.');
627 reasonBuffer.setCharAt(253, '.');
628 reasonBuffer.setCharAt(252, '.');
629 }
630
631 /*
632 * Also, check that the server did not send charcaters that may
633 * screw up the receiver -> restrict to reasonable US-ASCII
634 * subset -> "printable characters" (ASCII 32 - 126). Replace
635 * all others with 0xFFFD (UNICODE replacement character).
636 */
637
638 for (int i = 0; i < reasonBuffer.length(); i++) {
639 char c = reasonBuffer.charAt(i);
640
641 if ((c >= 32) && (c <= 126))
642 continue;
643
644 reasonBuffer.setCharAt(i, '\uFFFD');
645 }
646
647 throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): "
648 + reasonBuffer.toString());
649 }
650
651 /*
652 * Is it a KEX Packet?
653 */
654
655 if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS)
656 || ((type >= 30) && (type <= 49))) {
657 km.handleMessage(msg, msglen);
658 continue;
659 }
660
661 if (type == Packets.SSH_MSG_USERAUTH_SUCCESS) {
662 tc.startCompression();
663 }
664
665 MessageHandler mh = null;
666
667 for (int i = 0; i < messageHandlers.size(); i++) {
668 HandlerEntry he = messageHandlers.elementAt(i);
669
670 if ((he.low <= type) && (type <= he.high)) {
671 mh = he.mh;
672 break;
673 }
674 }
675
676 if (mh == null)
677 throw new IOException("Unexpected SSH message (type " + type + ")");
678
679 mh.handleMessage(msg, msglen);
680 }
681 }
682 }