From af7e72ed604ded5f9dc6f78c89aa3ec69ab55935 Mon Sep 17 00:00:00 2001 From: Eddie Date: Tue, 2 Jun 2015 16:59:43 +0100 Subject: [PATCH] libs.net: complete network functionality --- .../libs/net/ConnectionEstablishedEvent.java | 45 ++++++ .../ConnectionEstablishedEventListener.java | 32 ++++ .../wsyd/libs/net/NetworkMessage.java | 25 +++- .../net/NetworkMessageEventGenerator.java | 10 +- .../NetworkMessageEventListenerManager.java | 106 ++++++++++++++ .../wsyd/libs/net/NetworkServerAbstract.java | 138 +++--------------- .../wsyd/libs/net/NetworkServerTCP.java | 53 ++++++- .../wsyd/libs/net/NetworkServerUDP.java | 55 ++++++- .../libs/net/NetworkServerUDPMulticast.java | 2 +- .../wsyd/libs/net/NetworkSocketClosing.java | 2 +- .../n0521366/wsyd/libs/net/NetworkStream.java | 55 ++++++- .../wsyd/libs/net/NetworkStreamManager.java | 34 +++-- 12 files changed, 412 insertions(+), 145 deletions(-) create mode 100644 src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java create mode 100644 src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java create mode 100644 src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java new file mode 100644 index 0000000..9035a2c --- /dev/null +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEvent.java @@ -0,0 +1,45 @@ +/* + * The MIT License + * + * Copyright 2015 eddie. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package uk.ac.ntu.n0521366.wsyd.libs.net; + +import java.util.EventObject; + +/** + * + * @author eddie + */ +public class ConnectionEstablishedEvent extends EventObject { + + private final NetworkStream _stream; + + public ConnectionEstablishedEvent(Object source, NetworkStream stream) { + super(source); + _stream = stream; + } + + public NetworkStream getStream() { + return _stream; + } + +} diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java new file mode 100644 index 0000000..f099f2f --- /dev/null +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/ConnectionEstablishedEventListener.java @@ -0,0 +1,32 @@ +/* + * The MIT License + * + * Copyright 2015 eddie. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package uk.ac.ntu.n0521366.wsyd.libs.net; + +/** + * + * @author eddie + */ +public interface ConnectionEstablishedEventListener { + public void connectionEstablished(ConnectionEstablishedEvent event); +} diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessage.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessage.java index a3e9dd3..a513e72 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessage.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessage.java @@ -58,6 +58,11 @@ public class NetworkMessage implements Serializable, Cloneable { * Filled in by the message originator with the title of the destination service */ String _serviceTarget; + + /** + * Optional user key of a NetworkStream to enable TCP to send userID on connection + */ + long _key; Class _class; MessageAbstract _message; @@ -83,7 +88,7 @@ public class NetworkMessage implements Serializable, Cloneable { * contain native types that are serializable * @throws IllegalArgumentException */ - NetworkMessage(String intent, String target, MessageAbstract message) throws IllegalArgumentException { + public NetworkMessage(String intent, String target, MessageAbstract message) throws IllegalArgumentException { _serializeLength = -1; if ( !(intent != null && intent.length() > 0) ) throw(new IllegalArgumentException("intent cannot be null or empty")); @@ -166,6 +171,24 @@ public class NetworkMessage implements Serializable, Cloneable { return _serializeLength; } + /** + * Set the key of the TCP stream in the messsage. + * + * @param key + */ + public void setKey(long key) { + _key = key; + } + + /** + * Get the key set by the ServerTCP. + * + * @return the key of the stream associated + */ + public long getKey() { + return _key; + } + /** * Create a message for passing over the network. * diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventGenerator.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventGenerator.java index 400230e..66ed1c2 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventGenerator.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventGenerator.java @@ -29,6 +29,7 @@ package uk.ac.ntu.n0521366.wsyd.libs.net; * @see NetworkMessage * @see NetworkMessageEvent * @see NetworkMessageEventListener + * @see NetworkMessageEventListenerManager * @author TJ */ public interface NetworkMessageEventGenerator { @@ -53,5 +54,12 @@ public interface NetworkMessageEventGenerator { * * @param listener */ - public void removeNetworkMessageEventListener(NetworkMessageEventListener listener); + public void removeNetworkMessageEventListener(NetworkMessageEventListener listener); + + /** + * Sends message to all registered listeners. + * + * @param message + */ + public void fireNetworkMessageEvent(NetworkMessage message); } diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java new file mode 100644 index 0000000..7c18a2c --- /dev/null +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkMessageEventListenerManager.java @@ -0,0 +1,106 @@ +/* + * The MIT License + * + * Copyright 2015 eddie. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package uk.ac.ntu.n0521366.wsyd.libs.net; + +import java.util.ArrayList; + +/** + * + * @author eddie + */ +public class NetworkMessageEventListenerManager implements NetworkMessageEventGenerator { + /** + * Wrapper allowing filtering of NetworkMessageEvents based on message intent. + */ + public class NetworkMessageEventListenerWithIntent { + String _intent; + NetworkMessageEventListener _listener; + + public NetworkMessageEventListenerWithIntent(NetworkMessageEventListener listener, String intent) { + _intent = intent; + _listener = listener; + } + } + protected ArrayList _NetworkMessageEventListeners; + + /** + * Construct an empty list of Event Listeners. + */ + NetworkMessageEventListenerManager() { + _NetworkMessageEventListeners = new ArrayList<>(); + } + + /** + * Add a NetworkMessageEvent listener. + * + * Listens to all intents. + * + * @param listener + */ + @Override + public synchronized void addNetworkMessageEventListener(NetworkMessageEventListener listener) { + _NetworkMessageEventListeners.add(new NetworkMessageEventListenerWithIntent(listener, null)); + } + + /** + * Add a filtered NetworkMessageEvent listener. + * + * Filters on the intent of the NetworkMessage. + * + * @param listener + * @param intent null to listen to all intents, otherwise the intent to listen for + */ + @Override + public synchronized void addNetworkMessageEventListener(NetworkMessageEventListener listener, String intent) { + _NetworkMessageEventListeners.add(new NetworkMessageEventListenerWithIntent(listener, intent)); + } + + /** + * Remove a NetworkMessageEvent listener. + * + * @param listener + */ + @Override + public synchronized void removeNetworkMessageEventListener(NetworkMessageEventListener listener) { + for (NetworkMessageEventListenerWithIntent intentListener : _NetworkMessageEventListeners) + if (intentListener._listener == listener) + _NetworkMessageEventListeners.remove(intentListener); + } + + /** + * Send a NetworkMessageEvent to all listeners. + * + * Only sends the message to listeners registered for the same intent, or for all messages. + * + * @param message the NetworkMessage to send + */ + @Override + public synchronized void fireNetworkMessageEvent(NetworkMessage message) { + NetworkMessageEvent event = new NetworkMessageEvent(this, message); + for (NetworkMessageEventListenerWithIntent intentListener : _NetworkMessageEventListeners) { + if (intentListener._intent.equals(message._intent) || intentListener._intent == null) + intentListener._listener.NetworkMessageReceived(event); + } + } +} diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java index b8eaae5..3a866df 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java @@ -65,7 +65,7 @@ import javax.swing.SwingWorker; * * @author TJ */ -public abstract class NetworkServerAbstract extends SwingWorker implements NetworkMessageEventGenerator { +public abstract class NetworkServerAbstract extends SwingWorker { /** * Single Logger for the class used by all object instances. @@ -100,28 +100,8 @@ public abstract class NetworkServerAbstract extends SwingWorker _sendMessageQueue = new ConcurrentLinkedQueue<>(); - - /** - * Wrapper for filtering NetworkMessageEvents based on the message intent - */ - public class NetworkMessageEventListenerWithIntent { - String _intent; - NetworkMessageEventListener _listener; - - public NetworkMessageEventListenerWithIntent(NetworkMessageEventListener listener, String intent) { - _intent = intent; - _listener = listener; - } - } - protected ArrayList _NetworkMessageEventListeners = new ArrayList<>(); + private final NetworkMessageEventListenerManager _eventManager; /** * * @param level message importance @@ -171,6 +151,7 @@ public abstract class NetworkServerAbstract extends SwingWorker list) { for (NetworkMessage message: list) { - fireNetworkMessageEvent(message); + this._eventManager.fireNetworkMessageEvent(message); } } @@ -376,86 +360,4 @@ public abstract class NetworkServerAbstract extends SwingWorker _ConnectionEstablishedEventListeners; /** * Construct the server with a Logger. @@ -60,6 +65,7 @@ public class NetworkServerTCP extends NetworkServerAbstract { public NetworkServerTCP(WSYD_SocketAddress socketAddress, String title, ServiceAddressMap serviceToHostMap, NetworkStreamManager manager, Logger logger) { super(socketAddress, title, serviceToHostMap, logger); this._streamManager = manager; + this._ConnectionEstablishedEventListeners = new ArrayList<>(); } /** @@ -149,8 +155,17 @@ public class NetworkServerTCP extends NetworkServerAbstract { //System.err.println("Before"); Socket connectionSocket = _serverSocket.accept(); NetworkStream newStream = new NetworkStream(connectionSocket, _streamManager); - _streamManager.addStream(0, newStream); + long userID = _streamManager.addStream(0, newStream); + // workaround to enable firing a ConnectionEstablishedEvent in process() on the owner thread + NetworkMessage temp = new NetworkMessage("triggerConnectionEstablishedEvent", null, new MessageNetworkStream(newStream)); + publish(temp); log(Level.INFO, _title, MessageFormat.format("Incoming connection from {0}:{1}", connectionSocket.getInetAddress().getCanonicalHostName(), Integer.toString(connectionSocket.getPort()))); + + // add or update the last-seen time of the Sender host in the known services map + ServiceAddressMap.LastSeenHost host = new ServiceAddressMap.LastSeenHost((InetSocketAddress)connectionSocket.getRemoteSocketAddress()); + this._serviceToHostMap.put(Long.toString(userID), host); + log(Level.INFO, _title, MessageFormat.format("Added \"{0}\" to service map", userID)); + System.err.println("Added new TCP stream successfully"); } catch (SocketTimeoutException e) { //Nothing to be done @@ -163,6 +178,26 @@ public class NetworkServerTCP extends NetworkServerAbstract { return false; } + private void fireConnectionEstablishedEvent(NetworkStream stream) { + ConnectionEstablishedEvent event = new ConnectionEstablishedEvent(this, stream); + for (ConnectionEstablishedEventListener listener: this._ConnectionEstablishedEventListeners) { + listener.connectionEstablished(event); + } + + } + public synchronized void addConnectionEstablisedEventListener(ConnectionEstablishedEventListener listener) { + _ConnectionEstablishedEventListeners.add(listener); + } + + public synchronized void removeConnectionEstablishedEventListener(ConnectionEstablishedEventListener listener) { + for (ConnectionEstablishedEventListener el : _ConnectionEstablishedEventListeners) + if (el == listener) { + _ConnectionEstablishedEventListeners.remove(listener); + break; + } + } + + /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */ /** @@ -174,4 +209,20 @@ public class NetworkServerTCP extends NetworkServerAbstract { protected void done() { } + + @Override + protected NetworkMessage sendMessage() { + return null; + } + + @Override + protected void process(List list) { + System.err.println("process()"); + for (NetworkMessage nm: list) { + MessageNetworkStream message = (MessageNetworkStream)nm.getMessage(); + System.err.println("ConnectionEstablished dispatched for " + message._stream._key); + fireConnectionEstablishedEvent(message._stream); + + } + } } diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDP.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDP.java index 90ca8e6..fda95b5 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDP.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDP.java @@ -31,6 +31,7 @@ import java.net.DatagramSocket; import java.net.DatagramPacket; import java.net.SocketException; import java.net.SocketTimeoutException; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import java.util.logging.Logger; import java.util.logging.LogRecord; @@ -53,7 +54,15 @@ public class NetworkServerUDP extends NetworkServerAbstract { * Maximum size of UDP packet payload */ public static final int UDP_PAYLOAD_SIZE_MAX = 65507; - + + /** + * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent. + * + * Allows the Owner Thread to submit new messages for sending that the Worker Thread + * can safely access. + */ + protected ConcurrentLinkedQueue _sendMessageQueue = new ConcurrentLinkedQueue<>(); + /** * Construct the server with a Logger. * @@ -243,6 +252,19 @@ public class NetworkServerUDP extends NetworkServerAbstract { return result; } + /** + * Removes a message from the queue of pending messages. + * + * This method is called on the Worker Thread by the doInBackground() main loop. + * + * @return a message to be sent + */ + @Override + protected NetworkMessage sendMessage() { + return this._sendMessageQueue.poll(); + } + + /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */ /** @@ -254,4 +276,35 @@ public class NetworkServerUDP extends NetworkServerAbstract { protected void done() { // TODO: done() implement any clean-up after doInBackground() has returned } + + /** + * Adds a message to the queue of pending messages. + * + * This method will usually be called from the Owner Thread. + * + * @param message to be sent + * @return true if the message was added to the queue + * @throws IllegalArgumentException if the target does not exist in the serviceToHost mapping + */ + public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException { + boolean result = false; + if (message != null) { + // ensure the target is set and is a valid service + String target = message.getTarget(); + if (target == null) + throw new IllegalArgumentException("target cannot be null"); + if(!_serviceToHostMap.isServiceValid(target)) + throw new IllegalArgumentException("target service does not exist: " + target); + + NetworkMessage temp; + try { // make a deep clone of the message + temp = NetworkMessage.clone(message); + result = this._sendMessageQueue.add(temp); + } catch (CloneNotSupportedException e) { + // TODO: queueMessage() log CloneNotSupportedException + e.printStackTrace(); + } + } + return result; + } } diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDPMulticast.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDPMulticast.java index 3998ddb..4f8bf03 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDPMulticast.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerUDPMulticast.java @@ -97,7 +97,7 @@ public class NetworkServerUDPMulticast extends NetworkServerUDP { this._multicastSocket = new MulticastSocket(_socketAddress.getPort()); this._multicastSocket.setTimeToLive(2); // don't traverse more than 2 routers this._multicastSocket.setSoTimeout(100); // 1/10th second blocking timeout on receive() - this._multicastSocket.setLoopbackMode(true); // inverted logic; true == disable. Don't want to receive our own sent packets + this._multicastSocket.setLoopbackMode(false); // inverted logic; true == disable. Don't want to receive our own sent packets Enumeration ifs = NetworkInterface.getNetworkInterfaces(); while (ifs.hasMoreElements()) { diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java index 641890f..19076a9 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java @@ -36,5 +36,5 @@ public interface NetworkSocketClosing { * @param key * @return */ - public boolean canCloseSocket(int key); + public boolean canCloseSocket(long key); } diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java index fe4c4bb..48da1b7 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java @@ -28,8 +28,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.OutputStream; import java.net.Socket; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -45,7 +45,7 @@ public class NetworkStream extends SwingWorker { static final long longSize = Long.SIZE / 8; - int _key; + long _key; /** * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent. @@ -55,6 +55,8 @@ public class NetworkStream extends SwingWorker { */ protected ConcurrentLinkedQueue _sendMessageQueue = new ConcurrentLinkedQueue<>(); + private final NetworkMessageEventListenerManager _eventManager; + private ObjectOutputStream _socketOS; private InputStream _socketIS; @@ -67,9 +69,10 @@ public class NetworkStream extends SwingWorker { private ByteArrayOutputStream _baos; - NetworkStream(Socket socket, NetworkSocketClosing owner) { + public NetworkStream(Socket socket, NetworkSocketClosing owner) { this._socket = socket; this._owner = owner; + _eventManager = new NetworkMessageEventListenerManager(); } @Override @@ -88,19 +91,22 @@ public class NetworkStream extends SwingWorker { while (!this.isCancelled()) { // check for incoming message - if (_socketIS.available() >= expectedLength) { + if (_ois.available() >= expectedLength) { if (expectingLong == true) { expectedLength = _ois.readLong(); + System.err.println("Expecting Object with length " + expectedLength); + System.err.println("Remaining bytes: " + _ois.available()); expectingLong = false; } - else { + else { + System.err.println("About to read at least " + _ois.available() + " bytes"); read(); expectedLength = longSize; expectingLong = true; } } // send a queued message - NetworkMessage temp = this.dequeueMessage(); + NetworkMessage temp = this.sendMessage(); if (temp != null) { if (!this.write(temp)) { System.err.println("Unable to send message over TCP channel"); @@ -128,6 +134,7 @@ public class NetworkStream extends SwingWorker { */ public boolean write(NetworkMessage message) { boolean result = false; + System.err.println("NetworkStream.write()"); if (message != null) { try { @@ -135,11 +142,15 @@ public class NetworkStream extends SwingWorker { _oos.flush(); long messageSize = _baos.size(); _socketOS.writeLong(messageSize); + System.err.println(" bytes to write: " + messageSize); _baos.writeTo(_socketOS); + _socketOS.flush(); + System.err.println("baos.size()=" + _baos.size()); result = true; } catch (IOException ex) { + ex.printStackTrace(); // TODO: Implement graceful disconnect } } @@ -154,9 +165,18 @@ public class NetworkStream extends SwingWorker { */ public boolean read() { boolean result = false; + System.err.println("NetworkStream.read()"); try { - NetworkMessage message = (NetworkMessage)_ois.readObject(); + NetworkMessage message = null; + + try { + message = (NetworkMessage)_ois.readObject(); + } catch (java.io.OptionalDataException ex) { + System.err.println("Length: " + ex.length + " EOF: " + ex.eof); + ex.printStackTrace(); + } + message.setKey(_key); publish(message); result = true; @@ -179,7 +199,7 @@ public class NetworkStream extends SwingWorker { * * @return a message to be sent */ - protected NetworkMessage dequeueMessage() { + protected NetworkMessage sendMessage() { return this._sendMessageQueue.poll(); } @@ -208,5 +228,24 @@ public class NetworkStream extends SwingWorker { } return result; } + + public NetworkMessageEventListenerManager getEventManager() { + return this._eventManager; + } + + /** + * Fetch messages received over the stream. + * + * For delivery to event listeners; usually Swing GUI components. This method will run on the + * Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread. + * + * @param list messages received and queued + */ + @Override + protected void process(List list) { + for (NetworkMessage message: list) { + this._eventManager.fireNetworkMessageEvent(message); + } + } } diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java index dfa02ba..2d131cb 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java @@ -32,42 +32,47 @@ import java.util.concurrent.ConcurrentHashMap; */ public class NetworkStreamManager implements NetworkSocketClosing { - static final int SERVERSOCIAL = -98; + public static final long SERVERSOCIAL = -98; - static final int SERVERCHAT = -99; + public static final long SERVERCHAT = -99; - static final int TEMPCLIENT = -200; + static final long TEMPCLIENT = -200; Random random; /** * */ - public ConcurrentHashMap _tcpStreams; + public ConcurrentHashMap _tcpStreams; public NetworkStreamManager() { random = new Random(); - _tcpStreams = new ConcurrentHashMap(); + _tcpStreams = new ConcurrentHashMap(); } - public int addStream(int userID, NetworkStream netStream) { - int result = 0; + public long addStream(long userID, NetworkStream netStream) { + long result = 0; if (netStream != null) { if (!_tcpStreams.containsKey(userID)) { if (userID == 0) { - int tempKey; + long tempKey; do { tempKey = createTempKey(); } while (_tcpStreams.containsKey(tempKey)); _tcpStreams.put(tempKey, netStream); result = tempKey; + netStream._key = tempKey; + netStream.execute(); System.err.println("Added new stream with key: " + tempKey); } else { if ((userID < 0 && userID > TEMPCLIENT) || userID > 0) { _tcpStreams.put(userID, netStream); result = userID; + netStream._key = userID; + netStream.execute(); + System.err.println("Added new stream with predefined key: " + userID); } } } @@ -76,12 +81,15 @@ public class NetworkStreamManager implements NetworkSocketClosing { return result; } - public boolean updateKey(int oldID, int newID) { + public boolean updateKey(long oldID, long newID) { boolean result = false; if (!_tcpStreams.containsKey(newID)) { if (_tcpStreams.containsKey(oldID)) { - _tcpStreams.put(newID, _tcpStreams.get(oldID)); + NetworkStream temp = _tcpStreams.get(oldID); + temp._key = newID; + _tcpStreams.put(newID, temp); + System.err.println("Added new stream with key: " + newID); _tcpStreams.remove(oldID); result = true; @@ -91,18 +99,18 @@ public class NetworkStreamManager implements NetworkSocketClosing { return result; } - private int createTempKey() { + private long createTempKey() { return TEMPCLIENT + -random.nextInt(1000000); } @Override - public boolean canCloseSocket(int key) { + public boolean canCloseSocket(long key) { boolean result = false; if (_tcpStreams.containsKey(key)) { _tcpStreams.remove(key); result = true; - System.err.println("Removed key: " + key); + System.err.println("Removed stream with key: " + key); } return result; } -- 2.17.1