changeset 23:2586a4f5c8c3

add mechanism to allow getfield from other threads
author Carl Byington <carl@five-ten-sg.com>
date Fri, 01 May 2015 11:52:31 -0700
parents adc776858a2d
children 4f1cc4f44c41
files AndroidManifest.xml src/com/five_ten_sg/connectbot/monitor/MonitorService.java xml/510connectbotmonitor.in
diffstat 3 files changed, 157 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/AndroidManifest.xml	Tue Jul 08 09:29:08 2014 -0700
+++ b/AndroidManifest.xml	Fri May 01 11:52:31 2015 -0700
@@ -2,7 +2,7 @@
 <manifest xmlns:android="http://schemas.android.com/apk/res/android"
     package="com.five_ten_sg.connectbot.monitor"
     android:versionCode="1"
-    android:versionName="1.0.3-1" >
+    android:versionName="1.0.4-0" >
 
     <uses-sdk
         android:minSdkVersion="8"
--- a/src/com/five_ten_sg/connectbot/monitor/MonitorService.java	Tue Jul 08 09:29:08 2014 -0700
+++ b/src/com/five_ten_sg/connectbot/monitor/MonitorService.java	Fri May 01 11:52:31 2015 -0700
@@ -46,6 +46,10 @@
     public  static final char MONITOR_CMD_SWITCHSESSION = 10;
     public  static final char MONITOR_CMD_CURSORREQUEST = 11;
 
+    public  static final char CURSOR_REQUESTED      = 0;
+    public  static final char CURSOR_SCREEN_CHANGE  = 1;
+    public  static final char CURSOR_USER_KEY       = 2;
+
     public static final int       MONITORPORT = 6000;
     public static ConcurrentHashMap<Integer,CommunicationThread> clients = new ConcurrentHashMap<Integer,CommunicationThread>();
     public static int             currentConnection = -1;
@@ -100,6 +104,16 @@
         if (handler != null) handler.sendMessage(handler.obtainMessage(MonitorActivity.MESSAGE_CODE_PRINT, msg));
     }
 
+    public synchronized void setCurrentConnection(int connection) {
+        currentConnection = connection;
+    }
+
+    public synchronized int nextConnection() {
+        int c = 1;
+        while (clients.get(c) != null) c++;
+        return c;
+    }
+
     @Override
     public int onStartCommand(Intent intent, int flags, int startId) {
         Log.i(TAG, "service onStartCommand()");
@@ -110,6 +124,7 @@
     public void onDestroy() {
         try {
             Log.i(TAG, "service onDestroy()");
+            teCloseAll();
             talker.stop();
             talker.shutdown();
             wifiLock.release();
@@ -134,7 +149,7 @@
             while (true) {
                 try{
                    socket     = serverSocket.accept();
-                   connection = connection + 1;
+                   connection = nextConnection();
                    CommunicationThread commThread = new CommunicationThread(connection, socket);
                    clients.put(connection, commThread);
                    commThread.start();
@@ -157,12 +172,16 @@
     }
 
     class CommunicationThread extends Thread {
+        public  int            thread_id;
         public  int            connection;
+        private String         initString = null;
         private Socket         client_socket;
         private InputStream    client_in;
         private OutputStream   client_out;
         private boolean        is_closing = false;
-        private BlockingQueue<triple>  queue = new ArrayBlockingQueue<triple>(1);
+        private Integer        getfields_outstanding = 0;
+        private BlockingQueue<triple>  value_queue = new ArrayBlockingQueue<triple>(1);
+        private BlockingQueue<char[]>  packet_queue = new ArrayBlockingQueue<char[]>(10000);
 
         public CommunicationThread(int handle, Socket socket) {
             connection    = handle;
@@ -175,6 +194,10 @@
             }
         }
 
+        public synchronized void abandon() {
+            value_queue.offer(new triple(0, 0, new char[0]));
+        }
+
         public void speak(byte [] msg, boolean flush, boolean synchronous) {
             if (speech) {
                 String smsg = bytesToString(msg);
@@ -226,6 +249,24 @@
             return b;
         }
 
+        void cleanup(char[] buf) {
+            int i;
+            for (i=0; i<buf.length; i++) {
+                if ((int)(buf[i]) < 32) buf[i] = ' ';
+            }
+        }
+
+        public int cmGetField(char[] c) {
+            int request;
+            synchronized(getfields_outstanding) {
+                request = getfields_outstanding;
+                getfields_outstanding = getfields_outstanding + 1;
+                value_queue.clear();  // we never have more than one outstanding getfield request from the reco thread on the connection
+                clientWrite(MONITOR_CMD_GETFIELD, c);
+            }
+            return request;
+        }
+
         public synchronized void clientWrite(char cmd, char[] c) {
             try {
                 if (client_out != null) {
@@ -233,6 +274,7 @@
                     c[1] = cmd;
                     Log.i(TAG, String.format("sending %d command", (int)cmd));
                     client_out.write(charsToBytes(c));
+                    client_out.flush();
                 }
             }
             catch (IOException e) {
@@ -262,12 +304,66 @@
             return bytesToChars(b, len2);
         }
 
+        public char[] readPacket() throws IOException {
+            char[] len = forceRead(1);
+            return forceRead(len[0]);
+        }
+
+        public char[] nextPacket() throws IOException, InterruptedException {
+            char[] packet = packet_queue.poll();
+            if (packet == null) {
+                packet = readPacket();
+                if (packet[0] == MONITOR_CMD_FIELDVALUE) {
+                    synchronized(getfields_outstanding) {
+                        getfields_outstanding = getfields_outstanding - 1;
+                    }
+                }
+            }
+            return packet;
+        }
+
+        private triple reformatValue(char[] packet) {
+            int plen = packet.length;
+            char[] buf = new char[plen-3];
+            System.arraycopy(packet, 3, buf, 0, plen-3);
+            cleanup(buf);
+            Log.i(TAG, String.format("teFieldValue %d line %d column %d b.len %d", connection, (int)packet[1], (int)packet[2], buf.length));
+            return new triple(packet[1], packet[2], buf);
+        }
+
+        public triple peekValue(int request) {
+            try {
+                while (true) {
+                    char[] packet = readPacket();
+                    if (packet[0] == MONITOR_CMD_FIELDVALUE) {
+                        synchronized(getfields_outstanding) {
+                            getfields_outstanding = getfields_outstanding - 1;
+                            if (request == 0) {
+                                return reformatValue(packet);
+                            }
+                            else {
+                                packet_queue.put(packet);
+                                request = request - 1;
+                            }
+                        }
+                    }
+                    else {
+                        packet_queue.put(packet);
+                    }
+                }
+            } catch (IOException e) {
+                return new triple(0, 0, new char[0]);
+            } catch (InterruptedException e) {
+                return new triple(0, 0, new char[0]);
+            }
+        }
+
         public void run() {
+            thread_id = android.os.Process.myTid();
             Log.i(TAG, String.format("CommunicationThread.run() client %d connected", connection));
             while (true) {
                 try {
-                    char[] len    = forceRead(1);
-                    char[] packet = forceRead(len[0]);
+                    char[] packet = nextPacket();
                     char[] buf;
                     char cmd = packet[0];
                     int plen = packet.length;
@@ -277,7 +373,8 @@
                             buf = new char[plen-1];
                             System.arraycopy(packet, 1, buf, 0, plen-1);
                             abandonGetField(connection);
-                            teInit(connection, buf);
+                            initString = new String(buf);
+                            teInit(connection, initString);
                             break;
                         case MONITOR_CMD_ACTIVATE:
                             abandonGetField(connection);
@@ -289,18 +386,17 @@
                             teKeyState(connection, (packet[1] == 1));
                             break;
                         case MONITOR_CMD_CURSORMOVE:
-                            teCursorMove(connection, packet[1], packet[2]);
+                            teCursorMove(connection, packet[1], packet[2], packet[3]);
                             break;
                         case MONITOR_CMD_SCREENCHANGE:
                             buf = new char[plen-3];
                             System.arraycopy(packet, 3, buf, 0, plen-3);
+                            cleanup(buf);
                             teScreenChange(connection, packet[1], packet[2], buf);
                             break;
                         case MONITOR_CMD_FIELDVALUE:
-                            buf = new char[plen-3];
-                            System.arraycopy(packet, 3, buf, 0, plen-3);
-                            Log.i(TAG, String.format("teFieldValue %d line %d column %d b.len %d", connection, packet[1], packet[2], buf.length));
-                            queue.put(new triple(packet[1], packet[2], buf));
+                            value_queue.clear();
+                            value_queue.put(reformatValue(packet));
                             break;
                         default:
                             break;
@@ -324,13 +420,14 @@
             client_in     = null;
             client_out    = null;
             client_socket = null;
+            clients.remove(connection);
         }
     }
 
     private void abandonGetField(int except) {
         for (CommunicationThread cm : clients.values()) {
             if (cm.connection != except) {
-                cm.queue.offer(new triple(0, 0, new char[0]));
+                cm.abandon();
             }
         }
     }
@@ -339,15 +436,30 @@
     ////////////////////////////////////////
     //// these functions run on the reader thread here and call your monitoring code
 
-    public void teInit(int connection, char[] buf) {
-        String fn = new String(buf);
+    public void teInit(int connection, String fn) {
         Log.i(TAG, String.format("teInit %d file %s", connection, fn));
         printer(String.format("init %d %s", connection, fn));
+        setCurrentConnection(connection);
+    }
+
+    public void teCloseAll() {
+        Log.i(TAG, String.format("teCloseAll"));
+    }
+
+    public void teClose(int connection) {
+        Log.i(TAG, String.format("teClose %d", connection));
+        setCurrentConnection(-1);
     }
 
     public void teActivate(int connection, int lines, int columns, char[] buf) {
         Log.i(TAG, String.format("teActivate %d", connection));
         printer(String.format("activate %d lines %d columns %d b.len %d", connection, lines, columns, buf.length));
+        boolean sameinit = false;
+        CommunicationThread cm = clients.get(currentConnection);
+        if (cm != null) {
+            sameinit = (cm.initString == fn);
+        }
+        setCurrentConnection(connection);
     }
 
     public void teKeyState(int connection, boolean down) {
@@ -356,8 +468,8 @@
         printer(String.format("keystate %d isdown %s", connection, d));
     }
 
-    public void teCursorMove(int connection, int l, int c) {
-        Log.i(TAG, String.format("teCursorMove %d line %d column %d", connection, l, c));
+    public void teCursorMove(int connection, int l, int c, int why) {
+        Log.i(TAG, String.format("teCursorMove %d line %d column %d why %d", connection, l, c, why));
     }
 
     public void teScreenChange(int connection, int lines, int columns, char[] buf) {
@@ -390,10 +502,22 @@
             arg[2] = (char) (l & 0x0000ffff);
             arg[3] = (char) (c & 0x0000ffff);
             arg[4] = (char) (len & 0x0000ffff);
-            cm.queue.clear();  // we never have more than one outstanding getfield request on the connection
-            cm.clientWrite(MONITOR_CMD_GETFIELD, arg);
+            int request = cm.cmGetField(arg);
             try {
-                triple t = cm.queue.take(); // wait for response
+                int tid = android.os.Process.myTid();
+                triple t;
+                if (tid == cm.thread_id) {
+                    // we are running on the socket reader thread, called via teCursorMove() or teScreenChange()
+                    // we need to peek command packets from the socket looking for our fieldvalue response
+                    Log.i(TAG, String.format("java teGetField() peeking value for getfield on reader thread"));
+                    t = cm.peekValue(request);
+                }
+                else {
+                    // we are running on some other thread, just wait for the reader thread to
+                    // process the fieldvalue and put it on the queue.
+                    Log.i(TAG, String.format("java teGetField() waiting for data for getfield on reco thread"));
+                    t = cm.value_queue.take(); // wait for response
+                }
                 Log.i(TAG, String.format("teGetField %d response line %d column %d len %d", connection, t.l, t.c, t.b.length));
                 return t.b;
             } catch (InterruptedException e) {
@@ -442,6 +566,14 @@
         }
     }
 
+    public static void teAbandonGetField(int connection) {
+        Log.i(TAG, String.format("teAbandonGetField %d", connection));
+        CommunicationThread cm = clients.get(connection);
+        if (cm != null) {
+            cm.abandon();
+        }
+    }
+
     public static void teSwitchSession(int connection) {
         CommunicationThread cm = clients.get(connection);
         if (cm != null) {
--- a/xml/510connectbotmonitor.in	Tue Jul 08 09:29:08 2014 -0700
+++ b/xml/510connectbotmonitor.in	Fri May 01 11:52:31 2015 -0700
@@ -116,8 +116,12 @@
 
             <para>
                 CURSORMOVE = 3 (TE -> Monitor).
-                The first argument is the line number (0..23)
-                and the second argument is the column number (0..79).
+                The first argument is the line number (0..23),
+                the second argument is the column number (0..79), and the third
+                argument is the reason for sending this cursor update. REASON=0 is
+                from a previous CURSORREQUEST command. REASON=1 is a cursor update
+                related to the previous SCREENCHANGE buffer update. REASON=2 is
+                a cursor update caused by user keystrokes.
             </para>
 
             <para>