comparison src/ch/ethz/ssh2/transport/TransportManager.java @ 307:071eccdff8ea ganymed

fix java formatting
author Carl Byington <carl@five-ten-sg.com>
date Wed, 30 Jul 2014 14:16:58 -0700
parents d2b303406d63
children 776a220dbcc6
comparison
equal deleted inserted replaced
305:d2b303406d63 307:071eccdff8ea
53 * Advertised maximum SSH packet size that the other side can send to us. 53 * Advertised maximum SSH packet size that the other side can send to us.
54 */ 54 */
55 public static final int MAX_PACKET_SIZE = 64 * 1024; 55 public static final int MAX_PACKET_SIZE = 64 * 1024;
56 56
57 private final List<AsynchronousEntry> asynchronousQueue 57 private final List<AsynchronousEntry> asynchronousQueue
58 = new ArrayList<AsynchronousEntry>(); 58 = new ArrayList<AsynchronousEntry>();
59 59
60 private Thread asynchronousThread = null; 60 private Thread asynchronousThread = null;
61 private boolean asynchronousPending = false; 61 private boolean asynchronousPending = false;
62 62
63 private Socket socket; 63 private Socket socket;
74 } 74 }
75 } 75 }
76 76
77 private final class AsynchronousWorker implements Runnable { 77 private final class AsynchronousWorker implements Runnable {
78 public void run() { 78 public void run() {
79 while(true) { 79 while (true) {
80 final AsynchronousEntry item; 80 final AsynchronousEntry item;
81 synchronized(asynchronousQueue) { 81
82 if(asynchronousQueue.size() == 0) { 82 synchronized (asynchronousQueue) {
83 if (asynchronousQueue.size() == 0) {
83 // Only now we may reset the flag, since we are sure that all queued items 84 // Only now we may reset the flag, since we are sure that all queued items
84 // have been sent (there is a slight delay between de-queuing and sending, 85 // have been sent (there is a slight delay between de-queuing and sending,
85 // this is why we need this flag! See code below. Sending takes place outside 86 // this is why we need this flag! See code below. Sending takes place outside
86 // of this lock, this is why a test for size()==0 (from another thread) does not ensure 87 // of this lock, this is why a test for size()==0 (from another thread) does not ensure
87 // that all messages have been sent. 88 // that all messages have been sent.
88
89 asynchronousPending = false; 89 asynchronousPending = false;
90
91 // Notify any senders that they can proceed, all async messages have been delivered 90 // Notify any senders that they can proceed, all async messages have been delivered
92
93 asynchronousQueue.notifyAll(); 91 asynchronousQueue.notifyAll();
94 92
95 // After the queue is empty for about 2 seconds, stop this thread 93 // After the queue is empty for about 2 seconds, stop this thread
96 try { 94 try {
97 asynchronousQueue.wait(2000); 95 asynchronousQueue.wait(2000);
98 } 96 }
99 catch(InterruptedException ignore) { 97 catch (InterruptedException ignore) {
100 // 98 //
101 } 99 }
102 if(asynchronousQueue.size() == 0) { 100
101 if (asynchronousQueue.size() == 0) {
103 asynchronousThread = null; 102 asynchronousThread = null;
104 return; 103 return;
105 } 104 }
106 } 105 }
106
107 item = asynchronousQueue.remove(0); 107 item = asynchronousQueue.remove(0);
108 } 108 }
109
109 try { 110 try {
110 sendMessageImmediate(item.message); 111 sendMessageImmediate(item.message);
111 } 112 }
112 catch(IOException e) { 113 catch (IOException e) {
113 // There is no point in handling it - it simply means that the connection has a problem and we should stop 114 // There is no point in handling it - it simply means that the connection has a problem and we should stop
114 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null): 115 // sending asynchronously messages. We do not need to signal that we have exited (asynchronousThread = null):
115 // further messages in the queue cannot be sent by this or any other thread. 116 // further messages in the queue cannot be sent by this or any other thread.
116 // Other threads will sooner or later (when receiving or sending the next message) get the 117 // Other threads will sooner or later (when receiving or sending the next message) get the
117 // same IOException and get to the same conclusion. 118 // same IOException and get to the same conclusion.
131 132
132 private TransportConnection tc; 133 private TransportConnection tc;
133 private KexManager km; 134 private KexManager km;
134 135
135 private final List<HandlerEntry> messageHandlers 136 private final List<HandlerEntry> messageHandlers
136 = new ArrayList<HandlerEntry>(); 137 = new ArrayList<HandlerEntry>();
137 138
138 private List<ConnectionMonitor> connectionMonitors 139 private List<ConnectionMonitor> connectionMonitors
139 = new ArrayList<ConnectionMonitor>(); 140 = new ArrayList<ConnectionMonitor>();
140 141
141 protected void init(TransportConnection tc, KexManager km) { 142 protected void init(TransportConnection tc, KexManager km) {
142 this.tc = tc; 143 this.tc = tc;
143 this.km = km; 144 this.km = km;
144 } 145 }
150 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException { 151 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException {
151 return km.getOrWaitForConnectionInfo(kexNumber); 152 return km.getOrWaitForConnectionInfo(kexNumber);
152 } 153 }
153 154
154 public IOException getReasonClosedCause() { 155 public IOException getReasonClosedCause() {
155 synchronized(connectionSemaphore) { 156 synchronized (connectionSemaphore) {
156 return reasonClosedCause; 157 return reasonClosedCause;
157 } 158 }
158 } 159 }
159 160
160 public byte[] getSessionIdentifier() { 161 public byte[] getSessionIdentifier() {
161 return km.sessionId; 162 return km.sessionId;
162 } 163 }
163 164
164 public void close() { 165 public void close() {
165 // It is safe now to acquire the semaphore. 166 // It is safe now to acquire the semaphore.
166 synchronized(connectionSemaphore) { 167 synchronized (connectionSemaphore) {
167 if(!connectionClosed) { 168 if (!connectionClosed) {
168 try { 169 try {
169 tc.sendMessage(new PacketDisconnect( 170 tc.sendMessage(new PacketDisconnect(
170 PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload()); 171 PacketDisconnect.Reason.SSH_DISCONNECT_BY_APPLICATION, "").getPayload());
171 } 172 }
172 catch(IOException ignore) { 173 catch (IOException ignore) {
173 // 174 //
174 } 175 }
176
175 try { 177 try {
176 socket.close(); 178 socket.close();
177 } 179 }
178 catch(IOException ignore) { 180 catch (IOException ignore) {
179 // 181 //
180 } 182 }
183
181 connectionClosed = true; 184 connectionClosed = true;
182 synchronized(this) { 185
183 for(ConnectionMonitor cmon : connectionMonitors) { 186 synchronized (this) {
187 for (ConnectionMonitor cmon : connectionMonitors) {
184 cmon.connectionLost(reasonClosedCause); 188 cmon.connectionLost(reasonClosedCause);
185 } 189 }
186 } 190 }
187 } 191 }
192
188 connectionSemaphore.notifyAll(); 193 connectionSemaphore.notifyAll();
189 } 194 }
190 } 195 }
191 196
192 public void close(IOException cause) { 197 public void close(IOException cause) {
193 // Do not acquire the semaphore, perhaps somebody is inside (and waits until 198 // Do not acquire the semaphore, perhaps somebody is inside (and waits until
194 // the remote side is ready to accept new data 199 // the remote side is ready to accept new data
195 try { 200 try {
196 socket.close(); 201 socket.close();
197 } 202 }
198 catch(IOException ignore) { 203 catch (IOException ignore) {
199 } 204 }
205
200 // It is safe now to acquire the semaphore. 206 // It is safe now to acquire the semaphore.
201 synchronized(connectionSemaphore) { 207 synchronized (connectionSemaphore) {
202 connectionClosed = true; 208 connectionClosed = true;
203 reasonClosedCause = cause; 209 reasonClosedCause = cause;
204 connectionSemaphore.notifyAll(); 210 connectionSemaphore.notifyAll();
205 } 211 }
206 synchronized(this) { 212
207 for(ConnectionMonitor cmon : connectionMonitors) { 213 synchronized (this) {
214 for (ConnectionMonitor cmon : connectionMonitors) {
208 cmon.connectionLost(reasonClosedCause); 215 cmon.connectionLost(reasonClosedCause);
209 } 216 }
210 } 217 }
211 } 218 }
212 219
215 public void run() { 222 public void run() {
216 try { 223 try {
217 receiveLoop(); 224 receiveLoop();
218 // Can only exit with exception 225 // Can only exit with exception
219 } 226 }
220 catch(IOException e) { 227 catch (IOException e) {
221 close(e); 228 close(e);
222 log.warning(e.getMessage()); 229 log.warning(e.getMessage());
230
223 // Tell all handlers that it is time to say goodbye 231 // Tell all handlers that it is time to say goodbye
224 if(km != null) { 232 if (km != null) {
225 km.handleFailure(e); 233 km.handleFailure(e);
226 } 234 }
227 for(HandlerEntry he : messageHandlers) { 235
236 for (HandlerEntry he : messageHandlers) {
228 he.mh.handleFailure(e); 237 he.mh.handleFailure(e);
229 } 238 }
230 } 239 }
231 if(log.isDebugEnabled()) { 240
241 if (log.isDebugEnabled()) {
232 log.debug("Receive thread: back from receiveLoop"); 242 log.debug("Receive thread: back from receiveLoop");
233 } 243 }
234 } 244 }
235 }); 245 });
236 receiveThread.setName("Transport Manager"); 246 receiveThread.setName("Transport Manager");
242 HandlerEntry he = new HandlerEntry(); 252 HandlerEntry he = new HandlerEntry();
243 he.mh = mh; 253 he.mh = mh;
244 he.low = low; 254 he.low = low;
245 he.high = high; 255 he.high = high;
246 256
247 synchronized(messageHandlers) { 257 synchronized (messageHandlers) {
248 messageHandlers.add(he); 258 messageHandlers.add(he);
249 } 259 }
250 } 260 }
251 261
252 public void removeMessageHandler(MessageHandler handler) { 262 public void removeMessageHandler(MessageHandler handler) {
253 synchronized(messageHandlers) { 263 synchronized (messageHandlers) {
254 for(int i = 0; i < messageHandlers.size(); i++) { 264 for (int i = 0; i < messageHandlers.size(); i++) {
255 HandlerEntry he = messageHandlers.get(i); 265 HandlerEntry he = messageHandlers.get(i);
256 if(he.mh == handler) { 266
267 if (he.mh == handler) {
257 messageHandlers.remove(i); 268 messageHandlers.remove(i);
258 break; 269 break;
259 } 270 }
260 } 271 }
261 } 272 }
262 } 273 }
263 274
264 public void sendKexMessage(byte[] msg) throws IOException { 275 public void sendKexMessage(byte[] msg) throws IOException {
265 synchronized(connectionSemaphore) { 276 synchronized (connectionSemaphore) {
266 if(connectionClosed) { 277 if (connectionClosed) {
267 throw reasonClosedCause; 278 throw reasonClosedCause;
268 } 279 }
280
269 flagKexOngoing = true; 281 flagKexOngoing = true;
282
270 try { 283 try {
271 tc.sendMessage(msg); 284 tc.sendMessage(msg);
272 } 285 }
273 catch(IOException e) { 286 catch (IOException e) {
274 close(e); 287 close(e);
275 throw e; 288 throw e;
276 } 289 }
277 } 290 }
278 } 291 }
279 292
280 public void kexFinished() throws IOException { 293 public void kexFinished() throws IOException {
281 synchronized(connectionSemaphore) { 294 synchronized (connectionSemaphore) {
282 flagKexOngoing = false; 295 flagKexOngoing = false;
283 connectionSemaphore.notifyAll(); 296 connectionSemaphore.notifyAll();
284 } 297 }
285 } 298 }
286 299
290 * @param dsa may be null if this is a client connection 303 * @param dsa may be null if this is a client connection
291 * @param rsa may be null if this is a client connection 304 * @param rsa may be null if this is a client connection
292 * @throws IOException 305 * @throws IOException
293 */ 306 */
294 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, KeyPair dsa, KeyPair rsa, KeyPair ec) 307 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, KeyPair dsa, KeyPair rsa, KeyPair ec)
295 throws IOException { 308 throws IOException {
296 synchronized(connectionSemaphore) { 309 synchronized (connectionSemaphore) {
297 if(connectionClosed) { 310 if (connectionClosed) {
298 // Inform the caller that there is no point in triggering a new kex 311 // Inform the caller that there is no point in triggering a new kex
299 throw reasonClosedCause; 312 throw reasonClosedCause;
300 } 313 }
301 } 314 }
315
302 km.initiateKEX(cwl, dhgex, dsa, rsa, ec); 316 km.initiateKEX(cwl, dhgex, dsa, rsa, ec);
303 } 317 }
304 318
305 public void changeRecvCipher(BlockCipher bc, MAC mac) { 319 public void changeRecvCipher(BlockCipher bc, MAC mac) {
306 tc.changeRecvCipher(bc, mac); 320 tc.changeRecvCipher(bc, mac);
317 public void changeSendCompression(Compressor comp) { 331 public void changeSendCompression(Compressor comp) {
318 tc.changeSendCompression(comp); 332 tc.changeSendCompression(comp);
319 } 333 }
320 334
321 public void sendAsynchronousMessage(byte[] msg) throws IOException { 335 public void sendAsynchronousMessage(byte[] msg) throws IOException {
322 synchronized(asynchronousQueue) { 336 synchronized (asynchronousQueue) {
323 asynchronousQueue.add(new AsynchronousEntry(msg)); 337 asynchronousQueue.add(new AsynchronousEntry(msg));
324 asynchronousPending = true; 338 asynchronousPending = true;
325 339
326 /* This limit should be flexible enough. We need this, otherwise the peer 340 /* This limit should be flexible enough. We need this, otherwise the peer
327 * can flood us with global requests (and other stuff where we have to reply 341 * can flood us with global requests (and other stuff where we have to reply
328 * with an asynchronous message) and (if the server just sends data and does not 342 * with an asynchronous message) and (if the server just sends data and does not
329 * read what we send) this will probably put us in a low memory situation 343 * read what we send) this will probably put us in a low memory situation
330 * (our send queue would grow and grow and...) */ 344 * (our send queue would grow and grow and...) */
331 345
332 if(asynchronousQueue.size() > 100) { 346 if (asynchronousQueue.size() > 100) {
333 throw new IOException("The peer is not consuming our asynchronous replies."); 347 throw new IOException("The peer is not consuming our asynchronous replies.");
334 } 348 }
335 349
336 // Check if we have an asynchronous sending thread 350 // Check if we have an asynchronous sending thread
337 if(asynchronousThread == null) { 351 if (asynchronousThread == null) {
338 asynchronousThread = new Thread(new AsynchronousWorker()); 352 asynchronousThread = new Thread(new AsynchronousWorker());
339 asynchronousThread.setDaemon(true); 353 asynchronousThread.setDaemon(true);
340 asynchronousThread.start(); 354 asynchronousThread.start();
341 // The thread will stop after 2 seconds of inactivity (i.e., empty queue) 355 // The thread will stop after 2 seconds of inactivity (i.e., empty queue)
342 } 356 }
357
343 asynchronousQueue.notifyAll(); 358 asynchronousQueue.notifyAll();
344 } 359 }
345 } 360 }
346 361
347 public void setConnectionMonitors(List<ConnectionMonitor> monitors) { 362 public void setConnectionMonitors(List<ConnectionMonitor> monitors) {
348 synchronized(this) { 363 synchronized (this) {
349 connectionMonitors = new ArrayList<ConnectionMonitor>(); 364 connectionMonitors = new ArrayList<ConnectionMonitor>();
350 connectionMonitors.addAll(monitors); 365 connectionMonitors.addAll(monitors);
351 } 366 }
352 } 367 }
353 368
356 * 371 *
357 * @param msg Message 372 * @param msg Message
358 * @throws IOException 373 * @throws IOException
359 */ 374 */
360 public void sendMessage(byte[] msg) throws IOException { 375 public void sendMessage(byte[] msg) throws IOException {
361 synchronized(asynchronousQueue) { 376 synchronized (asynchronousQueue) {
362 while(asynchronousPending) { 377 while (asynchronousPending) {
363 try { 378 try {
364 asynchronousQueue.wait(); 379 asynchronousQueue.wait();
365 } 380 }
366 catch(InterruptedException e) { 381 catch (InterruptedException e) {
367 throw new InterruptedIOException(e.getMessage()); 382 throw new InterruptedIOException(e.getMessage());
368 } 383 }
369 } 384 }
370 } 385 }
386
371 sendMessageImmediate(msg); 387 sendMessageImmediate(msg);
372 } 388 }
373 389
374 /** 390 /**
375 * Send message, ignore queued async messages that have not been delivered yet. 391 * Send message, ignore queued async messages that have not been delivered yet.
377 * 393 *
378 * @param msg Message 394 * @param msg Message
379 * @throws IOException 395 * @throws IOException
380 */ 396 */
381 public void sendMessageImmediate(byte[] msg) throws IOException { 397 public void sendMessageImmediate(byte[] msg) throws IOException {
382 synchronized(connectionSemaphore) { 398 synchronized (connectionSemaphore) {
383 while(true) { 399 while (true) {
384 if(connectionClosed) { 400 if (connectionClosed) {
385 throw reasonClosedCause; 401 throw reasonClosedCause;
386 } 402 }
387 if(!flagKexOngoing) { 403
404 if (!flagKexOngoing) {
388 break; 405 break;
389 } 406 }
407
390 try { 408 try {
391 connectionSemaphore.wait(); 409 connectionSemaphore.wait();
392 } 410 }
393 catch(InterruptedException e) { 411 catch (InterruptedException e) {
394 throw new InterruptedIOException(e.getMessage()); 412 throw new InterruptedIOException(e.getMessage());
395 } 413 }
396 } 414 }
397 415
398 try { 416 try {
399 tc.sendMessage(msg); 417 tc.sendMessage(msg);
400 } 418 }
401 catch(IOException e) { 419 catch (IOException e) {
402 close(e); 420 close(e);
403 throw e; 421 throw e;
404 } 422 }
405 } 423 }
406 } 424 }
407 425
408 private void receiveLoop() throws IOException { 426 private void receiveLoop() throws IOException {
409 while(true) { 427 while (true) {
410 final byte[] buffer = new byte[MAX_PACKET_SIZE]; 428 final byte[] buffer = new byte[MAX_PACKET_SIZE];
411 final int length = tc.receiveMessage(buffer, 0, buffer.length); 429 final int length = tc.receiveMessage(buffer, 0, buffer.length);
412 final byte[] packet = new byte[length]; 430 final byte[] packet = new byte[length];
413 System.arraycopy(buffer, 0, packet, 0, length); 431 System.arraycopy(buffer, 0, packet, 0, length);
414 final int type = packet[0] & 0xff; 432 final int type = packet[0] & 0xff;
415 switch(type) { 433
434 switch (type) {
416 case Packets.SSH_MSG_IGNORE: 435 case Packets.SSH_MSG_IGNORE:
417 break; 436 break;
437
418 case Packets.SSH_MSG_DEBUG: { 438 case Packets.SSH_MSG_DEBUG: {
419 TypesReader tr = new TypesReader(packet); 439 TypesReader tr = new TypesReader(packet);
420 tr.readByte(); 440 tr.readByte();
421 // always_display 441 // always_display
422 tr.readBoolean(); 442 tr.readBoolean();
423 String message = tr.readString(); 443 String message = tr.readString();
424 if(log.isDebugEnabled()) { 444
425 log.debug(String.format("Debug message from remote: '%s'", message)); 445 if (log.isDebugEnabled()) {
426 } 446 log.debug(String.format("Debug message from remote: '%s'", message));
427 break; 447 }
428 } 448
449 break;
450 }
451
429 case Packets.SSH_MSG_UNIMPLEMENTED: 452 case Packets.SSH_MSG_UNIMPLEMENTED:
430 throw new PacketTypeException(type); 453 throw new PacketTypeException(type);
454
431 case Packets.SSH_MSG_DISCONNECT: { 455 case Packets.SSH_MSG_DISCONNECT: {
432 final PacketDisconnect disconnect = new PacketDisconnect(packet); 456 final PacketDisconnect disconnect = new PacketDisconnect(packet);
433 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage()); 457 throw new DisconnectException(disconnect.getReason(), disconnect.getMessage());
434 } 458 }
459
435 case Packets.SSH_MSG_KEXINIT: 460 case Packets.SSH_MSG_KEXINIT:
436 case Packets.SSH_MSG_NEWKEYS: 461 case Packets.SSH_MSG_NEWKEYS:
437 case Packets.SSH_MSG_KEXDH_INIT: 462 case Packets.SSH_MSG_KEXDH_INIT:
438 case Packets.SSH_MSG_KEXDH_REPLY: 463 case Packets.SSH_MSG_KEXDH_REPLY:
439 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST: 464 case Packets.SSH_MSG_KEX_DH_GEX_REQUEST:
440 case Packets.SSH_MSG_KEX_DH_GEX_INIT: 465 case Packets.SSH_MSG_KEX_DH_GEX_INIT:
441 case Packets.SSH_MSG_KEX_DH_GEX_REPLY: 466 case Packets.SSH_MSG_KEX_DH_GEX_REPLY:
442 // Is it a KEX Packet 467 // Is it a KEX Packet
443 km.handleMessage(packet); 468 km.handleMessage(packet);
444 break; 469 break;
470
445 case Packets.SSH_MSG_USERAUTH_SUCCESS: 471 case Packets.SSH_MSG_USERAUTH_SUCCESS:
446 tc.startCompression(); 472 tc.startCompression();
447 // Continue with message handlers 473
474 // Continue with message handlers
448 default: 475 default:
449 boolean handled = false; 476 boolean handled = false;
450 for(HandlerEntry handler : messageHandlers) { 477
451 if((handler.low <= type) && (type <= handler.high)) { 478 for (HandlerEntry handler : messageHandlers) {
479 if ((handler.low <= type) && (type <= handler.high)) {
452 handler.mh.handleMessage(packet); 480 handler.mh.handleMessage(packet);
453 handled = true; 481 handled = true;
454 break; 482 break;
455 } 483 }
456 } 484 }
457 if(!handled) { 485
486 if (!handled) {
458 throw new PacketTypeException(type); 487 throw new PacketTypeException(type);
459 } 488 }
489
460 break; 490 break;
461 } 491 }
462 if(log.isDebugEnabled()) { 492
493 if (log.isDebugEnabled()) {
463 log.debug(String.format("Handled packet %d", type)); 494 log.debug(String.format("Handled packet %d", type));
464 } 495 }
465 } 496 }
466 } 497 }
467 } 498 }