libs.net: complete network functionality
[WeStealzYourDataz.git] / src / uk / ac / ntu / n0521366 / wsyd / libs / net / NetworkStream.java
index fe4c4bb..48da1b7 100644 (file)
@@ -28,8 +28,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 import java.net.Socket;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -45,7 +45,7 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
     
     static final long longSize = Long.SIZE / 8;
     
-    int _key;
+    long _key;
     
     /**
      * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
@@ -55,6 +55,8 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      */
     protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
     
+    private final NetworkMessageEventListenerManager _eventManager;
+    
     private ObjectOutputStream _socketOS;
     
     private InputStream _socketIS;
@@ -67,9 +69,10 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
     
     private ByteArrayOutputStream _baos;
     
-    NetworkStream(Socket socket, NetworkSocketClosing owner) {
+    public NetworkStream(Socket socket, NetworkSocketClosing owner) {
         this._socket = socket;
         this._owner = owner;
+        _eventManager = new NetworkMessageEventListenerManager();
     }
     
     @Override
@@ -88,19 +91,22 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
         while (!this.isCancelled()) {
             
             // check for incoming message
-            if (_socketIS.available() >= expectedLength) {
+            if (_ois.available() >= expectedLength) {
                 if (expectingLong == true) {                  
                     expectedLength = _ois.readLong();
+                    System.err.println("Expecting Object with length " + expectedLength);
+                    System.err.println("Remaining bytes: " + _ois.available());
                     expectingLong = false;
                 }
-                else {                  
+                else {
+                    System.err.println("About to read at least " + _ois.available() + " bytes");
                     read();
                     expectedLength = longSize;
                     expectingLong = true;
                 }
             }
             // send a queued message
-            NetworkMessage temp = this.dequeueMessage();
+            NetworkMessage temp = this.sendMessage();
             if (temp != null) {
                 if (!this.write(temp)) {
                     System.err.println("Unable to send message over TCP channel");
@@ -128,6 +134,7 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      */
     public boolean write(NetworkMessage message) {
         boolean result = false;
+        System.err.println("NetworkStream.write()");
         
         if (message != null) {
             try {             
@@ -135,11 +142,15 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
                 _oos.flush();
                 long messageSize = _baos.size();
                 _socketOS.writeLong(messageSize);
+                System.err.println(" bytes to write: " + messageSize);
                 _baos.writeTo(_socketOS);
+                _socketOS.flush();
+                System.err.println("baos.size()=" + _baos.size());
                 
                 result = true;
                 
             } catch (IOException ex) {
+                ex.printStackTrace();
                 // TODO: Implement graceful disconnect
             }
         }
@@ -154,9 +165,18 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      */
     public boolean read() {
         boolean result = false;
+        System.err.println("NetworkStream.read()");
         
         try {
-            NetworkMessage message = (NetworkMessage)_ois.readObject();
+            NetworkMessage message = null;
+
+            try {
+                message = (NetworkMessage)_ois.readObject();
+            } catch (java.io.OptionalDataException ex) {
+                System.err.println("Length: " + ex.length + " EOF: " + ex.eof);
+                ex.printStackTrace();
+            }
+            message.setKey(_key);
             publish(message);
             
             result = true;
@@ -179,7 +199,7 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      *
      * @return a message to be sent
      */
-    protected NetworkMessage dequeueMessage() {
+    protected NetworkMessage sendMessage() {
         return this._sendMessageQueue.poll();
     }
     
@@ -208,5 +228,24 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
         }
         return result;
     }
+
+    public NetworkMessageEventListenerManager getEventManager() {
+        return this._eventManager;
+    }
+    
+    /**
+     * Fetch messages received over the stream.
+     * 
+     * For delivery to event listeners; usually Swing GUI components. This method will run on the
+     * Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread.
+     * 
+     * @param list messages received and queued
+     */
+    @Override
+    protected void process(List<NetworkMessage> list) {
+        for (NetworkMessage message: list) {
+            this._eventManager.fireNetworkMessageEvent(message);
+        }
+    }
     
 }