Implemented TCP streams for object serialization
[WeStealzYourDataz.git] / src / uk / ac / ntu / n0521366 / wsyd / libs / net / NetworkStream.java
diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java
new file mode 100644 (file)
index 0000000..fe4c4bb
--- /dev/null
@@ -0,0 +1,212 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.swing.SwingWorker;
+
+/**
+ *
+ * @author eddie
+ */
+public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
+
+    private final Socket _socket;
+    
+    static final long longSize = Long.SIZE / 8;
+    
+    int _key;
+    
+    /**
+     * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
+     * 
+     * Allows the Owner Thread to submit new messages for sending that the Worker Thread
+     * can safely access.
+     */
+    protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
+    
+    private ObjectOutputStream _socketOS;
+    
+    private InputStream _socketIS;
+    
+    private final NetworkSocketClosing _owner;
+    
+    private ObjectInputStream _ois;
+    
+    private ObjectOutputStream _oos;
+    
+    private ByteArrayOutputStream _baos;
+    
+    NetworkStream(Socket socket, NetworkSocketClosing owner) {
+        this._socket = socket;
+        this._owner = owner;
+    }
+    
+    @Override
+    public Boolean doInBackground() throws Exception {
+        
+        _baos = new ByteArrayOutputStream();
+        _oos = new ObjectOutputStream(_baos);
+        _socketOS = new ObjectOutputStream(_socket.getOutputStream());
+        
+        _socketIS = _socket.getInputStream();
+        _ois = new ObjectInputStream(_socketIS);
+        
+        long expectedLength = longSize;
+        boolean expectingLong = true;
+        
+        while (!this.isCancelled()) {
+            
+            // check for incoming message
+            if (_socketIS.available() >= expectedLength) {
+                if (expectingLong == true) {                  
+                    expectedLength = _ois.readLong();
+                    expectingLong = false;
+                }
+                else {                  
+                    read();
+                    expectedLength = longSize;
+                    expectingLong = true;
+                }
+            }
+            // send a queued message
+            NetworkMessage temp = this.dequeueMessage();
+            if (temp != null) {
+                if (!this.write(temp)) {
+                    System.err.println("Unable to send message over TCP channel");
+                    // FIXME: Potentially add logger support
+                }
+            }
+            
+        }
+        return false;
+    }
+    
+    @Override
+    protected void done() {
+        _owner.canCloseSocket(_key);
+    }
+    
+    /**
+     * Send a NetworkMessage over a NetworkStream.
+     * 
+     * Each message is preceeded by its length as a long to avoid rare, but possible blocking
+     * in the reader.
+     * 
+     * @param message
+     * @return true if message successfully sent
+     */
+    public boolean write(NetworkMessage message) {
+        boolean result = false;
+        
+        if (message != null) {
+            try {             
+                _oos.writeObject(message);
+                _oos.flush();
+                long messageSize = _baos.size();
+                _socketOS.writeLong(messageSize);
+                _baos.writeTo(_socketOS);
+                
+                result = true;
+                
+            } catch (IOException ex) {
+                // TODO: Implement graceful disconnect
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Reads NetworkMessage from the incoming stream and publishes it.
+     * 
+     * 
+     * @return true if message successfully published
+     */
+    public boolean read() {
+        boolean result = false;
+        
+        try {
+            NetworkMessage message = (NetworkMessage)_ois.readObject();
+            publish(message);
+            
+            result = true;
+        } catch (IOException ex) {
+            Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
+            // FIXME: Replace logger
+            // TODO: Graceful disconnect
+        } catch (ClassNotFoundException ex) {
+            Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
+            // FIXME: Replace logger
+        }
+        // TODO: Implement NetworkStream::read()
+        return result;
+    }
+    
+    /**
+     * Removes a message from the queue of pending messages.
+     *
+     * This method is called on the Worker Thread by the doInBackground() main loop.
+     *
+     * @return a message to be sent
+     */
+    protected NetworkMessage dequeueMessage() {
+        return this._sendMessageQueue.poll();
+    }
+    
+    /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
+    
+    /**
+     * Adds a message to the queue of pending messages.
+     * 
+     * This method will usually be called from the Owner Thread.
+     * 
+     * @param message to be sent
+     * @return true if the message was added to the queue
+     */
+    public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
+        boolean result = false;
+        
+        if (message != null) {
+            NetworkMessage temp;
+            try { // make a deep clone of the message
+                temp = NetworkMessage.clone(message);
+                result = this._sendMessageQueue.add(temp);
+            } catch (CloneNotSupportedException e) {
+                // TODO: queueMessage() log CloneNotSupportedException
+                e.printStackTrace();
+            }
+        }
+        return result;
+    }
+    
+}