From: Eddie Date: Mon, 1 Jun 2015 16:56:56 +0000 (+0100) Subject: Implemented TCP streams for object serialization X-Git-Url: https://iam.tj/gitweb/gitweb.cgi?p=WeStealzYourDataz.git;a=commitdiff_plain;h=ee81eda63d30f2e98fc049cf018c92778218b09b Implemented TCP streams for object serialization --- diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java index 3a186ba..b8eaae5 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkServerAbstract.java @@ -358,7 +358,7 @@ public abstract class NetworkServerAbstract extends SwingWorker */ public class NetworkServerTCP extends NetworkServerAbstract { + ServerSocket _serverSocket; + + NetworkStreamManager _streamManager; + + /** + * Construct the server with a Logger. + * + * No socket is opened. + * + * @param socketAddress The socket to listen on + * @param title source identifier for use in log messages and sent NetworkMessage objects + * @param serviceToHostMap the map object used for host <> InetSocketAddress lookups + * @param manager + * @param logger An instance of Logger to be used by all objects of this class + */ + public NetworkServerTCP(WSYD_SocketAddress socketAddress, String title, ServiceAddressMap serviceToHostMap, NetworkStreamManager manager, Logger logger) { + super(socketAddress, title, serviceToHostMap, logger); + this._streamManager = manager; + } + /** + * Construct the server without a Logger. + * + * No socket is opened. + * + * @param socketAddress The socket to listen on + * @param title source identifier for use in log messages and sent NetworkMessage objects + * @param serviceToHostMap the map object used for host <> InetSocketAddress lookups + * @param manager + */ + public NetworkServerTCP(WSYD_SocketAddress socketAddress, String title, ServiceAddressMap serviceToHostMap, NetworkStreamManager manager) { + super(socketAddress, title, serviceToHostMap); + this._streamManager = manager; + } + /** * Open the socket ready for accepting connections. * @@ -45,7 +86,21 @@ public class NetworkServerTCP extends NetworkServerAbstract { */ @Override public void serverOpen() throws SocketException { + try { + _serverSocket = new ServerSocket(_socketAddress.getPort(), 50, _socketAddress.getAddress()); + } catch (IOException ex) { + throw new SocketException(ex.getMessage()); + } + _serverSocket.setSoTimeout(100); + if (_socketAddress.getPort() == Network.PORTS_EPHEMERAL) { + // reflect the actual port in use if an ephermal port was requested + InetSocketAddress actualSA = (InetSocketAddress)_serverSocket.getLocalSocketAddress(); + _socketAddress.setAddress(actualSA.getAddress()); + _socketAddress.setPort(actualSA.getPort()); + } + //log(Level.FINEST, _title, MessageFormat.format("Connection from {0}:{1}", _socketAddress.getAddress().getCanonicalHostName(), Integer.toString(_socketAddress.getPort()))); + // TODO: Complete this implementation } /** @@ -71,7 +126,12 @@ public class NetworkServerTCP extends NetworkServerAbstract { */ @Override public void serverClose() throws SocketException { - + if (this._serverSocket != null) + try { + this._serverSocket.close(); + } catch (IOException ex) { + throw new SocketException(ex.getMessage()); + } } /** @@ -85,6 +145,21 @@ public class NetworkServerTCP extends NetworkServerAbstract { public boolean serverListen() { boolean result = false; + try { + //System.err.println("Before"); + Socket connectionSocket = _serverSocket.accept(); + NetworkStream newStream = new NetworkStream(connectionSocket, _streamManager); + _streamManager.addStream(0, newStream); + log(Level.INFO, _title, MessageFormat.format("Incoming connection from {0}:{1}", connectionSocket.getInetAddress().getCanonicalHostName(), Integer.toString(connectionSocket.getPort()))); + } + catch (SocketTimeoutException e) { + //Nothing to be done + } + catch (IOException e) { + // Nothing to be done + System.err.println("Incoming connection caused exception..."); + } + return false; } diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java new file mode 100644 index 0000000..641890f --- /dev/null +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkSocketClosing.java @@ -0,0 +1,40 @@ +/* + * The MIT License + * + * Copyright 2015 eddie. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package uk.ac.ntu.n0521366.wsyd.libs.net; + +/** + * + * @author eddie + */ +public interface NetworkSocketClosing { + /** + * Asking parent for permission to close. + * + * Parent will remove stream from its list and stream shall proceed to close. + * + * @param key + * @return + */ + public boolean canCloseSocket(int key); +} diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java new file mode 100644 index 0000000..fe4c4bb --- /dev/null +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java @@ -0,0 +1,212 @@ +/* + * The MIT License + * + * Copyright 2015 eddie. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package uk.ac.ntu.n0521366.wsyd.libs.net; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.swing.SwingWorker; + +/** + * + * @author eddie + */ +public class NetworkStream extends SwingWorker { + + private final Socket _socket; + + static final long longSize = Long.SIZE / 8; + + int _key; + + /** + * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent. + * + * Allows the Owner Thread to submit new messages for sending that the Worker Thread + * can safely access. + */ + protected ConcurrentLinkedQueue _sendMessageQueue = new ConcurrentLinkedQueue<>(); + + private ObjectOutputStream _socketOS; + + private InputStream _socketIS; + + private final NetworkSocketClosing _owner; + + private ObjectInputStream _ois; + + private ObjectOutputStream _oos; + + private ByteArrayOutputStream _baos; + + NetworkStream(Socket socket, NetworkSocketClosing owner) { + this._socket = socket; + this._owner = owner; + } + + @Override + public Boolean doInBackground() throws Exception { + + _baos = new ByteArrayOutputStream(); + _oos = new ObjectOutputStream(_baos); + _socketOS = new ObjectOutputStream(_socket.getOutputStream()); + + _socketIS = _socket.getInputStream(); + _ois = new ObjectInputStream(_socketIS); + + long expectedLength = longSize; + boolean expectingLong = true; + + while (!this.isCancelled()) { + + // check for incoming message + if (_socketIS.available() >= expectedLength) { + if (expectingLong == true) { + expectedLength = _ois.readLong(); + expectingLong = false; + } + else { + read(); + expectedLength = longSize; + expectingLong = true; + } + } + // send a queued message + NetworkMessage temp = this.dequeueMessage(); + if (temp != null) { + if (!this.write(temp)) { + System.err.println("Unable to send message over TCP channel"); + // FIXME: Potentially add logger support + } + } + + } + return false; + } + + @Override + protected void done() { + _owner.canCloseSocket(_key); + } + + /** + * Send a NetworkMessage over a NetworkStream. + * + * Each message is preceeded by its length as a long to avoid rare, but possible blocking + * in the reader. + * + * @param message + * @return true if message successfully sent + */ + public boolean write(NetworkMessage message) { + boolean result = false; + + if (message != null) { + try { + _oos.writeObject(message); + _oos.flush(); + long messageSize = _baos.size(); + _socketOS.writeLong(messageSize); + _baos.writeTo(_socketOS); + + result = true; + + } catch (IOException ex) { + // TODO: Implement graceful disconnect + } + } + return result; + } + + /** + * Reads NetworkMessage from the incoming stream and publishes it. + * + * + * @return true if message successfully published + */ + public boolean read() { + boolean result = false; + + try { + NetworkMessage message = (NetworkMessage)_ois.readObject(); + publish(message); + + result = true; + } catch (IOException ex) { + Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex); + // FIXME: Replace logger + // TODO: Graceful disconnect + } catch (ClassNotFoundException ex) { + Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex); + // FIXME: Replace logger + } + // TODO: Implement NetworkStream::read() + return result; + } + + /** + * Removes a message from the queue of pending messages. + * + * This method is called on the Worker Thread by the doInBackground() main loop. + * + * @return a message to be sent + */ + protected NetworkMessage dequeueMessage() { + return this._sendMessageQueue.poll(); + } + + /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */ + + /** + * Adds a message to the queue of pending messages. + * + * This method will usually be called from the Owner Thread. + * + * @param message to be sent + * @return true if the message was added to the queue + */ + public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException { + boolean result = false; + + if (message != null) { + NetworkMessage temp; + try { // make a deep clone of the message + temp = NetworkMessage.clone(message); + result = this._sendMessageQueue.add(temp); + } catch (CloneNotSupportedException e) { + // TODO: queueMessage() log CloneNotSupportedException + e.printStackTrace(); + } + } + return result; + } + +} diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java new file mode 100644 index 0000000..dfa02ba --- /dev/null +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStreamManager.java @@ -0,0 +1,110 @@ +/* + * The MIT License + * + * Copyright 2015 eddie. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package uk.ac.ntu.n0521366.wsyd.libs.net; + +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author eddie + */ +public class NetworkStreamManager implements NetworkSocketClosing { + + static final int SERVERSOCIAL = -98; + + static final int SERVERCHAT = -99; + + static final int TEMPCLIENT = -200; + + Random random; + + /** + * + */ + public ConcurrentHashMap _tcpStreams; + + public NetworkStreamManager() { + random = new Random(); + _tcpStreams = new ConcurrentHashMap(); + } + + public int addStream(int userID, NetworkStream netStream) { + int result = 0; + + if (netStream != null) { + if (!_tcpStreams.containsKey(userID)) { + if (userID == 0) { + int tempKey; + do { + tempKey = createTempKey(); + } while (_tcpStreams.containsKey(tempKey)); + _tcpStreams.put(tempKey, netStream); + result = tempKey; + System.err.println("Added new stream with key: " + tempKey); + } + else { + if ((userID < 0 && userID > TEMPCLIENT) || userID > 0) { + _tcpStreams.put(userID, netStream); + result = userID; + } + } + } + } + + return result; + } + + public boolean updateKey(int oldID, int newID) { + boolean result = false; + + if (!_tcpStreams.containsKey(newID)) { + if (_tcpStreams.containsKey(oldID)) { + _tcpStreams.put(newID, _tcpStreams.get(oldID)); + _tcpStreams.remove(oldID); + + result = true; + } + } + + return result; + } + + private int createTempKey() { + return TEMPCLIENT + -random.nextInt(1000000); + } + + @Override + public boolean canCloseSocket(int key) { + boolean result = false; + + if (_tcpStreams.containsKey(key)) { + _tcpStreams.remove(key); + result = true; + System.err.println("Removed key: " + key); + } + return result; + } + +} diff --git a/src/uk/ac/ntu/n0521366/wsyd/libs/net/WSYD_SocketAddress.java b/src/uk/ac/ntu/n0521366/wsyd/libs/net/WSYD_SocketAddress.java index 70adf4b..e66d60a 100644 --- a/src/uk/ac/ntu/n0521366/wsyd/libs/net/WSYD_SocketAddress.java +++ b/src/uk/ac/ntu/n0521366/wsyd/libs/net/WSYD_SocketAddress.java @@ -63,7 +63,7 @@ public class WSYD_SocketAddress implements java.io.Serializable { throw new IllegalArgumentException(MessageFormat.format("Not a correct dotted decimal notation: {0}", ipv4)); int i = 0; - for (int index = 0; index <=3; index++) { + for (int index = 0; index <= 3; index++) { try { i = Integer.parseInt(elements[index]); } catch (NumberFormatException e) {