View Javadoc

1   package org.abstracthorizon.danube.proxy.socket;
2   
3   import java.io.BufferedReader;
4   import java.io.IOException;
5   import java.io.InputStreamReader;
6   import java.io.OutputStreamWriter;
7   import java.io.PrintWriter;
8   import java.net.InetSocketAddress;
9   import java.net.ServerSocket;
10  import java.net.Socket;
11  import java.nio.ByteBuffer;
12  import java.nio.channels.SelectionKey;
13  import java.nio.channels.Selector;
14  import java.nio.channels.ServerSocketChannel;
15  import java.nio.channels.SocketChannel;
16  import java.nio.channels.spi.SelectorProvider;
17  import java.util.Iterator;
18  import java.util.Random;
19  import java.util.Set;
20  
21  public class MakeTestProxy {
22  
23      public static int SERVER_PORT = 8123;
24      
25      public static int PROXY_PORT = 8124;
26      
27      public static int CLIENT_PORT = 8124;
28      
29      public static void main(String[] args) throws Exception {
30          
31          Thread proxy = new Thread(new Runnable() {
32              public void run() {
33                  try {
34                      Selector selector = SelectorProvider.provider().openSelector();
35                      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
36                      serverSocketChannel.configureBlocking(false);
37  
38                      InetSocketAddress inetSocketAddress = new InetSocketAddress(PROXY_PORT);
39                      serverSocketChannel.socket().bind(inetSocketAddress);
40  
41                      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
42  
43                      int keysAdded = selector.select();
44                      while (true) {
45                          Set<SelectionKey> selectedKeys = selector.selectedKeys();
46                          if (selectedKeys.size() == 0) {
47                              System.out.println("PROXY : selecting keys..." + keysAdded + "/" + selectedKeys.size() + "(" + Thread.currentThread() + ")");
48                          }
49                          Iterator<SelectionKey> iterator = selectedKeys.iterator();
50                          while (iterator.hasNext()) {
51                              SelectionKey key = iterator.next();
52                              iterator.remove();
53                              Connection c = (Connection)key.attachment();
54                              
55                              if (!key.isValid()) {
56                                  System.out.println("PROXY : Key is not valid " + c + " closing channel");
57                                  c.in.close();
58                              } else if (key.isAcceptable()) {
59                                  System.out.println("PROXY : Key is acceptable " + c);
60                                  ServerSocketChannel channel = (ServerSocketChannel)key.channel();
61  
62                                  SocketChannel outboundChannel = SocketChannel.open();
63                                  outboundChannel.configureBlocking(false);
64                                  InetSocketAddress serverSocketAddress = new InetSocketAddress("localhost", SERVER_PORT);
65  
66                                  Socket inbound = channel.accept().socket();
67                                  SocketChannel inboundChannel = inbound.getChannel();
68                                  inboundChannel.configureBlocking(false);
69                                  
70                                  Connection outConnection = new Connection(outboundChannel, inboundChannel, false);
71                                  Connection inConnection = new Connection(inboundChannel, outboundChannel, true);
72                                  
73                                  outboundChannel.register(selector, SelectionKey.OP_CONNECT, outConnection);
74                                  outboundChannel.connect(serverSocketAddress);
75  
76                                  inboundChannel.register(selector, SelectionKey.OP_READ, inConnection);
77  
78                              } else if (key.isReadable()) {
79                                  System.out.println("PROXY : Key is readable " + c);
80                                  c.buffer.clear();
81                                  int size = c.in.read(c.buffer);
82                                  if (size == -1) {
83                                      System.out.println("PROXY : Closing channel " + c);
84                                      key.channel().close();
85                                      key.cancel();
86                                      c.out.close();
87                                      c.out.socket().close();
88                                  } else if (size == 0) {
89                                      System.out.println("PROXY : read " + c + " "+ c.buffer.position() + "? What now?");
90                                  } else {
91                                      System.out.println("PROXY : read " + c + " " + c.buffer.position());
92                                      c.buffer.flip();
93                                      c.out.write(c.buffer);
94                                  }
95                              } else if (key.isWritable()) {
96                                  System.out.println("PROXY : Key is writable: " + c);
97                              } else if (key.isConnectable()) {
98                                  System.out.println("PROXY : Key is connectable: " + c);
99                                  SocketChannel socketChannel = (SocketChannel)key.channel();
100                                 socketChannel.finishConnect();
101                                 key.interestOps(SelectionKey.OP_READ);
102                             } else {
103                                 System.out.println("PROXY : No idea what! " + c);
104                             }
105                         }
106                         
107                         keysAdded = selector.select();
108                     }
109 //                    System.out.println("KeysAdded = " + keysAdded);
110                     
111                 } catch (Exception e) {
112                     e.printStackTrace();
113                 }
114             }
115         });
116         proxy.start();
117         
118         
119         Thread server = new Thread(new Runnable() {
120             public void run() {
121                 try {
122                     ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
123                     
124                     while (true) {
125                         final Socket socket = serverSocket.accept();
126                         final Random r = new Random();
127                         
128                         Thread worker = new Thread(new Runnable() {
129                             public void run() {
130                                 try {
131                                     PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
132                                     BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
133                                     
134                                     out.println("SERVER HELLO");
135                                     out.flush();
136                                     System.out.println("SERVER: >SERVER HELLO");
137                                     String line = in.readLine();
138                                     while (line != null) {
139                                         System.out.println("SERVER: <" + line);
140                                         if (r.nextInt(10) > 7) {
141                                             System.out.println("SERVER: Shutting this side!");
142                                             line = null;
143                                         } else {
144                                             out.println("RESPONSE TO \"" + line + "\"");
145                                             out.flush();
146                                             System.out.println("SERVER: >RESPONSE TO \"" + line + "\"");
147                                             line = in.readLine();
148                                         }
149                                         System.out.println("SERVER: <" + line);
150                                     }
151                                 } catch (Exception e) {
152                                     e.printStackTrace();
153                                 } finally {
154                                     try {
155                                         socket.close();
156                                         System.out.println("SERVER: Closed server socket.");
157                                     } catch (IOException ex) {
158                                         ex.printStackTrace();
159                                     }
160                                 }
161                             }
162                         });
163                         worker.start();
164                     }
165                 } catch (Exception e) {
166                     e.printStackTrace();
167                 }
168             }
169         });
170         server.start();
171         
172         
173         Thread client = new Thread(new Runnable() {
174             public void run() {
175                 try {
176                     int count = 1;
177                     while (true) {
178                         System.out.println("Client run count " + count);
179                         System.out.println();
180                         Thread.sleep(2000);
181                         
182                         Socket socket = new Socket("localhost", CLIENT_PORT);
183                         PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
184                         BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
185                         String line = in.readLine();
186                         System.out.println("CLIENT: <" + line);
187                         out.println("Request number " + count);
188                         out.flush();
189                         System.out.println("CLIENT: >Request number " + count);
190                         line = in.readLine();
191                         System.out.println("CLIENT: <" + line);
192                         socket.close();
193                         System.out.println("CLIENT: Closed client socket.");
194                         count++;
195                     }
196                 } catch (Exception e) {
197                     e.printStackTrace();
198                 }
199             }
200         });
201         client.start();
202         
203     }
204     
205     public static class Connection {
206         private static int counter = 1;
207         public ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
208         public SocketChannel in;
209         public SocketChannel out;
210         public boolean inbound;
211         private int count;
212         public Connection(SocketChannel in, SocketChannel out, boolean inbound) {
213             this.in = in;
214             this.out = out;
215             this.inbound = inbound;
216             count = counter;
217             if (inbound) {
218                 counter++;
219             }
220         }
221         
222         public String toString() {
223             if (inbound) {
224                 return "Inbound(" + count + ")";
225             } else {
226                 return "Outbound(" + count + ")";
227             }
228         }
229     }
230     
231 }