Implemented TCP streams for object serialization
authorEddie <dev@fun2be.me>
Mon, 1 Jun 2015 16:56:56 +0000 (17:56 +0100)
committerEddie <dev@fun2be.me>
Mon, 1 Jun 2015 16:56:56 +0000 (17:56 +0100)
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerTCP.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java [new file with mode: 0644]
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java [new file with mode: 0644]
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java [new file with mode: 0644]
src/uk/ac/ntu/n0521366/wsyd/libs/net/WSYD_SocketAddress.java

index 3a186ba..b8eaae5 100644 (file)
@@ -358,7 +358,7 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
      * Fetch messages received by the server.
      * 
      * For delivery to event listeners; usually Swing GUI components. This method will run on the
-     * Owner Thread so must complete quickly it that is the GUI Event Dispatch Thread.
+     * Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread.
      * 
      * @param list messages received and queued
      */
index 6c48624..710e443 100644 (file)
  */
 package uk.ac.ntu.n0521366.wsyd.libs.net;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.Socket;
 import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Dual-use multithreading network TCP server that can be used stand-alone
@@ -34,8 +41,42 @@ import java.net.SocketException;
  * @author TJ <hacker@iam.tj>
  */
 public class NetworkServerTCP extends NetworkServerAbstract {
+    
     ServerSocket _serverSocket;
+    
+    NetworkStreamManager _streamManager;
+
+     /**
+     * Construct the server with a Logger.
+     * 
+     * No socket is opened.
+     * 
+     * @param socketAddress The socket to listen on
+     * @param title source identifier for use in log messages and sent NetworkMessage objects
+     * @param serviceToHostMap the map object used for host <> InetSocketAddress lookups
+     * @param manager
+     * @param logger An instance of Logger to be used by all objects of this class
+     */
+    public NetworkServerTCP(WSYD_SocketAddress socketAddress, String title, ServiceAddressMap serviceToHostMap, NetworkStreamManager manager, Logger logger) {
+        super(socketAddress, title, serviceToHostMap, logger);
+        this._streamManager = manager;
+    }
 
+    /**
+     * Construct the server without a Logger.
+     * 
+     * No socket is opened.
+     * 
+     * @param socketAddress The socket to listen on
+     * @param title source identifier for use in log messages and sent NetworkMessage objects
+     * @param serviceToHostMap the map object used for host <> InetSocketAddress lookups
+     * @param manager
+     */
+    public NetworkServerTCP(WSYD_SocketAddress socketAddress, String title, ServiceAddressMap serviceToHostMap, NetworkStreamManager manager) {
+        super(socketAddress, title, serviceToHostMap);
+        this._streamManager = manager;
+    }
+    
     /**
      * Open the socket ready for accepting connections.
      * 
@@ -45,7 +86,21 @@ public class NetworkServerTCP extends NetworkServerAbstract {
      */
     @Override
     public  void serverOpen() throws SocketException {
+        try {
+            _serverSocket = new ServerSocket(_socketAddress.getPort(), 50, _socketAddress.getAddress());
+        } catch (IOException ex) {
+            throw new SocketException(ex.getMessage());
+        }
+        _serverSocket.setSoTimeout(100);
         
+        if (_socketAddress.getPort() == Network.PORTS_EPHEMERAL) {
+            // reflect the actual port in use if an ephermal port was requested
+            InetSocketAddress actualSA = (InetSocketAddress)_serverSocket.getLocalSocketAddress();
+            _socketAddress.setAddress(actualSA.getAddress());
+            _socketAddress.setPort(actualSA.getPort());
+        }
+        //log(Level.FINEST, _title, MessageFormat.format("Connection from {0}:{1}", _socketAddress.getAddress().getCanonicalHostName(), Integer.toString(_socketAddress.getPort())));
+        // TODO: Complete this implementation
     }
     
     /**
@@ -71,7 +126,12 @@ public class NetworkServerTCP extends NetworkServerAbstract {
      */
     @Override
     public void serverClose() throws SocketException {
-        
+        if (this._serverSocket != null)
+            try {
+                this._serverSocket.close();
+        } catch (IOException ex) {
+            throw new SocketException(ex.getMessage());
+        }
     }
     
     /**
@@ -85,6 +145,21 @@ public class NetworkServerTCP extends NetworkServerAbstract {
     public boolean serverListen() {
         boolean result = false;
         
+        try {
+            //System.err.println("Before");
+            Socket connectionSocket = _serverSocket.accept();
+            NetworkStream newStream = new NetworkStream(connectionSocket, _streamManager);
+            _streamManager.addStream(0, newStream);
+            log(Level.INFO, _title, MessageFormat.format("Incoming connection from {0}:{1}", connectionSocket.getInetAddress().getCanonicalHostName(), Integer.toString(connectionSocket.getPort())));
+        }
+        catch (SocketTimeoutException e) {
+            //Nothing to be done
+        }
+        catch (IOException e) {
+            // Nothing to be done
+            System.err.println("Incoming connection caused exception...");
+        }
+       
         return false;
     }
 
diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java
new file mode 100644 (file)
index 0000000..641890f
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+/**
+ *
+ * @author eddie
+ */
+public interface NetworkSocketClosing {
+    /**
+     * Asking parent for permission to close.
+     * 
+     * Parent will remove stream from its list and stream shall proceed to close.
+     * 
+     * @param key
+     * @return 
+     */
+    public boolean canCloseSocket(int key);
+}
diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java
new file mode 100644 (file)
index 0000000..fe4c4bb
--- /dev/null
@@ -0,0 +1,212 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.swing.SwingWorker;
+
+/**
+ *
+ * @author eddie
+ */
+public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
+
+    private final Socket _socket;
+    
+    static final long longSize = Long.SIZE / 8;
+    
+    int _key;
+    
+    /**
+     * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
+     * 
+     * Allows the Owner Thread to submit new messages for sending that the Worker Thread
+     * can safely access.
+     */
+    protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
+    
+    private ObjectOutputStream _socketOS;
+    
+    private InputStream _socketIS;
+    
+    private final NetworkSocketClosing _owner;
+    
+    private ObjectInputStream _ois;
+    
+    private ObjectOutputStream _oos;
+    
+    private ByteArrayOutputStream _baos;
+    
+    NetworkStream(Socket socket, NetworkSocketClosing owner) {
+        this._socket = socket;
+        this._owner = owner;
+    }
+    
+    @Override
+    public Boolean doInBackground() throws Exception {
+        
+        _baos = new ByteArrayOutputStream();
+        _oos = new ObjectOutputStream(_baos);
+        _socketOS = new ObjectOutputStream(_socket.getOutputStream());
+        
+        _socketIS = _socket.getInputStream();
+        _ois = new ObjectInputStream(_socketIS);
+        
+        long expectedLength = longSize;
+        boolean expectingLong = true;
+        
+        while (!this.isCancelled()) {
+            
+            // check for incoming message
+            if (_socketIS.available() >= expectedLength) {
+                if (expectingLong == true) {                  
+                    expectedLength = _ois.readLong();
+                    expectingLong = false;
+                }
+                else {                  
+                    read();
+                    expectedLength = longSize;
+                    expectingLong = true;
+                }
+            }
+            // send a queued message
+            NetworkMessage temp = this.dequeueMessage();
+            if (temp != null) {
+                if (!this.write(temp)) {
+                    System.err.println("Unable to send message over TCP channel");
+                    // FIXME: Potentially add logger support
+                }
+            }
+            
+        }
+        return false;
+    }
+    
+    @Override
+    protected void done() {
+        _owner.canCloseSocket(_key);
+    }
+    
+    /**
+     * Send a NetworkMessage over a NetworkStream.
+     * 
+     * Each message is preceeded by its length as a long to avoid rare, but possible blocking
+     * in the reader.
+     * 
+     * @param message
+     * @return true if message successfully sent
+     */
+    public boolean write(NetworkMessage message) {
+        boolean result = false;
+        
+        if (message != null) {
+            try {             
+                _oos.writeObject(message);
+                _oos.flush();
+                long messageSize = _baos.size();
+                _socketOS.writeLong(messageSize);
+                _baos.writeTo(_socketOS);
+                
+                result = true;
+                
+            } catch (IOException ex) {
+                // TODO: Implement graceful disconnect
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Reads NetworkMessage from the incoming stream and publishes it.
+     * 
+     * 
+     * @return true if message successfully published
+     */
+    public boolean read() {
+        boolean result = false;
+        
+        try {
+            NetworkMessage message = (NetworkMessage)_ois.readObject();
+            publish(message);
+            
+            result = true;
+        } catch (IOException ex) {
+            Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
+            // FIXME: Replace logger
+            // TODO: Graceful disconnect
+        } catch (ClassNotFoundException ex) {
+            Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
+            // FIXME: Replace logger
+        }
+        // TODO: Implement NetworkStream::read()
+        return result;
+    }
+    
+    /**
+     * Removes a message from the queue of pending messages.
+     *
+     * This method is called on the Worker Thread by the doInBackground() main loop.
+     *
+     * @return a message to be sent
+     */
+    protected NetworkMessage dequeueMessage() {
+        return this._sendMessageQueue.poll();
+    }
+    
+    /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
+    
+    /**
+     * Adds a message to the queue of pending messages.
+     * 
+     * This method will usually be called from the Owner Thread.
+     * 
+     * @param message to be sent
+     * @return true if the message was added to the queue
+     */
+    public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
+        boolean result = false;
+        
+        if (message != null) {
+            NetworkMessage temp;
+            try { // make a deep clone of the message
+                temp = NetworkMessage.clone(message);
+                result = this._sendMessageQueue.add(temp);
+            } catch (CloneNotSupportedException e) {
+                // TODO: queueMessage() log CloneNotSupportedException
+                e.printStackTrace();
+            }
+        }
+        return result;
+    }
+    
+}
diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java
new file mode 100644 (file)
index 0000000..dfa02ba
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @author eddie
+ */
+public class NetworkStreamManager implements NetworkSocketClosing {
+    
+    static final int SERVERSOCIAL = -98;
+    
+    static final int SERVERCHAT = -99;
+    
+    static final int TEMPCLIENT = -200;
+    
+    Random random;
+    
+    /**
+     *
+     */
+    public ConcurrentHashMap<Integer, NetworkStream> _tcpStreams;
+    
+    public NetworkStreamManager() {
+        random = new Random();
+        _tcpStreams = new ConcurrentHashMap<Integer, NetworkStream>();
+    }
+    
+    public int addStream(int userID, NetworkStream netStream) {
+        int result = 0;
+        
+        if (netStream != null) {
+            if (!_tcpStreams.containsKey(userID)) {
+                if (userID == 0) {
+                    int tempKey;
+                    do {
+                        tempKey = createTempKey();
+                    } while (_tcpStreams.containsKey(tempKey));
+                    _tcpStreams.put(tempKey, netStream);
+                    result = tempKey;
+                    System.err.println("Added new stream with key: " + tempKey);
+                }
+                else {
+                    if ((userID < 0 && userID > TEMPCLIENT) || userID > 0) {
+                        _tcpStreams.put(userID, netStream);
+                        result = userID;
+                    }
+                }
+            }
+        }
+        
+        return result;
+    }
+    
+    public boolean updateKey(int oldID, int newID) {
+        boolean result = false;
+        
+        if (!_tcpStreams.containsKey(newID)) {
+            if (_tcpStreams.containsKey(oldID)) {
+                _tcpStreams.put(newID, _tcpStreams.get(oldID));
+                _tcpStreams.remove(oldID);
+                
+                result = true;
+            }
+        }
+        
+        return result;
+    }
+    
+    private int createTempKey() {
+        return TEMPCLIENT + -random.nextInt(1000000);
+    }
+
+    @Override
+    public boolean canCloseSocket(int key) {
+        boolean result = false;
+        
+        if (_tcpStreams.containsKey(key)) {
+            _tcpStreams.remove(key);
+            result = true;
+            System.err.println("Removed key: " + key);
+        }
+        return result;
+    }
+    
+}
index 70adf4b..e66d60a 100644 (file)
@@ -63,7 +63,7 @@ public class WSYD_SocketAddress implements java.io.Serializable {
             throw new IllegalArgumentException(MessageFormat.format("Not a correct dotted decimal notation: {0}", ipv4));
         
         int i = 0;
-        for (int index = 0; index <=3; index++) {
+        for (int index = 0; index <= 3; index++) {
             try {
                 i = Integer.parseInt(elements[index]);
             } catch (NumberFormatException e) {