/*
* The MIT License
*
- * Copyright 2015 eddie.
+ * Copyright 2015 Eddie Berrisford-Lynch <dev@fun2be.me>.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
*/
package uk.ac.ntu.n0521366.wsyd.libs.net;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.net.InetSocketAddress;
import java.net.Socket;
+import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
/**
*
- * @author eddie
+ * @author Eddie Berrisford-Lynch <dev@fun2be.me>
*/
public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
private final Socket _socket;
-
+
static final long longSize = Long.SIZE / 8;
-
+
+ /**
+ * Identifies the client userID or server this stream is connected to.
+ */
long _key;
-
+
/**
* Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
- *
+ *
* Allows the Owner Thread to submit new messages for sending that the Worker Thread
* can safely access.
*/
protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
-
+
+ /**
+ * Event manager for sending events to interested listeners.
+ */
private final NetworkMessageEventListenerManager _eventManager;
-
+
private ObjectOutputStream _socketOS;
-
+
private InputStream _socketIS;
-
+
+ /**
+ * The object that authorises closing of socket. Usually a NetworkStreamManager.
+ */
private final NetworkSocketClosing _owner;
-
+
+ /**
+ * Stream
+ */
private ObjectInputStream _ois;
-
- private ObjectOutputStream _oos;
-
- private ByteArrayOutputStream _baos;
-
- public NetworkStream(Socket socket, NetworkSocketClosing owner) {
+
+ private Logger LOGGER;
+
+ /**
+ * Construct a new TCP stream handler with a Logger.
+ *
+ * @param socket the connected TCP socket
+ * @param owner the object responsible for permitting the socket to close
+ * @param logger a Logger instance
+ */
+ public NetworkStream(Socket socket, NetworkSocketClosing owner, Logger logger) {
this._socket = socket;
this._owner = owner;
_eventManager = new NetworkMessageEventListenerManager();
+ LOGGER = logger;
}
-
+
+ /**
+ * Construct a new TCP stream handler without a Logger.
+ *
+ * @param socket the connected TCP socket
+ * @param owner the object responsible for permitting the socket to close
+ */
+ public NetworkStream(Socket socket, NetworkSocketClosing owner) {
+ this(socket, owner, null);
+ }
+
+ /**
+ * Custom simplified logging function.
+ *
+ * Encapsulates the commonly repeated code of Logger method calls
+ *
+ * @param level the Logger.Level of this message
+ * @param format MessageFormat.format() formatter
+ * @param message zero or more arguments for MessageFormat.format() to process
+ */
+ protected void logp(Level level, String format, Object... message) {
+ if (LOGGER != null)
+ LOGGER.logp(level, LOGGER.getName(), null, MessageFormat.format(MessageFormat.format("TCPstream_{0}", _key, format), (Object[]) message));
+ }
+
+ /**
+ * Listen for incoming messages and send outbound messages.
+ *
+ * @return true if no errors occurred
+ * @throws Exception any exceptions are passed on
+ */
@Override
public Boolean doInBackground() throws Exception {
-
- _baos = new ByteArrayOutputStream();
- _oos = new ObjectOutputStream(_baos);
+ boolean result = true;
+
_socketOS = new ObjectOutputStream(_socket.getOutputStream());
-
+
_socketIS = _socket.getInputStream();
_ois = new ObjectInputStream(_socketIS);
-
- long expectedLength = longSize;
- boolean expectingLong = true;
-
+
while (!this.isCancelled()) {
// check for incoming message
if (_socketIS.available() > 0) {
- System.err.println(" reading Object from stream, bytes available: " + _socketIS.available());
read();
}
- /*
- if (_ois.available() >= expectedLength) {
- if (expectingLong == true) {
- expectedLength = _ois.readLong();
- System.err.println("Expecting Object with length " + expectedLength);
- System.err.println("Remaining bytes: " + _ois.available());
- expectingLong = false;
- }
- else {
- System.err.println("About to read at least " + _ois.available() + " bytes");
- read();
- expectedLength = longSize;
- expectingLong = true;
- }
- }
- */
// send a queued message
NetworkMessage temp = this.sendMessage();
if (temp != null) {
if (!this.write(temp)) {
- System.err.println("Unable to send message over TCP channel");
- // FIXME: Potentially add logger support
+ logp(Level.WARNING, "Unable to send message over TCP channel");
+ result = false; // indicates at least 1 error occurred
}
}
-
}
- return false;
+
+ return result;
}
-
+
+ /**
+ * Clean up after background service has finished.
+ *
+ * Ensures the stream Manager removes this stream from its collection.
+ */
@Override
protected void done() {
- _owner.canCloseSocket(_key);
+ try {
+ _owner.canCloseSocket(_key);
+ _ois.close();
+ _socketIS.close();
+ _socketOS.close();
+ } catch (IOException ex) {
+ logp(Level.SEVERE, "Error closing streams: {0}", ex);
+ }
}
-
+
/**
* Send a NetworkMessage over a NetworkStream.
- *
+ *
* Each message is preceeded by its length as a long to avoid rare, but possible blocking
* in the reader.
- *
+ *
* @param message
* @return true if message successfully sent
*/
public boolean write(NetworkMessage message) {
boolean result = false;
- System.err.println("NetworkStream.write()");
-
+
if (message != null) {
- try {
- _oos.writeObject(message);
- _oos.flush();
- Long messageSize = new Long(_baos.size());
+ try {
_socketOS.writeObject(message);
_socketOS.flush();
- System.err.println(" bytes written: " + messageSize);
-
+
result = true;
-
+
} catch (IOException ex) {
- ex.printStackTrace();
- // TODO: Implement graceful disconnect
+ logp(Level.SEVERE, "write(): ", ex.toString());
+ this.cancel(true);
}
}
return result;
}
-
+
/**
* Reads NetworkMessage from the incoming stream and publishes it.
- *
- *
+ *
+ *
* @return true if message successfully published
*/
public boolean read() {
boolean result = false;
- System.err.println("NetworkStream.read()");
-
+
try {
- NetworkMessage message = null;
+ NetworkMessage message;
String s = null;
message = (NetworkMessage)_ois.readObject();
- System.err.println(" received: " + message.toString());
message.setKey(_key);
publish(message);
-
+
result = true;
- } catch (java.io.OptionalDataException ex) {
- System.err.println("Length: " + ex.length + " EOF: " + ex.eof);
- ex.printStackTrace();
- } catch (IOException ex) {
- Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
- // FIXME: Replace logger
- // TODO: Graceful disconnect
- } catch (ClassNotFoundException ex) {
- Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
- // FIXME: Replace logger
+ } catch (IOException | ClassNotFoundException ex) {
+ logp(Level.SEVERE, "read(): {0}", ex.toString());
+ this.cancel(true);
}
- // TODO: Implement NetworkStream::read()
return result;
}
-
+
/**
* Removes a message from the queue of pending messages.
*
protected NetworkMessage sendMessage() {
return this._sendMessageQueue.poll();
}
-
+
/* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
-
+
/**
* Adds a message to the queue of pending messages.
- *
+ *
* This method will usually be called from the Owner Thread.
- *
+ *
* @param message to be sent
* @return true if the message was added to the queue
*/
public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
boolean result = false;
-
+
if (message != null) {
NetworkMessage temp;
try { // make a deep clone of the message
temp = NetworkMessage.clone(message);
result = this._sendMessageQueue.add(temp);
- } catch (CloneNotSupportedException e) {
- // TODO: queueMessage() log CloneNotSupportedException
- e.printStackTrace();
+ } catch (CloneNotSupportedException ex) {
+ logp(Level.SEVERE, "queueMessage(): {0}", ex.toString());
}
}
return result;
}
+ /**
+ * Cause a clean shutdown of the background worker thread.
+ */
+ public void close() {
+ this.cancel(true);
+ }
+
+ /**
+ * Access the stream's event manager.
+ * @return the Manager
+ */
public NetworkMessageEventListenerManager getEventManager() {
return this._eventManager;
}
-
+
/**
* Fetch messages received over the stream.
- *
+ *
* For delivery to event listeners; usually Swing GUI components. This method will run on the
* Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread.
- *
+ *
* @param list messages received and queued
*/
@Override
this._eventManager.fireNetworkMessageEvent(message);
}
}
-
+
+ /**
+ * Get remote IP address:port of this connection.
+ *
+ * @return the IP address:port pair
+ */
+ public InetSocketAddress getRemoteAddress() {
+ return (_socket != null ? (InetSocketAddress) _socket.getRemoteSocketAddress() : null);
+ }
}