1
2
3
4
5
6
7
8
9
10
11
12 package org.abstracthorizon.danube.proxy.socket;
13
14 import java.io.BufferedReader;
15 import java.io.IOException;
16 import java.io.InputStreamReader;
17 import java.io.OutputStreamWriter;
18 import java.io.PrintWriter;
19 import java.net.InetSocketAddress;
20 import java.net.ServerSocket;
21 import java.net.Socket;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.ServerSocketChannel;
26 import java.nio.channels.SocketChannel;
27 import java.nio.channels.spi.SelectorProvider;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40
41
42
43
44 public class Proxy implements Runnable {
45
46 private static Logger logger = LoggerFactory.getLogger(Proxy.class);
47
48 private InetSocketAddress controlPortAddress;
49
50 private Map<InetSocketAddress, ProxyPort> ports = new HashMap<InetSocketAddress, ProxyPort>();
51
52 private Selector selector;
53
54 private Executor executor;
55
56 private boolean doRun = false;
57
58
59
60
61 public Proxy() {
62
63 }
64
65
66
67
68
69 public Proxy(InetSocketAddress controlPortAddress) {
70 this.controlPortAddress = controlPortAddress;
71 }
72
73 public Proxy(int port) {
74 this.controlPortAddress = new InetSocketAddress(port);
75 }
76
77 public Proxy(String host, int port) {
78 this.controlPortAddress = new InetSocketAddress(host, port);
79 }
80
81 public void setControlPortAddress(InetSocketAddress controlPortAddress) {
82 this.controlPortAddress = controlPortAddress;
83 }
84
85 public InetSocketAddress getControlPortAddress() {
86 return controlPortAddress;
87 }
88
89 public void setExecutor(Executor executor) {
90 this.executor = executor;
91 }
92
93 public Executor getExecutor() {
94 return executor;
95 }
96
97
98
99
100 public synchronized void start() throws IOException {
101 selector = SelectorProvider.provider().openSelector();
102 ControlPort controlPort = new ControlPort(this, selector, controlPortAddress);
103 controlPort.init();
104 Executor executor = getExecutor();
105 if (executor == null) {
106 executor = Executors.newSingleThreadExecutor();
107 }
108 doRun = true;
109 executor.execute(this);
110 }
111
112
113
114
115 public synchronized void stop() {
116 doRun = false;
117 try {
118 wait(1000);
119 } catch (InterruptedException ignore) {
120 }
121 }
122
123 public synchronized void registerInetAddress(InetSocketAddress sourceSocketAddress, InetSocketAddress destinationSocketAddress) throws IOException {
124 ProxyPort serverPort = ports.get(sourceSocketAddress);
125 if (serverPort == null) {
126 serverPort = new ProxyPort(selector, sourceSocketAddress);
127 ports.put(sourceSocketAddress, serverPort);
128 }
129 serverPort.addDestination(destinationSocketAddress);
130 }
131
132 public synchronized void deregisterInetAddress(InetSocketAddress sourceSocketAddress, InetSocketAddress destinationSocketAddress) throws IOException {
133 ProxyPort proxyPort = ports.get(sourceSocketAddress);
134 if (proxyPort != null) {
135 proxyPort.removeDestination(destinationSocketAddress);
136 if (proxyPort.isEmpty()) {
137 ports.remove(sourceSocketAddress);
138 }
139 }
140 }
141
142 public static InetSocketAddress stringToInetSocketAddress(String socketAddress) {
143 int i = socketAddress.indexOf(':');
144 if (i < 0) {
145 throw new IllegalArgumentException("InetSocketAddress string must contain ':'");
146 }
147 String address = socketAddress.substring(0, i);
148 String portString = socketAddress.substring(i + 1);
149 int port = Integer.parseInt(portString);
150 return new InetSocketAddress(address, port);
151 }
152
153
154
155
156 public synchronized void run() {
157 try {
158
159 int keysAdded = selector.select();
160 while (doRun) {
161 Set<SelectionKey> selectedKeys = selector.selectedKeys();
162 if (logger.isDebugEnabled() && selectedKeys.size() == 0) {
163 logger.debug("PROXY : selecting keys..." + keysAdded + "/" + selectedKeys.size() + "(" + Thread.currentThread() + ")");
164 }
165 Iterator<SelectionKey> iterator = selectedKeys.iterator();
166 while (iterator.hasNext()) {
167 SelectionKey key = iterator.next();
168 iterator.remove();
169
170
171 if (!key.isValid()) {
172 Connection c = (Connection)key.attachment();
173 if (logger.isDebugEnabled()) {
174 logger.debug("PROXY : Key is not valid " + c + " closing channel");
175 }
176 c.close(key);
177 } else if (key.isAcceptable()) {
178 ServerSocketChannel channel = (ServerSocketChannel)key.channel();
179 PortProcessor serverPort = (PortProcessor)key.attachment();
180 serverPort.newConnection(channel);
181 } else if (key.isReadable()) {
182 Connection c = (Connection)key.attachment();
183 if (logger.isDebugEnabled()) {
184 logger.debug("PROXY : Key is readable " + c);
185 }
186 c.read(key);
187 } else if (key.isWritable()) {
188 Connection c = (Connection)key.attachment();
189 if (logger.isDebugEnabled()) {
190 logger.debug("PROXY : Key is writable: " + c);
191 }
192 c.write(key);
193 } else if (key.isConnectable()) {
194 SocketChannel socketChannel = (SocketChannel)key.channel();
195 socketChannel.finishConnect();
196 key.interestOps(SelectionKey.OP_READ);
197 } else {
198 Connection c = (Connection)key.attachment();
199 if (logger.isDebugEnabled()) {
200 logger.debug("PROXY : No idea what! " + c);
201 }
202 }
203 }
204
205 keysAdded = selector.select();
206 }
207
208 notifyAll();
209 } catch (Exception e) {
210 e.printStackTrace();
211 }
212 }
213
214 protected static interface PortProcessor {
215
216 void newConnection(ServerSocketChannel channel) throws IOException;
217
218 }
219
220 protected static class ProxyPort implements PortProcessor {
221
222 private Selector selector;
223 private InetSocketAddress socketAddress;
224 private ServerSocketChannel serverSocketChannel;
225 private List<InetSocketAddress> destinations = new ArrayList<InetSocketAddress>();
226 private int current = 0;
227 private int connections = 0;
228
229 public ProxyPort(Selector selector, InetSocketAddress socketAddress) {
230 this.selector = selector;
231 this.socketAddress = socketAddress;
232 }
233
234 public void addDestination(InetSocketAddress destination) throws IOException {
235 if (destinations.isEmpty()) {
236 serverSocketChannel = ServerSocketChannel.open();
237 serverSocketChannel.configureBlocking(false);
238
239 serverSocketChannel.socket().bind(socketAddress);
240
241 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
242 }
243 destinations.add(destination);
244 }
245
246 public void removeDestination(InetSocketAddress destination) throws IOException {
247 destinations.remove(destination);
248 if (current >= destinations.size()) {
249 current = 0;
250 }
251 if (destinations.isEmpty()) {
252 serverSocketChannel.close();
253 }
254 }
255
256 public boolean isEmpty() {
257 return destinations.isEmpty();
258 }
259
260 public synchronized void newConnection(ServerSocketChannel channel) throws IOException {
261 if (destinations.size() > 0) {
262 SocketChannel outboundChannel = SocketChannel.open();
263 outboundChannel.configureBlocking(false);
264
265 InetSocketAddress serverSocketAddress = nextDestination();
266
267 Socket inbound = channel.accept().socket();
268 SocketChannel inboundChannel = inbound.getChannel();
269 inboundChannel.configureBlocking(false);
270
271 Connection outConnection = new ProxyConnection(outboundChannel, inboundChannel, false);
272 Connection inConnection = new ProxyConnection(inboundChannel, outboundChannel, true);
273
274 outboundChannel.register(selector, SelectionKey.OP_CONNECT, outConnection);
275 outboundChannel.connect(serverSocketAddress);
276
277 inboundChannel.register(selector, SelectionKey.OP_READ, inConnection);
278 connections++;
279 } else {
280
281 }
282 }
283
284 public synchronized void closeConnection() {
285 connections = connections - 1;
286 }
287
288 public synchronized InetSocketAddress nextDestination() {
289 if (current >= destinations.size()) {
290 current = 0;
291 }
292 InetSocketAddress ret = destinations.get(current);
293 current++;
294 if (current >= destinations.size()) {
295 current = 0;
296 }
297 return ret;
298 }
299
300 }
301
302 public interface Connection {
303 void read(SelectionKey key) throws IOException;
304 void write(SelectionKey key) throws IOException;
305 void close(SelectionKey key) throws IOException;
306 }
307
308 public static class ProxyConnection implements Connection {
309 private static int counter = 1;
310 private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
311 private SocketChannel in;
312 private SocketChannel out;
313 private boolean inbound;
314 private int count;
315
316 public ProxyConnection(SocketChannel in, SocketChannel out, boolean inbound) {
317 this.in = in;
318 this.out = out;
319 this.inbound = inbound;
320 count = counter;
321 if (inbound) {
322 counter++;
323 }
324 }
325
326 public void read(SelectionKey key) throws IOException {
327 if (out.isConnected()) {
328 buffer.clear();
329 int size = in.read(buffer);
330 if (size == -1) {
331 if (logger.isDebugEnabled()) {
332 logger.debug("PROXY : Closing channel " + this);
333 }
334 key.channel().close();
335 key.cancel();
336 out.close();
337 out.socket().close();
338 } else if (size == 0) {
339 if (logger.isDebugEnabled()) {
340 logger.debug("PROXY : read " + this + " "+ buffer.position() + "? What now?");
341 }
342 } else {
343 if (logger.isDebugEnabled()) {
344 logger.debug("PROXY : read " + this + " " + buffer.position());
345 }
346 buffer.flip();
347 out.write(buffer);
348 }
349 }
350 }
351
352 public void write(SelectionKey key) throws IOException {
353
354 }
355
356 public void close(SelectionKey key) throws IOException {
357 in.close();
358 }
359
360 public String toString() {
361 if (inbound) {
362 return "Inbound(" + count + ")";
363 } else {
364 return "Outbound(" + count + ")";
365 }
366 }
367 }
368
369 protected static class ControlPort implements PortProcessor {
370
371 private Selector selector;
372 private InetSocketAddress socketAddress;
373 private ServerSocketChannel serverSocketChannel;
374 private Proxy proxy;
375
376 public ControlPort(Proxy proxy, Selector selector, InetSocketAddress socketAddress) {
377 this.proxy = proxy;
378 this.selector = selector;
379 this.socketAddress = socketAddress;
380 }
381
382 public void init() throws IOException {
383 serverSocketChannel = ServerSocketChannel.open();
384 serverSocketChannel.configureBlocking(false);
385
386 serverSocketChannel.socket().bind(socketAddress);
387
388 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
389 }
390
391 public void newConnection(ServerSocketChannel channel) throws IOException {
392 Socket socket = channel.accept().socket();
393
394 SocketChannel socketChannel = socket.getChannel();
395 socketChannel.configureBlocking(false);
396
397 Connection connection = new ControlConnection(proxy, socketChannel);
398 socketChannel.register(selector, SelectionKey.OP_READ, connection);
399 }
400
401 }
402
403 protected static class ControlConnection implements Connection {
404
405 private ByteBuffer buffer = ByteBuffer.allocate(1024);
406
407
408 private SocketChannel channel;
409 private InetSocketAddress sourceSocketAddress;
410 private InetSocketAddress destinationSocketAddress;
411 private StringBuffer inputLine = new StringBuffer();
412 private Proxy proxy;
413
414 public ControlConnection(Proxy proxy, SocketChannel channel) {
415 this.proxy = proxy;
416 this.channel = channel;
417
418
419 }
420
421 public void close(SelectionKey key) throws IOException {
422 proxy.deregisterInetAddress(sourceSocketAddress, destinationSocketAddress);
423 }
424
425 public void read(SelectionKey key) throws IOException {
426 buffer.clear();
427 int size = channel.read(buffer);
428 if (destinationSocketAddress == null) {
429 buffer.flip();
430 while (size > 0) {
431 char c = (char)buffer.get();
432 if (c != '\n') {
433 if (c != '\r') {
434 inputLine.append(c);
435 }
436 } else {
437 if (sourceSocketAddress == null) {
438 sourceSocketAddress = stringToInetSocketAddress(inputLine.toString());
439 inputLine = new StringBuffer();
440 } else {
441 destinationSocketAddress = stringToInetSocketAddress(inputLine.toString());
442 proxy.registerInetAddress(sourceSocketAddress, destinationSocketAddress);
443 }
444 }
445 size--;
446 }
447 }
448 }
449
450 public void write(SelectionKey key) throws IOException {
451
452
453 }
454
455 }
456
457 public static void main(String[] args) throws Exception {
458 System.out.println("Creating proxy...");
459 Proxy proxy = new Proxy(8044);
460 proxy.start();
461 System.out.println("Proxy created.");
462
463 Thread.sleep(1000);
464
465 System.out.println("Creating server socket...");
466 ServerSocket serverSocket = new ServerSocket(9999);
467 System.out.println("Server socket created,");
468
469 System.out.println("Creating control socket...");
470 Socket socket = new Socket("localhost", 8044);
471 System.out.println("Control socket created.");
472 PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
473 System.out.println("Sending control data...");
474 out.println("localhost:8888");
475 out.println("localhost:9999");
476 out.flush();
477 System.out.println("Control data sent.");
478 Thread.sleep(1000);
479
480 System.out.println("Creating client socket...");
481 Socket testClientSocket = new Socket("localhost", 8888);
482 System.out.println("Client socket created.");
483 PrintWriter testClientOut = new PrintWriter(new OutputStreamWriter(testClientSocket.getOutputStream()));
484 System.out.println("Sending some data...");
485 testClientOut.println("Something!");
486 testClientOut.flush();
487 System.out.println("Data sent.");
488
489 System.out.println("Accepting connection...");
490 Socket testServerSocket = serverSocket.accept();
491 System.out.println("Connection accepted.");
492 BufferedReader testServerIn = new BufferedReader(new InputStreamReader(testServerSocket.getInputStream()));
493 System.out.println("Receiving line...");
494 String line = testServerIn.readLine();
495 System.out.println("Got line: " + line);
496 System.out.println("Line received.");
497
498 System.exit(0);
499 }
500 }