comparison src/com/five_ten_sg/connectbot/monitor/MonitorService.java @ 23:2586a4f5c8c3

add mechanism to allow getfield from other threads
author Carl Byington <carl@five-ten-sg.com>
date Fri, 01 May 2015 11:52:31 -0700
parents 943df7400741
children 4f1cc4f44c41
comparison
equal deleted inserted replaced
22:adc776858a2d 23:2586a4f5c8c3
44 public static final char MONITOR_CMD_DEPRESS = 8; 44 public static final char MONITOR_CMD_DEPRESS = 8;
45 public static final char MONITOR_CMD_SHOWURL = 9; 45 public static final char MONITOR_CMD_SHOWURL = 9;
46 public static final char MONITOR_CMD_SWITCHSESSION = 10; 46 public static final char MONITOR_CMD_SWITCHSESSION = 10;
47 public static final char MONITOR_CMD_CURSORREQUEST = 11; 47 public static final char MONITOR_CMD_CURSORREQUEST = 11;
48 48
49 public static final char CURSOR_REQUESTED = 0;
50 public static final char CURSOR_SCREEN_CHANGE = 1;
51 public static final char CURSOR_USER_KEY = 2;
52
49 public static final int MONITORPORT = 6000; 53 public static final int MONITORPORT = 6000;
50 public static ConcurrentHashMap<Integer,CommunicationThread> clients = new ConcurrentHashMap<Integer,CommunicationThread>(); 54 public static ConcurrentHashMap<Integer,CommunicationThread> clients = new ConcurrentHashMap<Integer,CommunicationThread>();
51 public static int currentConnection = -1; 55 public static int currentConnection = -1;
52 56
53 private boolean speech = false; 57 private boolean speech = false;
98 102
99 public void printer(String msg) { 103 public void printer(String msg) {
100 if (handler != null) handler.sendMessage(handler.obtainMessage(MonitorActivity.MESSAGE_CODE_PRINT, msg)); 104 if (handler != null) handler.sendMessage(handler.obtainMessage(MonitorActivity.MESSAGE_CODE_PRINT, msg));
101 } 105 }
102 106
107 public synchronized void setCurrentConnection(int connection) {
108 currentConnection = connection;
109 }
110
111 public synchronized int nextConnection() {
112 int c = 1;
113 while (clients.get(c) != null) c++;
114 return c;
115 }
116
103 @Override 117 @Override
104 public int onStartCommand(Intent intent, int flags, int startId) { 118 public int onStartCommand(Intent intent, int flags, int startId) {
105 Log.i(TAG, "service onStartCommand()"); 119 Log.i(TAG, "service onStartCommand()");
106 return START_STICKY; 120 return START_STICKY;
107 } 121 }
108 122
109 @Override 123 @Override
110 public void onDestroy() { 124 public void onDestroy() {
111 try { 125 try {
112 Log.i(TAG, "service onDestroy()"); 126 Log.i(TAG, "service onDestroy()");
127 teCloseAll();
113 talker.stop(); 128 talker.stop();
114 talker.shutdown(); 129 talker.shutdown();
115 wifiLock.release(); 130 wifiLock.release();
116 wakeLock.release(); 131 wakeLock.release();
117 serverSocket.close(); 132 serverSocket.close();
132 return; 147 return;
133 } 148 }
134 while (true) { 149 while (true) {
135 try{ 150 try{
136 socket = serverSocket.accept(); 151 socket = serverSocket.accept();
137 connection = connection + 1; 152 connection = nextConnection();
138 CommunicationThread commThread = new CommunicationThread(connection, socket); 153 CommunicationThread commThread = new CommunicationThread(connection, socket);
139 clients.put(connection, commThread); 154 clients.put(connection, commThread);
140 commThread.start(); 155 commThread.start();
141 } catch (IOException e) { 156 } catch (IOException e) {
142 Log.e(TAG, "exception in ServerThread.run(), listening socket closed", e); 157 Log.e(TAG, "exception in ServerThread.run(), listening socket closed", e);
155 this.b = b; 170 this.b = b;
156 } 171 }
157 } 172 }
158 173
159 class CommunicationThread extends Thread { 174 class CommunicationThread extends Thread {
175 public int thread_id;
160 public int connection; 176 public int connection;
177 private String initString = null;
161 private Socket client_socket; 178 private Socket client_socket;
162 private InputStream client_in; 179 private InputStream client_in;
163 private OutputStream client_out; 180 private OutputStream client_out;
164 private boolean is_closing = false; 181 private boolean is_closing = false;
165 private BlockingQueue<triple> queue = new ArrayBlockingQueue<triple>(1); 182 private Integer getfields_outstanding = 0;
183 private BlockingQueue<triple> value_queue = new ArrayBlockingQueue<triple>(1);
184 private BlockingQueue<char[]> packet_queue = new ArrayBlockingQueue<char[]>(10000);
166 185
167 public CommunicationThread(int handle, Socket socket) { 186 public CommunicationThread(int handle, Socket socket) {
168 connection = handle; 187 connection = handle;
169 client_socket = socket; 188 client_socket = socket;
170 try { 189 try {
171 client_in = client_socket.getInputStream(); 190 client_in = client_socket.getInputStream();
172 client_out = client_socket.getOutputStream(); 191 client_out = client_socket.getOutputStream();
173 } catch (IOException e) { 192 } catch (IOException e) {
174 Log.e(TAG, "exception in CommunicationThread() constructor, cannot get socket streams", e); 193 Log.e(TAG, "exception in CommunicationThread() constructor, cannot get socket streams", e);
175 } 194 }
195 }
196
197 public synchronized void abandon() {
198 value_queue.offer(new triple(0, 0, new char[0]));
176 } 199 }
177 200
178 public void speak(byte [] msg, boolean flush, boolean synchronous) { 201 public void speak(byte [] msg, boolean flush, boolean synchronous) {
179 if (speech) { 202 if (speech) {
180 String smsg = bytesToString(msg); 203 String smsg = bytesToString(msg);
224 b[bp++] = (byte) (c[i] & 0x00ff); 247 b[bp++] = (byte) (c[i] & 0x00ff);
225 } 248 }
226 return b; 249 return b;
227 } 250 }
228 251
252 void cleanup(char[] buf) {
253 int i;
254 for (i=0; i<buf.length; i++) {
255 if ((int)(buf[i]) < 32) buf[i] = ' ';
256 }
257 }
258
259 public int cmGetField(char[] c) {
260 int request;
261 synchronized(getfields_outstanding) {
262 request = getfields_outstanding;
263 getfields_outstanding = getfields_outstanding + 1;
264 value_queue.clear(); // we never have more than one outstanding getfield request from the reco thread on the connection
265 clientWrite(MONITOR_CMD_GETFIELD, c);
266 }
267 return request;
268 }
269
229 public synchronized void clientWrite(char cmd, char[] c) { 270 public synchronized void clientWrite(char cmd, char[] c) {
230 try { 271 try {
231 if (client_out != null) { 272 if (client_out != null) {
232 c[0] = (char)(c.length - 1); // number of chars following 273 c[0] = (char)(c.length - 1); // number of chars following
233 c[1] = cmd; 274 c[1] = cmd;
234 Log.i(TAG, String.format("sending %d command", (int)cmd)); 275 Log.i(TAG, String.format("sending %d command", (int)cmd));
235 client_out.write(charsToBytes(c)); 276 client_out.write(charsToBytes(c));
277 client_out.flush();
236 } 278 }
237 } 279 }
238 catch (IOException e) { 280 catch (IOException e) {
239 Log.e(TAG, "exception in monitorWrite()", e); 281 Log.e(TAG, "exception in monitorWrite()", e);
240 try { 282 try {
260 off += l; 302 off += l;
261 } 303 }
262 return bytesToChars(b, len2); 304 return bytesToChars(b, len2);
263 } 305 }
264 306
307 public char[] readPacket() throws IOException {
308 char[] len = forceRead(1);
309 return forceRead(len[0]);
310 }
311
312 public char[] nextPacket() throws IOException, InterruptedException {
313 char[] packet = packet_queue.poll();
314 if (packet == null) {
315 packet = readPacket();
316 if (packet[0] == MONITOR_CMD_FIELDVALUE) {
317 synchronized(getfields_outstanding) {
318 getfields_outstanding = getfields_outstanding - 1;
319 }
320 }
321 }
322 return packet;
323 }
324
325 private triple reformatValue(char[] packet) {
326 int plen = packet.length;
327 char[] buf = new char[plen-3];
328 System.arraycopy(packet, 3, buf, 0, plen-3);
329 cleanup(buf);
330 Log.i(TAG, String.format("teFieldValue %d line %d column %d b.len %d", connection, (int)packet[1], (int)packet[2], buf.length));
331 return new triple(packet[1], packet[2], buf);
332 }
333
334 public triple peekValue(int request) {
335 try {
336 while (true) {
337 char[] packet = readPacket();
338 if (packet[0] == MONITOR_CMD_FIELDVALUE) {
339 synchronized(getfields_outstanding) {
340 getfields_outstanding = getfields_outstanding - 1;
341 if (request == 0) {
342 return reformatValue(packet);
343 }
344 else {
345 packet_queue.put(packet);
346 request = request - 1;
347 }
348 }
349 }
350 else {
351 packet_queue.put(packet);
352 }
353 }
354 } catch (IOException e) {
355 return new triple(0, 0, new char[0]);
356 } catch (InterruptedException e) {
357 return new triple(0, 0, new char[0]);
358 }
359 }
360
265 public void run() { 361 public void run() {
362 thread_id = android.os.Process.myTid();
266 Log.i(TAG, String.format("CommunicationThread.run() client %d connected", connection)); 363 Log.i(TAG, String.format("CommunicationThread.run() client %d connected", connection));
267 while (true) { 364 while (true) {
268 try { 365 try {
269 char[] len = forceRead(1); 366 char[] packet = nextPacket();
270 char[] packet = forceRead(len[0]);
271 char[] buf; 367 char[] buf;
272 char cmd = packet[0]; 368 char cmd = packet[0];
273 int plen = packet.length; 369 int plen = packet.length;
274 //Log.i(TAG, String.format("received %d command length %d", (int)cmd, plen)); 370 //Log.i(TAG, String.format("received %d command length %d", (int)cmd, plen));
275 switch (cmd) { 371 switch (cmd) {
276 case MONITOR_CMD_INIT: 372 case MONITOR_CMD_INIT:
277 buf = new char[plen-1]; 373 buf = new char[plen-1];
278 System.arraycopy(packet, 1, buf, 0, plen-1); 374 System.arraycopy(packet, 1, buf, 0, plen-1);
279 abandonGetField(connection); 375 abandonGetField(connection);
280 teInit(connection, buf); 376 initString = new String(buf);
377 teInit(connection, initString);
281 break; 378 break;
282 case MONITOR_CMD_ACTIVATE: 379 case MONITOR_CMD_ACTIVATE:
283 abandonGetField(connection); 380 abandonGetField(connection);
284 buf = new char[plen-3]; 381 buf = new char[plen-3];
285 System.arraycopy(packet, 3, buf, 0, plen-3); 382 System.arraycopy(packet, 3, buf, 0, plen-3);
287 break; 384 break;
288 case MONITOR_CMD_KEYSTATE: 385 case MONITOR_CMD_KEYSTATE:
289 teKeyState(connection, (packet[1] == 1)); 386 teKeyState(connection, (packet[1] == 1));
290 break; 387 break;
291 case MONITOR_CMD_CURSORMOVE: 388 case MONITOR_CMD_CURSORMOVE:
292 teCursorMove(connection, packet[1], packet[2]); 389 teCursorMove(connection, packet[1], packet[2], packet[3]);
293 break; 390 break;
294 case MONITOR_CMD_SCREENCHANGE: 391 case MONITOR_CMD_SCREENCHANGE:
295 buf = new char[plen-3]; 392 buf = new char[plen-3];
296 System.arraycopy(packet, 3, buf, 0, plen-3); 393 System.arraycopy(packet, 3, buf, 0, plen-3);
394 cleanup(buf);
297 teScreenChange(connection, packet[1], packet[2], buf); 395 teScreenChange(connection, packet[1], packet[2], buf);
298 break; 396 break;
299 case MONITOR_CMD_FIELDVALUE: 397 case MONITOR_CMD_FIELDVALUE:
300 buf = new char[plen-3]; 398 value_queue.clear();
301 System.arraycopy(packet, 3, buf, 0, plen-3); 399 value_queue.put(reformatValue(packet));
302 Log.i(TAG, String.format("teFieldValue %d line %d column %d b.len %d", connection, packet[1], packet[2], buf.length));
303 queue.put(new triple(packet[1], packet[2], buf));
304 break; 400 break;
305 default: 401 default:
306 break; 402 break;
307 } 403 }
308 } catch (IOException e) { 404 } catch (IOException e) {
322 Log.e(TAG, "exception in CommunicationThread.run() closing sockets", e); 418 Log.e(TAG, "exception in CommunicationThread.run() closing sockets", e);
323 } 419 }
324 client_in = null; 420 client_in = null;
325 client_out = null; 421 client_out = null;
326 client_socket = null; 422 client_socket = null;
423 clients.remove(connection);
327 } 424 }
328 } 425 }
329 426
330 private void abandonGetField(int except) { 427 private void abandonGetField(int except) {
331 for (CommunicationThread cm : clients.values()) { 428 for (CommunicationThread cm : clients.values()) {
332 if (cm.connection != except) { 429 if (cm.connection != except) {
333 cm.queue.offer(new triple(0, 0, new char[0])); 430 cm.abandon();
334 } 431 }
335 } 432 }
336 } 433 }
337 434
338 435
339 //////////////////////////////////////// 436 ////////////////////////////////////////
340 //// these functions run on the reader thread here and call your monitoring code 437 //// these functions run on the reader thread here and call your monitoring code
341 438
342 public void teInit(int connection, char[] buf) { 439 public void teInit(int connection, String fn) {
343 String fn = new String(buf);
344 Log.i(TAG, String.format("teInit %d file %s", connection, fn)); 440 Log.i(TAG, String.format("teInit %d file %s", connection, fn));
345 printer(String.format("init %d %s", connection, fn)); 441 printer(String.format("init %d %s", connection, fn));
442 setCurrentConnection(connection);
443 }
444
445 public void teCloseAll() {
446 Log.i(TAG, String.format("teCloseAll"));
447 }
448
449 public void teClose(int connection) {
450 Log.i(TAG, String.format("teClose %d", connection));
451 setCurrentConnection(-1);
346 } 452 }
347 453
348 public void teActivate(int connection, int lines, int columns, char[] buf) { 454 public void teActivate(int connection, int lines, int columns, char[] buf) {
349 Log.i(TAG, String.format("teActivate %d", connection)); 455 Log.i(TAG, String.format("teActivate %d", connection));
350 printer(String.format("activate %d lines %d columns %d b.len %d", connection, lines, columns, buf.length)); 456 printer(String.format("activate %d lines %d columns %d b.len %d", connection, lines, columns, buf.length));
457 boolean sameinit = false;
458 CommunicationThread cm = clients.get(currentConnection);
459 if (cm != null) {
460 sameinit = (cm.initString == fn);
461 }
462 setCurrentConnection(connection);
351 } 463 }
352 464
353 public void teKeyState(int connection, boolean down) { 465 public void teKeyState(int connection, boolean down) {
354 String d = (down) ? "yes" : "no"; 466 String d = (down) ? "yes" : "no";
355 Log.i(TAG, String.format("teKeyState %d isdown %s", connection, d)); 467 Log.i(TAG, String.format("teKeyState %d isdown %s", connection, d));
356 printer(String.format("keystate %d isdown %s", connection, d)); 468 printer(String.format("keystate %d isdown %s", connection, d));
357 } 469 }
358 470
359 public void teCursorMove(int connection, int l, int c) { 471 public void teCursorMove(int connection, int l, int c, int why) {
360 Log.i(TAG, String.format("teCursorMove %d line %d column %d", connection, l, c)); 472 Log.i(TAG, String.format("teCursorMove %d line %d column %d why %d", connection, l, c, why));
361 } 473 }
362 474
363 public void teScreenChange(int connection, int lines, int columns, char[] buf) { 475 public void teScreenChange(int connection, int lines, int columns, char[] buf) {
364 Log.i(TAG, String.format("teScreenChange %d lines %d columns %d b.len %d", connection, lines, columns, buf.length)); 476 Log.i(TAG, String.format("teScreenChange %d lines %d columns %d b.len %d", connection, lines, columns, buf.length));
365 } 477 }
388 if (cm != null) { 500 if (cm != null) {
389 char[] arg = new char[5]; 501 char[] arg = new char[5];
390 arg[2] = (char) (l & 0x0000ffff); 502 arg[2] = (char) (l & 0x0000ffff);
391 arg[3] = (char) (c & 0x0000ffff); 503 arg[3] = (char) (c & 0x0000ffff);
392 arg[4] = (char) (len & 0x0000ffff); 504 arg[4] = (char) (len & 0x0000ffff);
393 cm.queue.clear(); // we never have more than one outstanding getfield request on the connection 505 int request = cm.cmGetField(arg);
394 cm.clientWrite(MONITOR_CMD_GETFIELD, arg);
395 try { 506 try {
396 triple t = cm.queue.take(); // wait for response 507 int tid = android.os.Process.myTid();
508 triple t;
509 if (tid == cm.thread_id) {
510 // we are running on the socket reader thread, called via teCursorMove() or teScreenChange()
511 // we need to peek command packets from the socket looking for our fieldvalue response
512 Log.i(TAG, String.format("java teGetField() peeking value for getfield on reader thread"));
513 t = cm.peekValue(request);
514 }
515 else {
516 // we are running on some other thread, just wait for the reader thread to
517 // process the fieldvalue and put it on the queue.
518 Log.i(TAG, String.format("java teGetField() waiting for data for getfield on reco thread"));
519 t = cm.value_queue.take(); // wait for response
520 }
397 Log.i(TAG, String.format("teGetField %d response line %d column %d len %d", connection, t.l, t.c, t.b.length)); 521 Log.i(TAG, String.format("teGetField %d response line %d column %d len %d", connection, t.l, t.c, t.b.length));
398 return t.b; 522 return t.b;
399 } catch (InterruptedException e) { 523 } catch (InterruptedException e) {
400 Log.e(TAG, "exception in teGetField(), return empty string", e); 524 Log.e(TAG, "exception in teGetField(), return empty string", e);
401 } 525 }
440 System.arraycopy(url, 0, arg2, base, len); 564 System.arraycopy(url, 0, arg2, base, len);
441 cm.clientWrite(MONITOR_CMD_SHOWURL, arg2); 565 cm.clientWrite(MONITOR_CMD_SHOWURL, arg2);
442 } 566 }
443 } 567 }
444 568
569 public static void teAbandonGetField(int connection) {
570 Log.i(TAG, String.format("teAbandonGetField %d", connection));
571 CommunicationThread cm = clients.get(connection);
572 if (cm != null) {
573 cm.abandon();
574 }
575 }
576
445 public static void teSwitchSession(int connection) { 577 public static void teSwitchSession(int connection) {
446 CommunicationThread cm = clients.get(connection); 578 CommunicationThread cm = clients.get(connection);
447 if (cm != null) { 579 if (cm != null) {
448 char [] arg2 = new char[2]; 580 char [] arg2 = new char[2];
449 cm.clientWrite(MONITOR_CMD_SWITCHSESSION, arg2); 581 cm.clientWrite(MONITOR_CMD_SWITCHSESSION, arg2);