import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.net.Socket;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
static final long longSize = Long.SIZE / 8;
- int _key;
+ long _key;
/**
* Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
*/
protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
+ private final NetworkMessageEventListenerManager _eventManager;
+
private ObjectOutputStream _socketOS;
private InputStream _socketIS;
private ByteArrayOutputStream _baos;
- NetworkStream(Socket socket, NetworkSocketClosing owner) {
+ public NetworkStream(Socket socket, NetworkSocketClosing owner) {
this._socket = socket;
this._owner = owner;
+ _eventManager = new NetworkMessageEventListenerManager();
}
@Override
while (!this.isCancelled()) {
// check for incoming message
- if (_socketIS.available() >= expectedLength) {
+ if (_ois.available() >= expectedLength) {
if (expectingLong == true) {
expectedLength = _ois.readLong();
+ System.err.println("Expecting Object with length " + expectedLength);
+ System.err.println("Remaining bytes: " + _ois.available());
expectingLong = false;
}
- else {
+ else {
+ System.err.println("About to read at least " + _ois.available() + " bytes");
read();
expectedLength = longSize;
expectingLong = true;
}
}
// send a queued message
- NetworkMessage temp = this.dequeueMessage();
+ NetworkMessage temp = this.sendMessage();
if (temp != null) {
if (!this.write(temp)) {
System.err.println("Unable to send message over TCP channel");
*/
public boolean write(NetworkMessage message) {
boolean result = false;
+ System.err.println("NetworkStream.write()");
if (message != null) {
try {
_oos.flush();
long messageSize = _baos.size();
_socketOS.writeLong(messageSize);
+ System.err.println(" bytes to write: " + messageSize);
_baos.writeTo(_socketOS);
+ _socketOS.flush();
+ System.err.println("baos.size()=" + _baos.size());
result = true;
} catch (IOException ex) {
+ ex.printStackTrace();
// TODO: Implement graceful disconnect
}
}
*/
public boolean read() {
boolean result = false;
+ System.err.println("NetworkStream.read()");
try {
- NetworkMessage message = (NetworkMessage)_ois.readObject();
+ NetworkMessage message = null;
+
+ try {
+ message = (NetworkMessage)_ois.readObject();
+ } catch (java.io.OptionalDataException ex) {
+ System.err.println("Length: " + ex.length + " EOF: " + ex.eof);
+ ex.printStackTrace();
+ }
+ message.setKey(_key);
publish(message);
result = true;
*
* @return a message to be sent
*/
- protected NetworkMessage dequeueMessage() {
+ protected NetworkMessage sendMessage() {
return this._sendMessageQueue.poll();
}
}
return result;
}
+
+ public NetworkMessageEventListenerManager getEventManager() {
+ return this._eventManager;
+ }
+
+ /**
+ * Fetch messages received over the stream.
+ *
+ * For delivery to event listeners; usually Swing GUI components. This method will run on the
+ * Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread.
+ *
+ * @param list messages received and queued
+ */
+ @Override
+ protected void process(List<NetworkMessage> list) {
+ for (NetworkMessage message: list) {
+ this._eventManager.fireNetworkMessageEvent(message);
+ }
+ }
}