1
2
3
4
5
6
7
8
9
10
11
12
13 package org.abstracthorizon.danube.service.server;
14
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.net.Socket;
18 import java.nio.channels.ServerSocketChannel;
19 import java.nio.channels.SocketChannel;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25
26 import org.abstracthorizon.danube.connection.Connection;
27 import org.abstracthorizon.danube.connection.ConnectionHandler;
28 import org.abstracthorizon.danube.service.Service;
29 import org.abstracthorizon.danube.service.ServiceException;
30 import org.abstracthorizon.danube.service.ServiceNotificationListener;
31
32
33
34
35
36
37
38
39 public class ServerSocketChannelService extends Service {
40
41
42 protected ServerSocketChannel serverSocketChannel;
43
44
45 protected int port = -1;
46
47
48 protected int serverSocketTimeout = 1000;
49
50
51 protected int newSocketTimeout = 60000;
52
53
54 protected Executor executor;
55
56
57 protected Set<ConnectionHandlerThread> activeConnections = new HashSet<ConnectionHandlerThread>();
58
59
60 protected ConnectionHandler connectionHandler;
61
62
63 protected int graceFinishPeriod = 2000;
64
65
66
67
68 public ServerSocketChannelService() {
69 }
70
71
72
73
74
75 public int getPort() {
76 return port;
77 }
78
79
80
81
82
83 public void setPort(int port) {
84 this.port = port;
85 }
86
87
88
89
90
91 public int getServerSocketTimeout() {
92 return serverSocketTimeout;
93 }
94
95
96
97
98
99 public void setServerSocketTimeout(int socketTimeout) {
100 this.serverSocketTimeout = socketTimeout;
101 }
102
103
104
105
106
107 public int getNewSocketTimeout() {
108 return newSocketTimeout;
109 }
110
111
112
113
114
115 public void setNewSocketTimeout(int socketTimeout) {
116 this.newSocketTimeout = socketTimeout;
117 }
118
119
120
121
122
123 public int getGraceFinishPeriod() {
124 return graceFinishPeriod;
125 }
126
127
128
129
130
131 public void setGraceFinishPeriod(int graceFinishPeriod) {
132 this.graceFinishPeriod = graceFinishPeriod;
133 }
134
135
136
137
138
139 public Executor getExecutor() {
140 if (executor == null) {
141 executor = Executors.newCachedThreadPool();
142 }
143 return executor;
144 }
145
146
147
148
149
150 public void setExecutor(Executor executor) {
151 this.executor = executor;
152 }
153
154
155
156
157
158 public ConnectionHandler getConnectionHandler() {
159 return connectionHandler;
160 }
161
162
163
164
165
166 public void setConnectionHandler(ConnectionHandler connectionHandler) {
167 this.connectionHandler = connectionHandler;
168 }
169
170
171
172
173
174 public void create() throws ServiceException {
175 super.create();
176 serverSocketChannel = createServerSocket();
177 }
178
179
180
181
182
183 public void start() throws ServiceException {
184 super.start();
185 Executor executor = getExecutor();
186 executor.execute(this);
187 }
188
189
190
191
192
193 public void stop() throws ServiceException {
194 ServiceNotificationListener listener = new ServiceNotificationListener() {
195
196 public void serviceAboutToChangeState(Service service, int newState) {
197 }
198
199 public synchronized void serviceChangedState(Service service, int oldState) {
200 if (getState() == STOPPED) {
201 notifyAll();
202 }
203 }
204 };
205 addListener(listener);
206 synchronized (listener) {
207 super.stop();
208 try {
209 listener.wait(graceFinishPeriod);
210 } catch (InterruptedException ignore) {
211 }
212 }
213 removeListener(listener);
214 for (ConnectionHandlerThread thread : activeConnections) {
215 Connection socketConnection = thread.getConnection();
216 try {
217 SocketChannel socketChannel = (SocketChannel)socketConnection.adapt(SocketChannel.class);
218 socketChannel.close();
219
220 Socket socket = (Socket)socketConnection.adapt(Socket.class);
221
222
223
224 socket.setSoTimeout(10);
225 synchronized (socket) {
226 socket.notifyAll();
227 }
228 } catch (Exception ignore) {
229
230 ignore.printStackTrace();
231 }
232 try {
233 thread.getThread().interrupt();
234 } catch (Exception ignore) {
235 }
236 }
237
238 if (executor instanceof ExecutorService) {
239 ((ExecutorService)executor).shutdownNow();
240 }
241 }
242
243
244
245
246
247 public void destroy() throws ServiceException {
248 super.destroy();
249 try {
250 serverSocketChannel.close();
251 } catch (IOException e) {
252 throw new ServiceException("Problem closing server socket", e);
253 }
254 }
255
256
257
258
259 public void run() {
260 try {
261 changeState(RUNNING);
262 while (!stopService) {
263 try {
264 SocketChannel socket = serverSocketChannel.accept();
265 processConnection(socket);
266 } catch (IOException ignore) {
267 }
268 }
269 } finally {
270 changeState(STOPPED);
271 }
272 }
273
274
275
276
277
278
279
280 protected ServerSocketChannel createServerSocket()throws ServiceException {
281 try {
282 ServerSocketChannel serverSocket = ServerSocketChannel.open();
283 serverSocket.configureBlocking(true);
284 serverSocket.socket().bind(new InetSocketAddress(port));
285 serverSocket.socket().setSoTimeout(getServerSocketTimeout());
286 return serverSocket;
287 } catch (IOException e) {
288 throw new ServiceException("Problem creating server socket", e);
289 }
290 }
291
292
293
294
295
296
297
298
299 protected Connection createSocketConnection(SocketChannel socketChannel) throws IOException {
300 Connection socketConnection = new SocketChannelConnection(socketChannel);
301 return socketConnection;
302 }
303
304
305
306
307
308
309 protected void processConnection(SocketChannel socket) {
310 if (logger.isDebugEnabled()) { logger.debug("Accepted new connection; " + socket.toString()); }
311
312 try {
313 socket.socket().setSoTimeout(getNewSocketTimeout());
314 Connection socketConnection = createSocketConnection(socket);
315 ConnectionHandlerThread connectionHandlerThread = new ConnectionHandlerThread(socketConnection);
316 connectionHandlerThread.start();
317 } catch (Exception e) {
318 logger.error("Cannot process connection for socket; " + socket, e);
319 }
320 }
321
322
323
324
325
326 public class ConnectionHandlerThread implements Runnable {
327
328
329 protected Connection socketConnection;
330
331
332 protected Thread thread;
333
334
335
336
337
338 public ConnectionHandlerThread(Connection socketConnection) {
339 this.socketConnection = socketConnection;
340 }
341
342
343
344
345
346 public Connection getConnection() {
347 return socketConnection;
348 }
349
350
351
352
353
354 public Thread getThread() {
355 return thread;
356 }
357
358
359
360
361 public void start() {
362 executor.execute(this);
363 }
364
365
366
367
368 public void run() {
369 try {
370 thread = Thread.currentThread();
371 activeConnections.add(this);
372 connectionHandler.handleConnection(socketConnection);
373 } catch (Exception e) {
374 logger.error("Connection finished with error; " + socketConnection, e);
375 } finally {
376 activeConnections.remove(this);
377 try {
378 ((Socket)socketConnection.adapt(Socket.class)).close();
379 } catch (Exception ignore) {
380 }
381 }
382 }
383 }
384 }