1
2
3
4
5
6
7
8
9
10
11
12
13 package org.abstracthorizon.danube.service.server;
14
15 import org.abstracthorizon.danube.connection.Connection;
16 import org.abstracthorizon.danube.connection.ConnectionHandler;
17 import org.abstracthorizon.danube.service.Service;
18 import org.abstracthorizon.danube.service.ServiceException;
19 import org.abstracthorizon.danube.service.ServiceNotificationListener;
20
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26
27
28
29
30
31
32
33
34
35 public abstract class MultiThreadServerService extends ServerService {
36
37
38 protected int graceFinishPeriod = 2000;
39
40
41 protected Executor executor;
42
43
44 protected Set<ConnectionHandlerThread> activeConnections = new HashSet<ConnectionHandlerThread>();
45
46
47
48
49 public MultiThreadServerService() {
50 }
51
52
53
54
55
56 public int getGraceFinishPeriod() {
57 return graceFinishPeriod;
58 }
59
60
61
62
63
64 public void setGraceFinishPeriod(int graceFinishPeriod) {
65 this.graceFinishPeriod = graceFinishPeriod;
66 }
67
68
69
70
71
72 public Executor getExecutor() {
73 if (executor == null) {
74 executor = Executors.newCachedThreadPool();
75 }
76 return executor;
77 }
78
79
80
81
82
83 public void setExecutor(Executor executor) {
84 this.executor = executor;
85 }
86
87
88
89
90
91 public Set<ConnectionHandlerThread> getActiveConnections() {
92 return activeConnections;
93 }
94
95
96
97
98
99 public int getNumberOfActiveConnections() {
100 if (activeConnections != null) {
101 return activeConnections.size();
102 } else {
103 return 0;
104 }
105 }
106
107
108
109
110
111 public void start() throws ServiceException {
112 super.start();
113 Executor executor = getExecutor();
114 executor.execute(this);
115 }
116
117
118
119
120
121 public void stop() throws ServiceException {
122 ServiceNotificationListener listener = new ServiceNotificationListener() {
123
124 public void serviceAboutToChangeState(Service service, int newState) {
125 }
126
127 public synchronized void serviceChangedState(Service service, int oldState) {
128 if (getState() == STOPPED) {
129 notifyAll();
130 }
131 }
132 };
133 addListener(listener);
134 synchronized (listener) {
135 super.stop();
136 try {
137 listener.wait(graceFinishPeriod);
138 } catch (InterruptedException ignore) {
139 }
140 }
141 removeListener(listener);
142
143 for (ConnectionHandlerThread thread : activeConnections) {
144 Connection serverConnection = thread.getConnection();
145 if (!serverConnection.isClosed()) {
146 serverConnection.close();
147 }
148 try {
149 thread.getThread().interrupt();
150 } catch (Exception ignore) {
151 }
152 }
153
154 if (executor instanceof ExecutorService) {
155 ((ExecutorService)executor).shutdownNow();
156 }
157 }
158
159
160
161
162 public void run() {
163 try {
164 changeState(RUNNING);
165 if (logger.isInfoEnabled()) {
166 logger.info("Started service " + getName() + " on the address " + getAddress() + ":" + getPort());
167 }
168
169 while (!stopService) {
170 processConnections();
171 }
172 } finally {
173 changeState(STOPPED);
174 }
175 }
176
177
178
179
180 protected abstract void processConnections();
181
182
183
184
185
186 protected class ConnectionHandlerThread implements Runnable {
187
188
189 protected Connection serverConnection;
190
191
192 protected Thread thread;
193
194
195
196
197
198 public ConnectionHandlerThread(Connection serverConnection) {
199 this.serverConnection = serverConnection;
200 }
201
202
203
204
205
206 public Connection getConnection() {
207 return serverConnection;
208 }
209
210
211
212
213
214 public Thread getThread() {
215 return thread;
216 }
217
218
219
220
221 public void start() {
222 executor.execute(this);
223 }
224
225
226
227
228 public void run() {
229 try {
230 thread = Thread.currentThread();
231 activeConnections.add(this);
232 connectionHandler.handleConnection(serverConnection);
233 } catch (Exception e) {
234 logger.error("Connection finished with error; " + serverConnection, e);
235 } finally {
236 activeConnections.remove(this);
237 serverConnection.close();
238 }
239 }
240 }
241 }