View Javadoc

/*
 * Copyright (c) 2005-2007 Creative Sphere Limited.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *
 *   Creative Sphere - initial API and implementation
 *
 */
13  package org.abstracthorizon.danube.service.server;
14  
15  import java.io.IOException;
16  import java.net.InetSocketAddress;
17  import java.net.Socket;
18  import java.nio.channels.ServerSocketChannel;
19  import java.nio.channels.SocketChannel;
20  import java.util.HashSet;
21  import java.util.Set;
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  
26  import org.abstracthorizon.danube.connection.Connection;
27  import org.abstracthorizon.danube.connection.ConnectionHandler;
28  import org.abstracthorizon.danube.service.Service;
29  import org.abstracthorizon.danube.service.ServiceException;
30  import org.abstracthorizon.danube.service.ServiceNotificationListener;
31  
32  /**
33   * This is server socket service. This service accepts connections from a
34   * server socket, creates new {@link ConnectionHandlerThread} objects
35   * and executes them in a given {@link Executor}.
36   *
37   * @author Daniel Sendula
38   */
39  public class ServerSocketChannelService extends Service {
40  
41      /** Reference to the server socket */
42      protected ServerSocketChannel serverSocketChannel;
43  
44      /** Port this service is going to listen on */
45      protected int port = -1;
46  
47      /** Server socket timeout */
48      protected int serverSocketTimeout = 1000;
49  
50      /** New socket timeout */
51      protected int newSocketTimeout = 60000;
52  
53      /** Executor (thread pool) to be used */
54      protected Executor executor;
55  
56      /** Set of active connections */
57      protected Set<ConnectionHandlerThread> activeConnections = new HashSet<ConnectionHandlerThread>();
58  
59      /** Connection handler new connection to be handed with */
60      protected ConnectionHandler connectionHandler;
61  
62      /** Grace period for connections to finish after service state changes to STOPPING */
63      protected int graceFinishPeriod = 2000;
64  
65      /**
66       * Default constructor
67       */
68      public ServerSocketChannelService() {
69      }
70  
71      /**
72       * Returns the port service is expecting connections on
73       * @return the port
74       */
75      public int getPort() {
76          return port;
77      }
78  
79      /**
80       * Sets the port. It has to be set before {@link #create()} method is called.
81       * @param port the port
82       */
83      public void setPort(int port) {
84          this.port = port;
85      }
86  
87      /**
88       * Returns initial socket timeout
89       * @return initial socket timeout
90       */
91      public int getServerSocketTimeout() {
92          return serverSocketTimeout;
93      }
94  
95      /**
96       * Sets initial socket timeout
97       * @param socketTimeout initial socket timeout
98       */
99      public void setServerSocketTimeout(int socketTimeout) {
100         this.serverSocketTimeout = socketTimeout;
101     }
102 
103     /**
104      * Returns new socket timeout
105      * @return new socket timeout
106      */
107     public int getNewSocketTimeout() {
108         return newSocketTimeout;
109     }
110 
111     /**
112      * Sets new socket timeout
113      * @param socketTimeout new socket timeout
114      */
115     public void setNewSocketTimeout(int socketTimeout) {
116         this.newSocketTimeout = socketTimeout;
117     }
118 
119     /**
120      * Returns grace finish period
121      * @return grace finish period
122      */
123     public int getGraceFinishPeriod() {
124         return graceFinishPeriod;
125     }
126 
127     /**
128      * Sets grace finish period
129      * @param graceFinishPeriod grace finish period
130      */
131     public void setGraceFinishPeriod(int graceFinishPeriod) {
132         this.graceFinishPeriod = graceFinishPeriod;
133     }
134 
135     /**
136      * Return the executor which is used or connections to be handled with
137      * @return the executor
138      */
139     public Executor getExecutor() {
140         if (executor == null) {
141             executor = Executors.newCachedThreadPool();
142         }
143         return executor;
144     }
145 
146     /**
147      * Sets the executor for connections to be handled with
148      * @param executor
149      */
150     public void setExecutor(Executor executor) {
151         this.executor = executor;
152     }
153 
154     /**
155      * Returns connection handler connections are handed to.
156      * @return connection handler
157      */
158     public ConnectionHandler getConnectionHandler() {
159         return connectionHandler;
160     }
161 
162     /**
163      * Sets connection handler
164      * @param connectionHandler connection handler
165      */
166     public void setConnectionHandler(ConnectionHandler connectionHandler) {
167         this.connectionHandler = connectionHandler;
168     }
169 
170     /**
171      * Creates the socket
172      * @throws ServiceException
173      */
174     public void create() throws ServiceException {
175         super.create();
176         serverSocketChannel = createServerSocket();
177     }
178 
179     /**
180      * Starts the service
181      * @throws ServiceException
182      */
183     public void start() throws ServiceException {
184         super.start();
185         Executor executor = getExecutor();
186         executor.execute(this);
187     }
188 
189     /**
190      * Stops the service.
191      * @throws ServiceException
192      */
193     public void stop() throws ServiceException {
194         ServiceNotificationListener listener = new ServiceNotificationListener() {
195 
196             public void serviceAboutToChangeState(Service service, int newState) {
197             }
198 
199             public synchronized void serviceChangedState(Service service, int oldState) {
200                 if (getState() == STOPPED) {
201                     notifyAll();
202                 }
203             }
204         };
205         addListener(listener);
206         synchronized (listener) {
207             super.stop();
208             try {
209                 listener.wait(graceFinishPeriod);
210             } catch (InterruptedException ignore) {
211             }
212         }
213         removeListener(listener);
214         for (ConnectionHandlerThread thread : activeConnections) {
215             Connection socketConnection = thread.getConnection();
216             try {
217                 SocketChannel socketChannel = (SocketChannel)socketConnection.adapt(SocketChannel.class);
218                 socketChannel.close();
219 
220                 Socket socket = (Socket)socketConnection.adapt(Socket.class);
221 //                socket.shutdownInput();
222 //                socket.shutdownOutput();
223 //                socket.close();
224                 socket.setSoTimeout(10);
225                 synchronized (socket) { // TODO
226                     socket.notifyAll();
227                 }
228             } catch (Exception ignore) {
229                 // TODO remove
230                 ignore.printStackTrace();
231             }
232             try {
233                 thread.getThread().interrupt();
234             } catch (Exception ignore) {
235             }
236         }
237 
238         if (executor instanceof ExecutorService) {
239             ((ExecutorService)executor).shutdownNow();
240         }
241     }
242 
243     /**
244      * Closes the server socket
245      * @throws ServiceException
246      */
247     public void destroy() throws ServiceException {
248         super.destroy();
249         try {
250             serverSocketChannel.close();
251         } catch (IOException e) {
252             throw new ServiceException("Problem closing server socket", e);
253         }
254     }
255 
256     /**
257      * Accepts connections from server socket and calls {@link #processConnection(Socket)} method
258      */
259     public void run() {
260         try {
261             changeState(RUNNING);
262             while (!stopService) {
263                 try {
264                     SocketChannel socket = serverSocketChannel.accept();
265                     processConnection(socket);
266                 } catch (IOException ignore) {
267                 }
268             }
269         } finally {
270             changeState(STOPPED);
271         }
272     }
273 
274 
275     /**
276      * Creates server socket
277      * @return server socket
278      * @throws ServiceException
279      */
280     protected ServerSocketChannel createServerSocket()throws ServiceException {
281         try {
282             ServerSocketChannel serverSocket = ServerSocketChannel.open();
283             serverSocket.configureBlocking(true);
284             serverSocket.socket().bind(new InetSocketAddress(port));
285             serverSocket.socket().setSoTimeout(getServerSocketTimeout());
286             return serverSocket;
287         } catch (IOException e) {
288             throw new ServiceException("Problem creating server socket", e);
289         }
290     }
291 
292     /**
293      * Creates new socket connection
294      * @param socket socket
295      * @return socketChannel socket channel
296      * @throws IOException
297      * @throws Exception
298      */
299     protected Connection createSocketConnection(SocketChannel socketChannel) throws IOException {
300         Connection socketConnection = new SocketChannelConnection(socketChannel);
301         return socketConnection;
302     }
303 
304     /**
305      * Creates socket connection and new instance of {@link ConnectionHandlerThread} to process
306      * socket under the given executor
307      * @param socket socket
308      */
309     protected void processConnection(SocketChannel socket) {
310         if (logger.isDebugEnabled()) { logger.debug("Accepted new connection;  " + socket.toString()); }
311 
312         try {
313             socket.socket().setSoTimeout(getNewSocketTimeout());
314             Connection socketConnection = createSocketConnection(socket);
315             ConnectionHandlerThread connectionHandlerThread = new ConnectionHandlerThread(socketConnection);
316             connectionHandlerThread.start();
317         } catch (Exception e) {
318             logger.error("Cannot process connection for socket; " + socket, e);
319         }
320     }
321 
322     /**
323      * This class is executed in under the given executor. It serves to keep
324      * reference to socket connection and enclosing {@link ServerSocketChannelService} instance.
325      */
326     public class ConnectionHandlerThread implements Runnable {
327 
328         /** Socket connection */
329         protected Connection socketConnection;
330 
331         /** Current thread */
332         protected Thread thread;
333 
334         /**
335          * Constructor
336          * @param socketConnection socket connection
337          */
338         public ConnectionHandlerThread(Connection socketConnection) {
339             this.socketConnection = socketConnection;
340         }
341 
342         /**
343          * Returns socket connection
344          * @return socket connection
345          */
346         public Connection getConnection() {
347             return socketConnection;
348         }
349 
350         /**
351          * Returns thread
352          * @return thread
353          */
354         public Thread getThread() {
355             return thread;
356         }
357 
358         /**
359          * Gives this object to the executor for execution
360          */
361         public void start() {
362             executor.execute(this);
363         }
364 
365         /**
366          * Handles connection invoking {@link ConnectionHandler#handleConnection(org.abstracthorizon.danube.connection.Connection)} method.
367          */
368         public void run() {
369             try {
370                 thread = Thread.currentThread();
371                 activeConnections.add(this);
372                 connectionHandler.handleConnection(socketConnection);
373             } catch (Exception e) {
374                 logger.error("Connection finished with error; " + socketConnection, e);
375             } finally {
376                 activeConnections.remove(this);
377                 try {
378                     ((Socket)socketConnection.adapt(Socket.class)).close();
379                 } catch (Exception ignore) {
380                 }
381             }
382         }
383     }
384 }