View Javadoc

1   /*
2    * Copyright (c) 2005-2007 Creative Sphere Limited.
3    * All rights reserved. This program and the accompanying materials
4    * are made available under the terms of the Eclipse Public License v1.0
5    * which accompanies this distribution, and is available at
6    * http://www.eclipse.org/legal/epl-v10.html
7    *
8    * Contributors:
9    *
10   *   Creative Sphere - initial API and implementation
11   *
12   */
13  package org.abstracthorizon.danube.service.server;
14  
15  import org.abstracthorizon.danube.connection.Connection;
16  import org.abstracthorizon.danube.connection.ConnectionHandler;
17  import org.abstracthorizon.danube.service.Service;
18  import org.abstracthorizon.danube.service.ServiceException;
19  import org.abstracthorizon.danube.service.ServiceNotificationListener;
20  
21  import java.util.HashSet;
22  import java.util.Set;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  
27  /**
28   * This class models multi-threaded model of connection service. Each new connection is
29   * spawned in a separate, possibly new, thread obtained from given executor. Connections
30   * passed to the thread must be of {@link ServerConnection} type in order for close method
31   * to be called at the end of processing.
32   *
33   * @author Daniel Sendula
34   */
35  public abstract class MultiThreadServerService extends ServerService {
36  
37      /** Grace period for connections to finish after service state changes to STOPPING */
38      protected int graceFinishPeriod = 2000;
39  
40      /** Executor (thread pool) to be used */
41      protected Executor executor;
42  
43      /** Set of active connections */
44      protected Set<ConnectionHandlerThread> activeConnections = new HashSet<ConnectionHandlerThread>();
45  
46      /**
47       * Default constructor
48       */
49      public MultiThreadServerService() {
50      }
51  
52      /**
53       * Returns grace finish period
54       * @return grace finish period
55       */
56      public int getGraceFinishPeriod() {
57          return graceFinishPeriod;
58      }
59  
60      /**
61       * Sets grace finish period
62       * @param graceFinishPeriod grace finish period
63       */
64      public void setGraceFinishPeriod(int graceFinishPeriod) {
65          this.graceFinishPeriod = graceFinishPeriod;
66      }
67  
68      /**
69       * Return the executor which is used or connections to be handled with
70       * @return the executor
71       */
72      public Executor getExecutor() {
73          if (executor == null) {
74              executor = Executors.newCachedThreadPool();
75          }
76          return executor;
77      }
78  
79      /**
80       * Sets the executor for connections to be handled with
81       * @param executor
82       */
83      public void setExecutor(Executor executor) {
84          this.executor = executor;
85      }
86  
87      /**
88       * Returns active connections
89       * @return connections
90       */
91      public Set<ConnectionHandlerThread> getActiveConnections() {
92          return activeConnections;
93      }
94  
95      /**
96       * Returns number of active connections
97       * @return number of active connections
98       */
99      public int getNumberOfActiveConnections() {
100         if (activeConnections != null) {
101             return activeConnections.size();
102         } else {
103             return 0;
104         }
105     }
106 
107     /**
108      * Starts the service
109      * @throws ServiceException
110      */
111     public void start() throws ServiceException {
112         super.start();
113         Executor executor = getExecutor();
114         executor.execute(this);
115     }
116 
117     /**
118      * Stops the service.
119      * @throws ServiceException
120      */
121     public void stop() throws ServiceException {
122         ServiceNotificationListener listener = new ServiceNotificationListener() {
123 
124             public void serviceAboutToChangeState(Service service, int newState) {
125             }
126 
127             public synchronized void serviceChangedState(Service service, int oldState) {
128                 if (getState() == STOPPED) {
129                     notifyAll();
130                 }
131             }
132         };
133         addListener(listener);
134         synchronized (listener) {
135             super.stop();
136             try {
137                 listener.wait(graceFinishPeriod);
138             } catch (InterruptedException ignore) {
139             }
140         }
141         removeListener(listener);
142         // TODO We have java.util.ConcurrentModificationException on following iteration
143         for (ConnectionHandlerThread thread : activeConnections) {
144             Connection serverConnection = thread.getConnection();
145             if (!serverConnection.isClosed()) {
146                 serverConnection.close();
147             }
148             try {
149                 thread.getThread().interrupt();
150             } catch (Exception ignore) {
151             }
152         }
153 
154         if (executor instanceof ExecutorService) {
155             ((ExecutorService)executor).shutdownNow();
156         }
157     }
158 
159     /**
160      * Accepts connections from processes them
161      */
162     public void run() {
163         try {
164             changeState(RUNNING);
165             if (logger.isInfoEnabled()) {
166                 logger.info("Started service " + getName() + " on the address " + getAddress() + ":" + getPort());
167             }
168 
169             while (!stopService) {
170                 processConnections();
171             }
172         } finally {
173             changeState(STOPPED);
174         }
175     }
176 
177     /**
178      * This method processes connections
179      */
180     protected abstract void processConnections();
181 
182     /**
183      * This class is executed in under the given executor. It serves to keep
184      * reference to server connection and enclosing {@link MultiThreadServerService} instance.
185      */
186     protected class ConnectionHandlerThread implements Runnable {
187 
188         /** Server connection */
189         protected Connection serverConnection;
190 
191         /** Current thread */
192         protected Thread thread;
193 
194         /**
195          * Constructor
196          * @param serverConnection server connection
197          */
198         public ConnectionHandlerThread(Connection serverConnection) {
199             this.serverConnection = serverConnection;
200         }
201 
202         /**
203          * Returns server connection
204          * @return server connection
205          */
206         public Connection getConnection() {
207             return serverConnection;
208         }
209 
210         /**
211          * Returns thread
212          * @return thread
213          */
214         public Thread getThread() {
215             return thread;
216         }
217 
218         /**
219          * Gives this object to the executor for execution
220          */
221         public void start() {
222             executor.execute(this);
223         }
224 
225         /**
226          * Handles connection invoking {@link ConnectionHandler#handleConnection(org.abstracthorizon.danube.connection.Connection)} method.
227          */
228         public void run() {
229             try {
230                 thread = Thread.currentThread();
231                 activeConnections.add(this);
232                 connectionHandler.handleConnection(serverConnection);
233             } catch (Exception e) {
234                 logger.error("Connection finished with error; " + serverConnection, e);
235             } finally {
236                 activeConnections.remove(this);
237                 serverConnection.close();
238             }
239         }
240     }
241 }