--- /dev/null
+/*
+ * The MIT License
+ *
+ * Copyright 2015 eddie.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package uk.ac.ntu.n0521366.wsyd.libs.net;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.swing.SwingWorker;
+
+/**
+ *
+ * @author eddie
+ */
+public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
+
+ private final Socket _socket;
+
+ static final long longSize = Long.SIZE / 8;
+
+ int _key;
+
+ /**
+ * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
+ *
+ * Allows the Owner Thread to submit new messages for sending that the Worker Thread
+ * can safely access.
+ */
+ protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
+
+ private ObjectOutputStream _socketOS;
+
+ private InputStream _socketIS;
+
+ private final NetworkSocketClosing _owner;
+
+ private ObjectInputStream _ois;
+
+ private ObjectOutputStream _oos;
+
+ private ByteArrayOutputStream _baos;
+
+ NetworkStream(Socket socket, NetworkSocketClosing owner) {
+ this._socket = socket;
+ this._owner = owner;
+ }
+
+ @Override
+ public Boolean doInBackground() throws Exception {
+
+ _baos = new ByteArrayOutputStream();
+ _oos = new ObjectOutputStream(_baos);
+ _socketOS = new ObjectOutputStream(_socket.getOutputStream());
+
+ _socketIS = _socket.getInputStream();
+ _ois = new ObjectInputStream(_socketIS);
+
+ long expectedLength = longSize;
+ boolean expectingLong = true;
+
+ while (!this.isCancelled()) {
+
+ // check for incoming message
+ if (_socketIS.available() >= expectedLength) {
+ if (expectingLong == true) {
+ expectedLength = _ois.readLong();
+ expectingLong = false;
+ }
+ else {
+ read();
+ expectedLength = longSize;
+ expectingLong = true;
+ }
+ }
+ // send a queued message
+ NetworkMessage temp = this.dequeueMessage();
+ if (temp != null) {
+ if (!this.write(temp)) {
+ System.err.println("Unable to send message over TCP channel");
+ // FIXME: Potentially add logger support
+ }
+ }
+
+ }
+ return false;
+ }
+
+ @Override
+ protected void done() {
+ _owner.canCloseSocket(_key);
+ }
+
+ /**
+ * Send a NetworkMessage over a NetworkStream.
+ *
+ * Each message is preceeded by its length as a long to avoid rare, but possible blocking
+ * in the reader.
+ *
+ * @param message
+ * @return true if message successfully sent
+ */
+ public boolean write(NetworkMessage message) {
+ boolean result = false;
+
+ if (message != null) {
+ try {
+ _oos.writeObject(message);
+ _oos.flush();
+ long messageSize = _baos.size();
+ _socketOS.writeLong(messageSize);
+ _baos.writeTo(_socketOS);
+
+ result = true;
+
+ } catch (IOException ex) {
+ // TODO: Implement graceful disconnect
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Reads NetworkMessage from the incoming stream and publishes it.
+ *
+ *
+ * @return true if message successfully published
+ */
+ public boolean read() {
+ boolean result = false;
+
+ try {
+ NetworkMessage message = (NetworkMessage)_ois.readObject();
+ publish(message);
+
+ result = true;
+ } catch (IOException ex) {
+ Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
+ // FIXME: Replace logger
+ // TODO: Graceful disconnect
+ } catch (ClassNotFoundException ex) {
+ Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
+ // FIXME: Replace logger
+ }
+ // TODO: Implement NetworkStream::read()
+ return result;
+ }
+
+ /**
+ * Removes a message from the queue of pending messages.
+ *
+ * This method is called on the Worker Thread by the doInBackground() main loop.
+ *
+ * @return a message to be sent
+ */
+ protected NetworkMessage dequeueMessage() {
+ return this._sendMessageQueue.poll();
+ }
+
+ /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
+
+ /**
+ * Adds a message to the queue of pending messages.
+ *
+ * This method will usually be called from the Owner Thread.
+ *
+ * @param message to be sent
+ * @return true if the message was added to the queue
+ */
+ public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
+ boolean result = false;
+
+ if (message != null) {
+ NetworkMessage temp;
+ try { // make a deep clone of the message
+ temp = NetworkMessage.clone(message);
+ result = this._sendMessageQueue.add(temp);
+ } catch (CloneNotSupportedException e) {
+ // TODO: queueMessage() log CloneNotSupportedException
+ e.printStackTrace();
+ }
+ }
+ return result;
+ }
+
+}