NetworkStream: simplify, add Logger, getRemoteAddress(), better cleanup and closing...
authorEddie <dev@fun2be.me>
Sat, 6 Jun 2015 08:43:34 +0000 (09:43 +0100)
committerEddie <dev@fun2be.me>
Sat, 6 Jun 2015 09:36:54 +0000 (10:36 +0100)
 * simplify object deserialization
 * support using application's Logger instance
 * introduce getRemoteAddress()
 * introduce close() and move close streams to done()
 * fill out javadocs

Simplify object deserialization. Remove the now unused object length
functionality that was originally prefixed to the object in the serialized stream.
This allows the dropping of the indirect streams used to measure the
size of an object.

Logger. Pass in the application Logger instance to the constructor, add
the project-standardised simple logp() method and use it in place of
console error messages.

Introduce getRemoteAddress(). Provide public access to the stream's
remote host address so that NetworkMessage can be tagged with the
sender's address for easy ServiceAddressMap support.

Introduce close(). Provide public access to tell the stream to close
cleanly. Used by the NetworkStreamManager to close all streams cleanly
at shutdown.
Move the stream closing code into done().

src/uk/ac/ntu/n0521366/wsyd/libs/net/NetworkStream.java

index d451e89..48b1559 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * The MIT License
  *
- * Copyright 2015 eddie.
+ * Copyright 2015 Eddie Berrisford-Lynch <dev@fun2be.me>.
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
  */
 package uk.ac.ntu.n0521366.wsyd.libs.net;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.text.MessageFormat;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
@@ -37,164 +38,188 @@ import javax.swing.SwingWorker;
 
 /**
  *
- * @author eddie
+ * @author Eddie Berrisford-Lynch <dev@fun2be.me>
  */
 public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
 
     private final Socket _socket;
-    
+
     static final long longSize = Long.SIZE / 8;
-    
+
+    /**
+     * Identifies the client userID or server this stream is connected to.
+     */
     long _key;
-    
+
     /**
      * Thread safe First In, First Out Queue of NetworkMessage objects waiting to be sent.
-     * 
+     *
      * Allows the Owner Thread to submit new messages for sending that the Worker Thread
      * can safely access.
      */
     protected ConcurrentLinkedQueue<NetworkMessage> _sendMessageQueue = new ConcurrentLinkedQueue<>();
-    
+
+    /**
+     * Event manager for sending events to interested listeners.
+     */
     private final NetworkMessageEventListenerManager _eventManager;
-    
+
     private ObjectOutputStream _socketOS;
-    
+
     private InputStream _socketIS;
-    
+
+    /**
+     * The object that authorises closing of socket. Usually a NetworkStreamManager.
+     */
     private final NetworkSocketClosing _owner;
-    
+
+    /**
+     * Stream
+     */
     private ObjectInputStream _ois;
-    
-    private ObjectOutputStream _oos;
-    
-    private ByteArrayOutputStream _baos;
-    
-    public NetworkStream(Socket socket, NetworkSocketClosing owner) {
+
+    private Logger LOGGER;
+
+    /**
+     * Construct a new TCP stream handler with a Logger.
+     *
+     * @param socket the connected TCP socket
+     * @param owner the object responsible for permitting the socket to close
+     * @param logger a Logger instance
+     */
+    public NetworkStream(Socket socket, NetworkSocketClosing owner, Logger logger) {
         this._socket = socket;
         this._owner = owner;
         _eventManager = new NetworkMessageEventListenerManager();
+        LOGGER = logger;
     }
-    
+
+    /**
+     * Construct a new TCP stream handler without a Logger.
+     *
+     * @param socket the connected TCP socket
+     * @param owner the object responsible for permitting the socket to close
+     */
+    public NetworkStream(Socket socket, NetworkSocketClosing owner) {
+        this(socket, owner, null);
+    }
+
+    /**
+     * Custom simplified logging function.
+     *
+     * Encapsulates the commonly repeated code of Logger method calls
+     *
+     * @param level the Logger.Level of this message
+     * @param format MessageFormat.format() formatter
+     * @param message zero or more arguments for MessageFormat.format() to process
+     */
+    protected void logp(Level level, String format, Object... message) {
+        if (LOGGER != null)
+            LOGGER.logp(level, LOGGER.getName(), null, MessageFormat.format(MessageFormat.format("TCPstream_{0}", _key, format), (Object[]) message));
+    }
+
+    /**
+     * Listen for incoming messages and send outbound messages.
+     *
+     * @return true if no errors occurred
+     * @throws Exception any exceptions are passed on
+     */
     @Override
     public Boolean doInBackground() throws Exception {
-        
-        _baos = new ByteArrayOutputStream();
-        _oos = new ObjectOutputStream(_baos);
+        boolean result = true;
+
         _socketOS = new ObjectOutputStream(_socket.getOutputStream());
-        
+
         _socketIS = _socket.getInputStream();
         _ois = new ObjectInputStream(_socketIS);
-        
-        long expectedLength = longSize;
-        boolean expectingLong = true;
-        
+
         while (!this.isCancelled()) {
             // check for incoming message
             if (_socketIS.available() > 0) {
-                System.err.println(" reading Object from stream, bytes available: " + _socketIS.available());
                 read();
             }
-            /*
-            if (_ois.available() >= expectedLength) {
-                if (expectingLong == true) {                  
-                    expectedLength = _ois.readLong();
-                    System.err.println("Expecting Object with length " + expectedLength);
-                    System.err.println("Remaining bytes: " + _ois.available());
-                    expectingLong = false;
-                }
-                else {
-                    System.err.println("About to read at least " + _ois.available() + " bytes");
-                    read();
-                    expectedLength = longSize;
-                    expectingLong = true;
-                }
-            }
-            */
             // send a queued message
             NetworkMessage temp = this.sendMessage();
             if (temp != null) {
                 if (!this.write(temp)) {
-                    System.err.println("Unable to send message over TCP channel");
-                    // FIXME: Potentially add logger support
+                    logp(Level.WARNING, "Unable to send message over TCP channel");
+                    result = false; // indicates at least 1 error occurred
                 }
             }
-            
         }
-        return false;
+
+        return result;
     }
-    
+
+    /**
+     * Clean up after background service has finished.
+     *
+     * Ensures the stream Manager removes this stream from its collection.
+     */
     @Override
     protected void done() {
-        _owner.canCloseSocket(_key);
+        try {
+            _owner.canCloseSocket(_key);
+            _ois.close();
+            _socketIS.close();
+            _socketOS.close();
+        } catch (IOException ex) {
+            logp(Level.SEVERE, "Error closing streams: {0}", ex);
+        }
     }
-    
+
     /**
      * Send a NetworkMessage over a NetworkStream.
-     * 
+     *
      * Each message is preceeded by its length as a long to avoid rare, but possible blocking
      * in the reader.
-     * 
+     *
      * @param message
      * @return true if message successfully sent
      */
     public boolean write(NetworkMessage message) {
         boolean result = false;
-        System.err.println("NetworkStream.write()");
-        
+
         if (message != null) {
-            try {             
-                _oos.writeObject(message);
-                _oos.flush();
-                Long messageSize = new Long(_baos.size());
+            try {
                 _socketOS.writeObject(message);
                 _socketOS.flush();
-                System.err.println(" bytes written: " + messageSize);
-                
+
                 result = true;
-                
+
             } catch (IOException ex) {
-                ex.printStackTrace();
-                // TODO: Implement graceful disconnect
+                logp(Level.SEVERE, "write(): ", ex.toString());
+                this.cancel(true);
             }
         }
         return result;
     }
-    
+
     /**
      * Reads NetworkMessage from the incoming stream and publishes it.
-     * 
-     * 
+     *
+     *
      * @return true if message successfully published
      */
     public boolean read() {
         boolean result = false;
-        System.err.println("NetworkStream.read()");
-        
+
         try {
-            NetworkMessage message = null;
+            NetworkMessage message;
             String s = null;
 
             message = (NetworkMessage)_ois.readObject();
-            System.err.println(" received: " + message.toString());
             message.setKey(_key);
             publish(message);
-            
+
             result = true;
-        } catch (java.io.OptionalDataException ex) {
-            System.err.println("Length: " + ex.length + " EOF: " + ex.eof);
-            ex.printStackTrace();
-        } catch (IOException ex) {
-            Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
-            // FIXME: Replace logger
-            // TODO: Graceful disconnect
-        } catch (ClassNotFoundException ex) {
-            Logger.getLogger(NetworkStream.class.getName()).log(Level.SEVERE, null, ex);
-            // FIXME: Replace logger
+        } catch (IOException | ClassNotFoundException ex) {
+            logp(Level.SEVERE, "read(): {0}", ex.toString());
+            this.cancel(true);
         }
-        // TODO: Implement NetworkStream::read()
         return result;
     }
-    
+
     /**
      * Removes a message from the queue of pending messages.
      *
@@ -205,43 +230,53 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
     protected NetworkMessage sendMessage() {
         return this._sendMessageQueue.poll();
     }
-    
+
     /* XXX: Methods below here all execute on the GUI Event Dispatch Thread */
-    
+
     /**
      * Adds a message to the queue of pending messages.
-     * 
+     *
      * This method will usually be called from the Owner Thread.
-     * 
+     *
      * @param message to be sent
      * @return true if the message was added to the queue
      */
     public boolean queueMessage(NetworkMessage message) throws IllegalArgumentException {
         boolean result = false;
-        
+
         if (message != null) {
             NetworkMessage temp;
             try { // make a deep clone of the message
                 temp = NetworkMessage.clone(message);
                 result = this._sendMessageQueue.add(temp);
-            } catch (CloneNotSupportedException e) {
-                // TODO: queueMessage() log CloneNotSupportedException
-                e.printStackTrace();
+            } catch (CloneNotSupportedException ex) {
+                logp(Level.SEVERE, "queueMessage(): {0}", ex.toString());
             }
         }
         return result;
     }
 
+    /**
+     * Cause a clean shutdown of the background worker thread.
+     */
+    public void close() {
+        this.cancel(true);
+    }
+
+    /**
+     * Access the stream's event manager.
+     * @return the Manager
+     */
     public NetworkMessageEventListenerManager getEventManager() {
         return this._eventManager;
     }
-    
+
     /**
      * Fetch messages received over the stream.
-     * 
+     *
      * For delivery to event listeners; usually Swing GUI components. This method will run on the
      * Owner Thread so must complete quickly as that is the GUI Event Dispatch Thread.
-     * 
+     *
      * @param list messages received and queued
      */
     @Override
@@ -250,5 +285,13 @@ public class NetworkStream extends SwingWorker<Boolean, NetworkMessage> {
             this._eventManager.fireNetworkMessageEvent(message);
         }
     }
-    
+
+    /**
+     * Get remote IP address:port of this connection.
+     *
+     * @return the IP address:port pair
+     */
+    public InetSocketAddress getRemoteAddress() {
+        return (_socket != null ? (InetSocketAddress) _socket.getRemoteSocketAddress() : null);
+    }
 }