View Javadoc

1   /*
2    * Copyright (c) 2008 Creative Sphere Limited.
3    * All rights reserved. This program and the accompanying materials
4    * are made available under the terms of the Eclipse Public License v1.0
5    * which accompanies this distribution, and is available at
6    * http://www.eclipse.org/legal/epl-v10.html
7    *
8    * Contributors:
9    *
10   *   Creative Sphere - initial API and implementation
11   *
12   */package org.abstracthorizon.danube.proxy.socket;
13  
14  import java.io.BufferedReader;
15  import java.io.IOException;
16  import java.io.InputStreamReader;
17  import java.io.OutputStreamWriter;
18  import java.io.PrintWriter;
19  import java.net.InetSocketAddress;
20  import java.net.ServerSocket;
21  import java.net.Socket;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.Selector;
25  import java.nio.channels.ServerSocketChannel;
26  import java.nio.channels.SocketChannel;
27  import java.nio.channels.spi.SelectorProvider;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Executors;
36  
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  /**
41   * 
42   * @author Daniel Sendula
43   */
44  public class Proxy implements Runnable {
45  
46  	private static Logger logger = LoggerFactory.getLogger(Proxy.class);
47  	
48  	private InetSocketAddress controlPortAddress;
49  	
50  	private Map<InetSocketAddress, ProxyPort> ports = new HashMap<InetSocketAddress, ProxyPort>();
51  	
52      private Selector selector;
53  	
54      private Executor executor;
55      
56      private boolean doRun = false;
57      
58      /**
59       * Constructor.
60       */
61      public Proxy() {
62          
63      }
64  
65      /**
66       * Constructor.
67       * @param controlPortAddress control port address
68       */
69      public Proxy(InetSocketAddress controlPortAddress) {
70          this.controlPortAddress = controlPortAddress;
71      }
72      
73      public Proxy(int port) {
74          this.controlPortAddress = new InetSocketAddress(port);
75      }
76      
77      public Proxy(String host, int port) {
78          this.controlPortAddress = new InetSocketAddress(host, port);
79      }
80      
81      public void setControlPortAddress(InetSocketAddress controlPortAddress) {
82          this.controlPortAddress = controlPortAddress;
83      }
84  
85      public InetSocketAddress getControlPortAddress() {
86          return controlPortAddress;
87      }
88      
89      public void setExecutor(Executor executor) {
90          this.executor = executor;
91      }
92      
93      public Executor getExecutor() {
94          return executor;
95      }
96      	
97  	/**
98  	 * Starts the service.
99  	 */
100 	public synchronized void start() throws IOException {
101         selector = SelectorProvider.provider().openSelector();
102 	    ControlPort controlPort = new ControlPort(this, selector, controlPortAddress);
103 	    controlPort.init();
104         Executor executor = getExecutor();
105         if (executor == null) {
106             executor = Executors.newSingleThreadExecutor();
107         }
108         doRun = true;
109         executor.execute(this);
110 	}
111 	
112 	/**
113 	 * Stops the service.
114 	 */
115 	public synchronized void stop() {
116 	    doRun = false;
117 	    try {
118 	        wait(1000);
119 	    } catch (InterruptedException ignore) {
120 	    }
121 	}
122 	
123 	public synchronized void registerInetAddress(InetSocketAddress sourceSocketAddress, InetSocketAddress destinationSocketAddress) throws IOException {
124 		ProxyPort serverPort = ports.get(sourceSocketAddress);
125 		if (serverPort == null) {
126 			serverPort = new ProxyPort(selector, sourceSocketAddress);
127 			ports.put(sourceSocketAddress, serverPort);
128 		}
129 		serverPort.addDestination(destinationSocketAddress);
130 	}
131 	
132 	public synchronized void deregisterInetAddress(InetSocketAddress sourceSocketAddress, InetSocketAddress destinationSocketAddress) throws IOException {
133         ProxyPort proxyPort = ports.get(sourceSocketAddress);
134         if (proxyPort != null) {
135             proxyPort.removeDestination(destinationSocketAddress);
136             if (proxyPort.isEmpty()) {
137                 ports.remove(sourceSocketAddress);
138             }
139         }
140 	}
141 	
142 	public static InetSocketAddress stringToInetSocketAddress(String socketAddress) {
143 		int i = socketAddress.indexOf(':');
144 		if (i < 0) {
145 			throw new IllegalArgumentException("InetSocketAddress string must contain ':'");
146 		}
147 		String address = socketAddress.substring(0, i);
148 		String portString = socketAddress.substring(i + 1);
149 		int port = Integer.parseInt(portString);
150 		return new InetSocketAddress(address, port);
151 	}
152 	
153 	/**
154 	 * Main method.
155 	 */
156     public synchronized void run() {
157         try {
158 
159             int keysAdded = selector.select();
160             while (doRun) {
161                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
162                 if (logger.isDebugEnabled() && selectedKeys.size() == 0) {
163                     logger.debug("PROXY : selecting keys..." + keysAdded + "/" + selectedKeys.size() + "(" + Thread.currentThread() + ")");
164                 }
165                 Iterator<SelectionKey> iterator = selectedKeys.iterator();
166                 while (iterator.hasNext()) {
167                     SelectionKey key = iterator.next();
168                     iterator.remove();
169 //                    Connection c = (Connection)key.attachment();
170                     
171                     if (!key.isValid()) {
172                         Connection c = (Connection)key.attachment();
173                         if (logger.isDebugEnabled()) {
174                         	logger.debug("PROXY : Key is not valid " + c + " closing channel");
175                         }
176                         c.close(key);
177                     } else if (key.isAcceptable()) {
178                         ServerSocketChannel channel = (ServerSocketChannel)key.channel();
179                         PortProcessor serverPort = (PortProcessor)key.attachment();
180                         serverPort.newConnection(channel);
181                     } else if (key.isReadable()) {
182                         Connection c = (Connection)key.attachment();
183                         if (logger.isDebugEnabled()) {
184                         	logger.debug("PROXY : Key is readable " + c);
185                         }
186                         c.read(key);
187                     } else if (key.isWritable()) {
188                         Connection c = (Connection)key.attachment();
189                         if (logger.isDebugEnabled()) {
190                         	logger.debug("PROXY : Key is writable: " + c);
191                         }
192                         c.write(key);
193                     } else if (key.isConnectable()) {
194                         SocketChannel socketChannel = (SocketChannel)key.channel();
195                         socketChannel.finishConnect();
196                         key.interestOps(SelectionKey.OP_READ);
197                     } else {
198                         Connection c = (Connection)key.attachment();
199                         if (logger.isDebugEnabled()) {
200                         	logger.debug("PROXY : No idea what! " + c);
201                         }
202                     }
203                 }
204                 
205                 keysAdded = selector.select();
206             }
207 
208             notifyAll();
209         } catch (Exception e) {
210             e.printStackTrace();
211         }
212     }
213     
214     protected static interface PortProcessor {
215     	
216     	void newConnection(ServerSocketChannel channel) throws IOException;
217     	
218     }
219     
220     protected static class ProxyPort implements PortProcessor {
221     	
222     	private Selector selector;
223     	private InetSocketAddress socketAddress;
224     	private ServerSocketChannel serverSocketChannel;
225     	private List<InetSocketAddress> destinations = new ArrayList<InetSocketAddress>();
226     	private int current = 0;
227     	private int connections = 0;
228     	
229     	public ProxyPort(Selector selector, InetSocketAddress socketAddress) {
230     		this.selector = selector;
231     		this.socketAddress = socketAddress;
232     	}
233     	
234     	public void addDestination(InetSocketAddress destination) throws IOException {
235     	    if (destinations.isEmpty()) {
236                 serverSocketChannel = ServerSocketChannel.open();
237                 serverSocketChannel.configureBlocking(false);
238 
239                 serverSocketChannel.socket().bind(socketAddress);
240 
241                 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
242     	    }
243     		destinations.add(destination);
244     	}
245     	
246     	public void removeDestination(InetSocketAddress destination) throws IOException {
247     		destinations.remove(destination);
248     		if (current >= destinations.size()) {
249     			current = 0;
250     		}
251     		if (destinations.isEmpty()) {
252     		    serverSocketChannel.close();
253     		}
254     	}
255 
256     	public boolean isEmpty() {
257     	    return destinations.isEmpty();
258     	}
259     	
260     	public synchronized void newConnection(ServerSocketChannel channel) throws IOException {
261     		if (destinations.size() > 0) {
262 	            SocketChannel outboundChannel = SocketChannel.open();
263 	            outboundChannel.configureBlocking(false);
264 
265 	            InetSocketAddress serverSocketAddress = nextDestination();
266 	
267 	            Socket inbound = channel.accept().socket();
268 	            SocketChannel inboundChannel = inbound.getChannel();
269 	            inboundChannel.configureBlocking(false);
270 	            
271 	            Connection outConnection = new ProxyConnection(outboundChannel, inboundChannel, false);
272 	            Connection inConnection = new ProxyConnection(inboundChannel, outboundChannel, true);
273 	            
274 	            outboundChannel.register(selector, SelectionKey.OP_CONNECT, outConnection);
275 	            outboundChannel.connect(serverSocketAddress);
276 	
277 	            inboundChannel.register(selector, SelectionKey.OP_READ, inConnection);
278 	            connections++;
279     		} else {
280     			// TODO
281     		}
282     	}
283     	
284     	public synchronized void closeConnection() {
285     		connections = connections - 1;
286     	}
287     	
288     	public synchronized InetSocketAddress nextDestination() {
289     		if (current >= destinations.size()) {
290     			current = 0;
291     		}
292     		InetSocketAddress ret = destinations.get(current);
293     		current++;
294     		if (current >= destinations.size()) {
295     			current = 0;
296     		}
297     		return ret;
298     	}
299 
300     }
301 
302     public interface Connection {
303         void read(SelectionKey key) throws IOException;
304         void write(SelectionKey key) throws IOException;
305         void close(SelectionKey key) throws IOException;
306     }
307     
308     public static class ProxyConnection implements Connection {
309         private static int counter = 1;
310         private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
311         private SocketChannel in;
312         private SocketChannel out;
313         private boolean inbound;
314         private int count;
315         
316         public ProxyConnection(SocketChannel in, SocketChannel out, boolean inbound) {
317             this.in = in;
318             this.out = out;
319             this.inbound = inbound;
320             count = counter;
321             if (inbound) {
322                 counter++;
323             }
324         }
325         
326         public void read(SelectionKey key) throws IOException {
327             if (out.isConnected()) {
328                 buffer.clear();
329                 int size = in.read(buffer);
330                 if (size == -1) {
331                     if (logger.isDebugEnabled()) {
332                         logger.debug("PROXY : Closing channel " + this);
333                     }
334                     key.channel().close();
335                     key.cancel();
336                     out.close();
337                     out.socket().close();
338                 } else if (size == 0) {
339                     if (logger.isDebugEnabled()) {
340                         logger.debug("PROXY : read " + this + " "+ buffer.position() + "? What now?");
341                     }
342                 } else {
343                     if (logger.isDebugEnabled()) {
344                         logger.debug("PROXY : read " + this + " " + buffer.position());
345                     }
346                     buffer.flip();
347                     out.write(buffer);
348                 }
349             }
350         }
351         
352         public void write(SelectionKey key) throws IOException {
353             
354         }
355  
356         public void close(SelectionKey key) throws IOException {
357             in.close();
358         }
359         
360         public String toString() {
361             if (inbound) {
362                 return "Inbound(" + count + ")";
363             } else {
364                 return "Outbound(" + count + ")";
365             }
366         }
367     }
368     
369     protected static class ControlPort implements PortProcessor {
370 
371         private Selector selector;
372         private InetSocketAddress socketAddress;
373         private ServerSocketChannel serverSocketChannel;
374         private Proxy proxy;
375 
376         public ControlPort(Proxy proxy, Selector selector, InetSocketAddress socketAddress) {
377             this.proxy = proxy;
378             this.selector = selector;
379             this.socketAddress = socketAddress;
380         }
381         
382         public void init() throws IOException {
383             serverSocketChannel = ServerSocketChannel.open();
384             serverSocketChannel.configureBlocking(false);
385 
386             serverSocketChannel.socket().bind(socketAddress);
387 
388             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
389         }
390         
391         public void newConnection(ServerSocketChannel channel) throws IOException {
392             Socket socket = channel.accept().socket();
393 
394             SocketChannel socketChannel = socket.getChannel();
395             socketChannel.configureBlocking(false);
396             
397             Connection connection = new ControlConnection(proxy, socketChannel); 
398             socketChannel.register(selector, SelectionKey.OP_READ, connection);
399         }
400         
401     }
402 
403     protected static class ControlConnection implements Connection {
404 
405         private ByteBuffer buffer = ByteBuffer.allocate(1024);
406 //        private BufferedReader in;
407 //        private PrintWriter out;
408         private SocketChannel channel;
409         private InetSocketAddress sourceSocketAddress;
410         private InetSocketAddress destinationSocketAddress;
411         private StringBuffer inputLine = new StringBuffer();
412         private Proxy proxy;
413         
414         public ControlConnection(Proxy proxy, SocketChannel channel) {
415             this.proxy = proxy;
416             this.channel = channel;
417             // in = new BufferedReader(new InputStreamReader(inbound.getInputStream()));
418             // out = new PrintWriter(new OutputStreamWriter(inbound.getOutputStream()));
419         }
420         
421         public void close(SelectionKey key) throws IOException {
422             proxy.deregisterInetAddress(sourceSocketAddress, destinationSocketAddress);
423         }
424 
425         public void read(SelectionKey key) throws IOException {
426             buffer.clear();
427             int size = channel.read(buffer);
428             if (destinationSocketAddress == null) {
429                 buffer.flip();
430                 while (size > 0) {
431                     char c = (char)buffer.get();
432                     if (c != '\n') {
433                         if (c != '\r') {
434                             inputLine.append(c);
435                         }
436                     } else {
437                         if (sourceSocketAddress == null) {
438                             sourceSocketAddress = stringToInetSocketAddress(inputLine.toString());
439                             inputLine = new StringBuffer();
440                         } else {
441                             destinationSocketAddress = stringToInetSocketAddress(inputLine.toString());
442                             proxy.registerInetAddress(sourceSocketAddress, destinationSocketAddress);
443                         }
444                     }
445                     size--;
446                 }
447             }
448         }
449 
450         public void write(SelectionKey key) throws IOException {
451             // TODO Auto-generated method stub
452             
453         }
454         
455     }
456     
457     public static void main(String[] args) throws Exception {
458         System.out.println("Creating proxy...");
459         Proxy proxy = new Proxy(8044);
460         proxy.start();
461         System.out.println("Proxy created.");
462         
463         Thread.sleep(1000);
464 
465         System.out.println("Creating server socket...");
466         ServerSocket serverSocket = new ServerSocket(9999);
467         System.out.println("Server socket created,");
468         
469         System.out.println("Creating control socket...");
470         Socket socket = new Socket("localhost", 8044);
471         System.out.println("Control socket created.");
472         PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
473         System.out.println("Sending control data...");
474         out.println("localhost:8888");
475         out.println("localhost:9999");
476         out.flush();
477         System.out.println("Control data sent.");
478         Thread.sleep(1000);
479 
480         System.out.println("Creating client socket...");
481         Socket testClientSocket = new Socket("localhost", 8888);
482         System.out.println("Client socket created.");
483         PrintWriter testClientOut = new PrintWriter(new OutputStreamWriter(testClientSocket.getOutputStream()));
484         System.out.println("Sending some data...");
485         testClientOut.println("Something!");
486         testClientOut.flush();
487         System.out.println("Data sent.");
488         
489         System.out.println("Accepting connection...");
490         Socket testServerSocket = serverSocket.accept();
491         System.out.println("Connection accepted.");
492         BufferedReader testServerIn = new BufferedReader(new InputStreamReader(testServerSocket.getInputStream()));
493         System.out.println("Receiving line...");
494         String line = testServerIn.readLine();
495         System.out.println("Got line: " + line);
496         System.out.println("Line received.");
497         
498         System.exit(0);
499     }
500 }