libs.net: complete network functionality
authorEddie <dev@fun2be.me>
Tue, 2 Jun 2015 15:59:43 +0000 (16:59 +0100)
committerEddie <dev@fun2be.me>
Tue, 2 Jun 2015 15:59:43 +0000 (16:59 +0100)
12 files changed:
src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java [new file with mode: 0644]
src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java [new file with mode: 0644]
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessage.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventGenerator.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java [new file with mode: 0644]
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerTCP.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDP.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDPMulticast.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java
src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java

diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java
new file mode 100644 (file)
index 0000000..9035a2c
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+import java.util.EventObject;
+
+/**
+ *
+ * @author eddie
+ */
+public class ConnectionEstablishedEvent extends EventObject {
+    
+    private final NetworkStream _stream;
+
+    public ConnectionEstablishedEvent(Object source, NetworkStream stream) {
+        super(source);
+        _stream = stream;
+    }
+    
+    public NetworkStream getStream() {
+        return _stream;
+    }
+    
+}
diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java
new file mode 100644 (file)
index 0000000..f099f2f
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+/**
+ *
+ * @author eddie
+ */
+public interface ConnectionEstablishedEventListener {
+    public void connectionEstablished(ConnectionEstablishedEvent event);
+}
index a3e9dd3..a513e72 100644 (file)
@@ -58,6 +58,11 @@ public class NetworkMessage implements Serializable, Cloneable {
      * Filled in by the message originator with the title of the destination service
      */
     String _serviceTarget;
+    
+    /**
+     * Optional user key of a NetworkStream to enable TCP to send userID on connection
+     */
+    long _key;
 
     Class<?> _class;
     MessageAbstract _message;
@@ -83,7 +88,7 @@ public class NetworkMessage implements Serializable, Cloneable {
      * contain native types that are serializable
      * @throws IllegalArgumentException 
      */
-    NetworkMessage(String intent, String target, MessageAbstract message) throws IllegalArgumentException {
+    public NetworkMessage(String intent, String target, MessageAbstract message) throws IllegalArgumentException {
         _serializeLength = -1;
         if ( !(intent != null && intent.length() > 0) ) 
             throw(new IllegalArgumentException("intent cannot be null or empty"));
@@ -166,6 +171,24 @@ public class NetworkMessage implements Serializable, Cloneable {
         return _serializeLength;
     }
     
+    /**
+     * Set the key of the TCP stream in the messsage.
+     * 
+     * @param key 
+     */
+    public void setKey(long key) {
+        _key = key;
+    }
+    
+    /**
+     * Get the key set by the ServerTCP.
+     * 
+     * @return the key of the stream associated
+     */
+    public long getKey() {
+        return _key;
+    }
+    
     /**
      * Create a message for passing over the network.
      * 
index 400230e..66ed1c2 100644 (file)
@@ -29,6 +29,7 @@ package uk.ac.ntu.n0521366.wsyd.libs.net;
  * @see NetworkMessage
  * @see NetworkMessageEvent
  * @see NetworkMessageEventListener
+ * @see NetworkMessageEventListenerManager
  * @author TJ <hacker@iam.tj>
  */
 public interface NetworkMessageEventGenerator {
@@ -53,5 +54,12 @@ public interface NetworkMessageEventGenerator {
      * 
      * @param listener 
      */
-    public void removeNetworkMessageEventListener(NetworkMessageEventListener listener);    
+    public void removeNetworkMessageEventListener(NetworkMessageEventListener listener);
+    
+    /**
+     * Sends message to all registered listeners.
+     * 
+     * @param message 
+     */
+    public void fireNetworkMessageEvent(NetworkMessage message);
 }
diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java
new file mode 100644 (file)
index 0000000..7c18a2c
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+import java.util.ArrayList;
+
+/**
+ *
+ * @author eddie
+ */
+public class NetworkMessageEventListenerManager implements NetworkMessageEventGenerator {
+    /**
+     * Wrapper allowing filtering of NetworkMessageEvents based on message intent.
+     */
+    public class NetworkMessageEventListenerWithIntent {
+        String _intent;
+        NetworkMessageEventListener _listener;
+        
+        public NetworkMessageEventListenerWithIntent(NetworkMessageEventListener listener, String intent) {
+            _intent = intent;
+            _listener = listener;
+        }
+    }
+    protected ArrayList<NetworkMessageEventListenerWithIntent> _NetworkMessageEventListeners;
+    
+    /**
+     * Construct an empty list of Event Listeners.
+     */
+    NetworkMessageEventListenerManager() {
+        _NetworkMessageEventListeners = new ArrayList<>();
+    }
+    
+    /**
+     * Add a NetworkMessageEvent listener.
+     * 
+     * Listens to all intents.
+     * 
+     * @param listener 
+     */
+    @Override
+    public synchronized void addNetworkMessageEventListener(NetworkMessageEventListener listener) {
+        _NetworkMessageEventListeners.add(new NetworkMessageEventListenerWithIntent(listener, null));
+    }
+
+    /**
+     * Add a filtered NetworkMessageEvent listener.
+     * 
+     * Filters on the intent of the NetworkMessage.
+     * 
+     * @param listener
+     * @param intent null to listen to all intents, otherwise the intent to listen for
+     */
+    @Override
+    public synchronized void addNetworkMessageEventListener(NetworkMessageEventListener listener, String intent) {
+        _NetworkMessageEventListeners.add(new NetworkMessageEventListenerWithIntent(listener, intent));        
+    }
+
+    /**
+     * Remove a NetworkMessageEvent listener.
+     * 
+     * @param listener 
+     */
+    @Override
+    public synchronized void removeNetworkMessageEventListener(NetworkMessageEventListener listener) {
+        for (NetworkMessageEventListenerWithIntent intentListener : _NetworkMessageEventListeners)
+            if (intentListener._listener == listener)
+                _NetworkMessageEventListeners.remove(intentListener);
+    }
+    
+    /**
+     * Send a NetworkMessageEvent to all listeners.
+     * 
+     * Only sends the message to listeners registered for the same intent, or for all messages.
+     * 
+     * @param message the NetworkMessage to send
+     */
+    @Override
+    public synchronized void fireNetworkMessageEvent(NetworkMessage message) {
+        NetworkMessageEvent event = new NetworkMessageEvent(this, message);
+        for (NetworkMessageEventListenerWithIntent intentListener : _NetworkMessageEventListeners) {
+            if (intentListener._intent.equals(message._intent) || intentListener._intent == null)
+                intentListener._listener.NetworkMessageReceived(event);
+        }
+    }
+}
index b8eaae5..3a866df 100644 (file)
@@ -65,7 +65,7 @@ import javax.swing.SwingWorker;
  * 
  * @author TJ <hacker@iam.tj>
  */
-public abstract class NetworkServerAbstract extends SwingWorker<Integer, NetworkMessage> implements NetworkMessageEventGenerator {
+public abstract class NetworkServerAbstract extends SwingWorker<Integer, NetworkMessage> {
 
     /**
      * Single Logger for the class used by all object instances.
@@ -100,28 +100,8 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
     WSYD_SocketAddress _socketAddress;
 
     protected ServiceAddressMap _serviceToHostMap;
-    /**
-     * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
-     * 
-     * Allows the Owner Thread to submit new messages for sending that the Worker Thread
-     * can safely access.
-     */
-    protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
-
-    /**
-     * Wrapper for filtering NetworkMessageEvents based on the message intent
-     */
-    public class NetworkMessageEventListenerWithIntent {
-        String _intent;
-        NetworkMessageEventListener _listener;
-        
-        public NetworkMessageEventListenerWithIntent(NetworkMessageEventListener listener, String intent) {
-            _intent = intent;
-            _listener = listener;
-        }
-    }
-    protected ArrayList<NetworkMessageEventListenerWithIntent> _NetworkMessageEventListeners = new ArrayList<>();
 
+    private final NetworkMessageEventListenerManager _eventManager;
     /**
      * 
      * @param level message importance
@@ -171,6 +151,7 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
         this._title = null;
         this._socketAddress = null;
         this._serviceToHostMap = null;
+        this._eventManager = null;
     }
     
     /**
@@ -188,6 +169,7 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
         this._title = title;
         this._socketAddress = socketAddress;
         this._serviceToHostMap = serviceToHostMap;
+        this._eventManager = new NetworkMessageEventListenerManager();
         if (LOGGER == null) // do not replace existing logger reference
             LOGGER = logger;
     }
@@ -268,7 +250,7 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
                 this._connectionCount++;
 
             // send a queued message
-            NetworkMessage temp =  this.dequeueMessage();
+            NetworkMessage temp =  this.sendMessage();
             if (temp != null) {
                 if (!this.serverSend(temp)) {
                     logMessages.clear();
@@ -296,7 +278,16 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
         return this._connectionCount;
     }
 
-
+    /**
+     * Removes a message from a queue of pending messages.
+     *
+     * This method is called on the Worker Thread by the doInBackground() main loop.
+     * 
+     * Sub-classes that have the ability to transmit messages should implement this fully.
+     *
+     * @return a message to be sent
+     */
+    protected abstract NetworkMessage sendMessage();
     /**
      * Open the socket ready for accepting data or connections.
      * 
@@ -340,20 +331,13 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
      */
     public abstract boolean serverListen();
 
-    /**
-     * Removes a message from the queue of pending messages.
-     *
-     * This method is called on the Worker Thread by the doInBackground() main loop.
-     *
-     * @return a message to be sent
-     */
-    protected NetworkMessage dequeueMessage() {
-        return this._sendMessageQueue.poll();
-    }
-    
     /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
 
 
+    public NetworkMessageEventListenerManager getEventManager() {
+        return this._eventManager;
+    }
+    
     /**
      * Fetch messages received by the server.
      * 
@@ -365,7 +349,7 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
     @Override
     protected void process(List<NetworkMessage> list) {
         for (NetworkMessage message: list) {
-            fireNetworkMessageEvent(message);
+            this._eventManager.fireNetworkMessageEvent(message);
         }
     }
 
@@ -376,86 +360,4 @@ public abstract class NetworkServerAbstract extends SwingWorker<Integer, Network
      */
     @Override
     protected abstract void done();
-
-    /**
-     * Adds a message to the queue of pending messages.
-     * 
-     * This method will usually be called from the Owner Thread.
-     * 
-     * @param message to be sent
-     * @return true if the message was added to the queue
-     * @throws IllegalArgumentException if the target does not exist in the serviceToHost mapping
-     */
-    public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
-        boolean result = false;
-        if (message != null) {
-            // ensure the target is set and is a valid service
-            String target = message.getTarget();
-            if (target == null)
-                throw new IllegalArgumentException("target cannot be null");
-            if(!_serviceToHostMap.isServiceValid(target))
-                throw new IllegalArgumentException("target service does not exist: " + target);
-            
-            NetworkMessage temp;
-            try { // make a deep clone of the message
-                temp = NetworkMessage.clone(message);
-                result = this._sendMessageQueue.add(temp);
-            } catch (CloneNotSupportedException e) {
-                // TODO: queueMessage() log CloneNotSupportedException
-                e.printStackTrace();
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Add a NetworkMessageEvent listener.
-     * 
-     * Listens to all intents.
-     * 
-     * @param listener 
-     */
-    @Override
-    public synchronized void addNetworkMessageEventListener(NetworkMessageEventListener listener) {
-        _NetworkMessageEventListeners.add(new NetworkMessageEventListenerWithIntent(listener, null));
-    }
-
-    /**
-     * Add a filtered NetworkMessageEvent listener.
-     * 
-     * Filters on the intent of the NetworkMessage.
-     * @param listener
-     * @param intent null to listen to all intents, otherwise the intent to listen for
-     */
-    @Override
-    public synchronized void addNetworkMessageEventListener(NetworkMessageEventListener listener, String intent) {
-        _NetworkMessageEventListeners.add(new NetworkMessageEventListenerWithIntent(listener, intent));        
-    }
-
-    /**
-     * Remove a NetworkMessageEvent listener.
-     * 
-     * @param listener 
-     */
-    @Override
-    public synchronized void removeNetworkMessageEventListener(NetworkMessageEventListener listener) {
-        for (NetworkMessageEventListenerWithIntent intentListener : _NetworkMessageEventListeners)
-            if (intentListener._listener == listener)
-                _NetworkMessageEventListeners.remove(intentListener);
-    }
-    
-    /**
-     * Send a NetworkMessageEvent to all listeners.
-     * 
-     * Only sends the message to listeners registered for the same intent, or for all messages.
-     * 
-     * @param message the NetworkMessage to send
-     */
-    private synchronized void fireNetworkMessageEvent(NetworkMessage message) {
-        NetworkMessageEvent event = new NetworkMessageEvent(this, message);
-        for (NetworkMessageEventListenerWithIntent intentListener : _NetworkMessageEventListeners) {
-            if (intentListener._intent.equals(message._intent) || intentListener._intent == null)
-                intentListener._listener.NetworkMessageReceived(event);
-        }
-    }
 }
index 710e443..e68987f 100644 (file)
@@ -30,8 +30,11 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import uk.ac.ntu.n0521366.wsyd.libs.message.MessageNetworkStream;
 
 /**
  * Dual-use multithreading network TCP server that can be used stand-alone
@@ -45,6 +48,8 @@ public class NetworkServerTCP extends NetworkServerAbstract {
     ServerSocket _serverSocket;
     
     NetworkStreamManager _streamManager;
+    
+    protected ArrayList<ConnectionEstablishedEventListener> _ConnectionEstablishedEventListeners;
 
      /**
      * Construct the server with a Logger.
@@ -60,6 +65,7 @@ public class NetworkServerTCP extends NetworkServerAbstract {
     public NetworkServerTCP(WSYD_SocketAddress socketAddress, String title, ServiceAddressMap serviceToHostMap, NetworkStreamManager manager, Logger logger) {
         super(socketAddress, title, serviceToHostMap, logger);
         this._streamManager = manager;
+        this._ConnectionEstablishedEventListeners = new ArrayList<>();
     }
 
     /**
@@ -149,8 +155,17 @@ public class NetworkServerTCP extends NetworkServerAbstract {
             //System.err.println("Before");
             Socket connectionSocket = _serverSocket.accept();
             NetworkStream newStream = new NetworkStream(connectionSocket, _streamManager);
-            _streamManager.addStream(0, newStream);
+            long userID = _streamManager.addStream(0, newStream);
+            // workaround to enable firing a ConnectionEstablishedEvent in process() on the owner thread
+            NetworkMessage temp = new NetworkMessage("triggerConnectionEstablishedEvent", null, new MessageNetworkStream(newStream));
+            publish(temp);
             log(Level.INFO, _title, MessageFormat.format("Incoming connection from {0}:{1}", connectionSocket.getInetAddress().getCanonicalHostName(), Integer.toString(connectionSocket.getPort())));
+            
+            // add or update the last-seen time of the Sender host in the known services map
+            ServiceAddressMap.LastSeenHost host = new ServiceAddressMap.LastSeenHost((InetSocketAddress)connectionSocket.getRemoteSocketAddress());
+            this._serviceToHostMap.put(Long.toString(userID), host);
+            log(Level.INFO, _title, MessageFormat.format("Added \"{0}\" to service map", userID));
+            System.err.println("Added new TCP stream successfully");
         }
         catch (SocketTimeoutException e) {
             //Nothing to be done
@@ -163,6 +178,26 @@ public class NetworkServerTCP extends NetworkServerAbstract {
         return false;
     }
 
+    private void fireConnectionEstablishedEvent(NetworkStream stream) {
+        ConnectionEstablishedEvent event = new ConnectionEstablishedEvent(this, stream);
+        for (ConnectionEstablishedEventListener listener: this._ConnectionEstablishedEventListeners) {
+            listener.connectionEstablished(event);
+        }
+        
+    }
+    public synchronized void addConnectionEstablisedEventListener(ConnectionEstablishedEventListener listener) {
+        _ConnectionEstablishedEventListeners.add(listener);
+    }
+    
+    public synchronized void removeConnectionEstablishedEventListener(ConnectionEstablishedEventListener listener) {
+        for (ConnectionEstablishedEventListener el : _ConnectionEstablishedEventListeners)
+            if (el == listener) {
+                _ConnectionEstablishedEventListeners.remove(listener);
+                break;
+            }
+    }
+    
+    
     /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
     
     /**
@@ -174,4 +209,20 @@ public class NetworkServerTCP extends NetworkServerAbstract {
     protected  void done() {
         
     }
+
+    @Override
+    protected NetworkMessage sendMessage() {
+        return null;
+    }
+    
+    @Override
+    protected void process(List<NetworkMessage> list) {
+        System.err.println("process()");
+        for (NetworkMessage nm: list) {
+            MessageNetworkStream message = (MessageNetworkStream)nm.getMessage();
+            System.err.println("ConnectionEstablished dispatched for " + message._stream._key);
+            fireConnectionEstablishedEvent(message._stream);
+
+        }
+    }
 }
index 90ca8e6..fda95b5 100644 (file)
@@ -31,6 +31,7 @@ import java.net.DatagramSocket;
 import java.net.DatagramPacket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.logging.LogRecord;
@@ -53,7 +54,15 @@ public class NetworkServerUDP extends NetworkServerAbstract {
      * Maximum size of UDP packet payload
      */
     public static final int UDP_PAYLOAD_SIZE_MAX =  65507;
-    
+
+    /**
+     * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
+     * 
+     * Allows the Owner Thread to submit new messages for sending that the Worker Thread
+     * can safely access.
+     */
+    protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
+   
     /**
      * Construct the server with a Logger.
      * 
@@ -243,6 +252,19 @@ public class NetworkServerUDP extends NetworkServerAbstract {
         return result;
     }
 
+     /**
+     * Removes a message from the queue of pending messages.
+     *
+     * This method is called on the Worker Thread by the doInBackground() main loop.
+     *
+     * @return a message to be sent
+     */
+    @Override
+    protected NetworkMessage sendMessage() {
+        return this._sendMessageQueue.poll();
+    }
+    
+    
     /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
     
     /**
@@ -254,4 +276,35 @@ public class NetworkServerUDP extends NetworkServerAbstract {
     protected  void done() {
         // TODO: done() implement any clean-up after doInBackground() has returned
     }
+
+    /**
+     * Adds a message to the queue of pending messages.
+     * 
+     * This method will usually be called from the Owner Thread.
+     * 
+     * @param message to be sent
+     * @return true if the message was added to the queue
+     * @throws IllegalArgumentException if the target does not exist in the serviceToHost mapping
+     */
+    public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
+        boolean result = false;
+        if (message != null) {
+            // ensure the target is set and is a valid service
+            String target = message.getTarget();
+            if (target == null)
+                throw new IllegalArgumentException("target cannot be null");
+            if(!_serviceToHostMap.isServiceValid(target))
+                throw new IllegalArgumentException("target service does not exist: " + target);
+            
+            NetworkMessage temp;
+            try { // make a deep clone of the message
+                temp = NetworkMessage.clone(message);
+                result = this._sendMessageQueue.add(temp);
+            } catch (CloneNotSupportedException e) {
+                // TODO: queueMessage() log CloneNotSupportedException
+                e.printStackTrace();
+            }
+        }
+        return result;
+    }    
 }
index 3998ddb..4f8bf03 100644 (file)
@@ -97,7 +97,7 @@ public class NetworkServerUDPMulticast extends NetworkServerUDP {
             this._multicastSocket = new MulticastSocket(_socketAddress.getPort());
             this._multicastSocket.setTimeToLive(2); // don't traverse more than 2 routers
             this._multicastSocket.setSoTimeout(100); // 1/10th second blocking timeout on receive()
-            this._multicastSocket.setLoopbackMode(true); // inverted logic; true == disable. Don't want to receive our own sent packets
+            this._multicastSocket.setLoopbackMode(false); // inverted logic; true == disable. Don't want to receive our own sent packets
             
             Enumeration<NetworkInterface> ifs = NetworkInterface.getNetworkInterfaces();
             while (ifs.hasMoreElements()) {
index 641890f..19076a9 100644 (file)
@@ -36,5 +36,5 @@ public interface NetworkSocketClosing {
      * @param key
      * @return 
      */
-    public boolean canCloseSocket(int key);
+    public boolean canCloseSocket(long key);
 }
index fe4c4bb..48da1b7 100644 (file)
@@ -28,8 +28,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 import java.net.Socket;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -45,7 +45,7 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
     
     static final long longSize = Long.SIZE / 8;
     
-    int _key;
+    long _key;
     
     /**
      * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
@@ -55,6 +55,8 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      */
     protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
     
+    private final NetworkMessageEventListenerManager _eventManager;
+    
     private ObjectOutputStream _socketOS;
     
     private InputStream _socketIS;
@@ -67,9 +69,10 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
     
     private ByteArrayOutputStream _baos;
     
-    NetworkStream(Socket socket, NetworkSocketClosing owner) {
+    public NetworkStream(Socket socket, NetworkSocketClosing owner) {
         this._socket = socket;
         this._owner = owner;
+        _eventManager = new NetworkMessageEventListenerManager();
     }
     
     @Override
@@ -88,19 +91,22 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
         while (!this.isCancelled()) {
             
             // check for incoming message
-            if (_socketIS.available() >= expectedLength) {
+            if (_ois.available() >= expectedLength) {
                 if (expectingLong == true) {                  
                     expectedLength = _ois.readLong();
+                    System.err.println("Expecting Object with length " + expectedLength);
+                    System.err.println("Remaining bytes: " + _ois.available());
                     expectingLong = false;
                 }
-                else {                  
+                else {
+                    System.err.println("About to read at least " + _ois.available() + " bytes");
                     read();
                     expectedLength = longSize;
                     expectingLong = true;
                 }
             }
             // send a queued message
-            NetworkMessage temp = this.dequeueMessage();
+            NetworkMessage temp = this.sendMessage();
             if (temp != null) {
                 if (!this.write(temp)) {
                     System.err.println("Unable to send message over TCP channel");
@@ -128,6 +134,7 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      */
     public boolean write(NetworkMessage message) {
         boolean result = false;
+        System.err.println("NetworkStream.write()");
         
         if (message != null) {
             try {             
@@ -135,11 +142,15 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
                 _oos.flush();
                 long messageSize = _baos.size();
                 _socketOS.writeLong(messageSize);
+                System.err.println(" bytes to write: " + messageSize);
                 _baos.writeTo(_socketOS);
+                _socketOS.flush();
+                System.err.println("baos.size()=" + _baos.size());
                 
                 result = true;
                 
             } catch (IOException ex) {
+                ex.printStackTrace();
                 // TODO: Implement graceful disconnect
             }
         }
@@ -154,9 +165,18 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      */
     public boolean read() {
         boolean result = false;
+        System.err.println("NetworkStream.read()");
         
         try {
-            NetworkMessage message = (NetworkMessage)_ois.readObject();
+            NetworkMessage message = null;
+
+            try {
+                message = (NetworkMessage)_ois.readObject();
+            } catch (java.io.OptionalDataException ex) {
+                System.err.println("Length: " + ex.length + " EOF: " + ex.eof);
+                ex.printStackTrace();
+            }
+            message.setKey(_key);
             publish(message);
             
             result = true;
@@ -179,7 +199,7 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
      *
      * @return a message to be sent
      */
-    protected NetworkMessage dequeueMessage() {
+    protected NetworkMessage sendMessage() {
         return this._sendMessageQueue.poll();
     }
     
@@ -208,5 +228,24 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
         }
         return result;
     }
+
+    public NetworkMessageEventListenerManager getEventManager() {
+        return this._eventManager;
+    }
+    
+    /**
+     * Fetch messages received over the stream.
+     * 
+     * For delivery to event listeners; usually Swing GUI components. This method will run on the
+     * Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread.
+     * 
+     * @param list messages received and queued
+     */
+    @Override
+    protected void process(List<NetworkMessage> list) {
+        for (NetworkMessage message: list) {
+            this._eventManager.fireNetworkMessageEvent(message);
+        }
+    }
     
 }
index dfa02ba..2d131cb 100644 (file)
@@ -32,42 +32,47 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class NetworkStreamManager implements NetworkSocketClosing {
     
-    static final int SERVERSOCIAL = -98;
+    public static final long SERVERSOCIAL = -98;
     
-    static final int SERVERCHAT = -99;
+    public static final long SERVERCHAT = -99;
     
-    static final int TEMPCLIENT = -200;
+    static final long TEMPCLIENT = -200;
     
     Random random;
     
     /**
      *
      */
-    public ConcurrentHashMap<Integer, NetworkStream> _tcpStreams;
+    public ConcurrentHashMap<Long, NetworkStream> _tcpStreams;
     
     public NetworkStreamManager() {
         random = new Random();
-        _tcpStreams = new ConcurrentHashMap<Integer, NetworkStream>();
+        _tcpStreams = new ConcurrentHashMap<Long, NetworkStream>();
     }
     
-    public int addStream(int userID, NetworkStream netStream) {
-        int result = 0;
+    public long addStream(long userID, NetworkStream netStream) {
+        long result = 0;
         
         if (netStream != null) {
             if (!_tcpStreams.containsKey(userID)) {
                 if (userID == 0) {
-                    int tempKey;
+                    long tempKey;
                     do {
                         tempKey = createTempKey();
                     } while (_tcpStreams.containsKey(tempKey));
                     _tcpStreams.put(tempKey, netStream);
                     result = tempKey;
+                    netStream._key = tempKey;
+                    netStream.execute();
                     System.err.println("Added new stream with key: " + tempKey);
                 }
                 else {
                     if ((userID < 0 && userID > TEMPCLIENT) || userID > 0) {
                         _tcpStreams.put(userID, netStream);
                         result = userID;
+                        netStream._key = userID;
+                        netStream.execute();
+                        System.err.println("Added new stream with predefined key: " + userID);
                     }
                 }
             }
@@ -76,12 +81,15 @@ public class NetworkStreamManager implements NetworkSocketClosing {
         return result;
     }
     
-    public boolean updateKey(int oldID, int newID) {
+    public boolean updateKey(long oldID, long newID) {
         boolean result = false;
         
         if (!_tcpStreams.containsKey(newID)) {
             if (_tcpStreams.containsKey(oldID)) {
-                _tcpStreams.put(newID, _tcpStreams.get(oldID));
+                NetworkStream temp = _tcpStreams.get(oldID);
+                temp._key = newID;
+                _tcpStreams.put(newID, temp);
+                System.err.println("Added new stream with key: " + newID);
                 _tcpStreams.remove(oldID);
                 
                 result = true;
@@ -91,18 +99,18 @@ public class NetworkStreamManager implements NetworkSocketClosing {
         return result;
     }
     
-    private int createTempKey() {
+    private long createTempKey() {
         return TEMPCLIENT + -random.nextInt(1000000);
     }
 
     @Override
-    public boolean canCloseSocket(int key) {
+    public boolean canCloseSocket(long key) {
         boolean result = false;
         
         if (_tcpStreams.containsKey(key)) {
             _tcpStreams.remove(key);
             result = true;
-            System.err.println("Removed key: " + key);
+            System.err.println("Removed stream with key: " + key);
         }
         return result;
     }