View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.net.InetSocketAddress;
4   import java.net.URI;
5   import java.util.Arrays;
6   import java.util.Random;
7   import java.util.concurrent.BlockingQueue;
8   import java.util.concurrent.CountDownLatch;
9   import java.util.concurrent.LinkedBlockingQueue;
10  import java.util.concurrent.TimeUnit;
11  import java.util.concurrent.atomic.AtomicInteger;
12  import java.util.concurrent.atomic.AtomicLong;
13  
14  import org.eclipse.jetty.util.StringUtil;
15  import org.eclipse.jetty.util.TypeUtil;
16  
17  /**
18   * @version $Revision$ $Date$
19   * 
20   * This is not a general purpose websocket client.
21   * It's only for testing the websocket server and is hardwired to a specific draft version of the protocol.
22   */
23  public class TestClient implements WebSocket.OnFrame
24  {
25      private static WebSocketClient __client = new WebSocketClient();
26      private static boolean _verbose=false;
27  
28      private static final Random __random = new Random();
29      
30      private final String _host;
31      private final int _port;
32      private final String _protocol;
33      private final int _timeout;
34      
35      private static boolean __quiet;
36      private static int __framesSent;
37      private static int __messagesSent;
38      private static AtomicInteger __framesReceived=new AtomicInteger();
39      private static AtomicInteger __messagesReceived=new AtomicInteger();
40      
41      private static AtomicLong __totalTime=new AtomicLong();
42      private static AtomicLong __minDuration=new AtomicLong(Long.MAX_VALUE);
43      private static AtomicLong __maxDuration=new AtomicLong(Long.MIN_VALUE);
44      private static long __start;
45      private BlockingQueue<Long> _starts = new LinkedBlockingQueue<Long>();
46      int _messageBytes;
47      int _frames;
48      byte _opcode=-1;
49      private volatile WebSocket.FrameConnection _connection;
50      private final CountDownLatch _handshook = new CountDownLatch(1);
51      
52      
53      public void onOpen(Connection connection)
54      {
55      }
56  
57      public void onClose(int closeCode, String message)
58      {
59          _handshook.countDown();
60      }
61  
62      public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
63      {
64          try
65          {
66              if (_connection.isClose(opcode))
67                  return false;
68              
69              __framesReceived.incrementAndGet();
70              _frames++;
71              _messageBytes+=length;
72              
73              if (_opcode==-1)
74                  _opcode=opcode;
75              
76              if (_connection.isControl(opcode) || _connection.isMessageComplete(flags))
77              {  
78                  int recv =__messagesReceived.incrementAndGet();
79                  Long start=_starts.poll();
80  
81                  if (start!=null)
82                  {
83                      long duration = System.nanoTime()-start.longValue();
84                      long max=__maxDuration.get();
85                      while(duration>max && !__maxDuration.compareAndSet(max,duration))
86                          max=__maxDuration.get();
87                      long min=__minDuration.get();
88                      while(duration<min && !__minDuration.compareAndSet(min,duration))
89                          min=__minDuration.get();
90                      __totalTime.addAndGet(duration);
91                      if (!__quiet)
92                          System.out.printf("%d bytes from %s: frames=%d req=%d time=%.1fms opcode=0x%s\n",_messageBytes,_host,_frames,recv,((double)duration/1000000.0),TypeUtil.toHexString(_opcode));
93                  }
94                  _frames=0;
95                  _messageBytes=0;
96                  _opcode=-1;   
97              }
98          }
99          catch(Exception e)
100         {
101             e.printStackTrace();
102         }
103         return false;
104     }
105 
106     public void onHandshake(FrameConnection connection)
107     {
108         _connection=connection;
109         _handshook.countDown();
110     }
111     
112     public TestClient(String host, int port,String protocol, int timeoutMS) throws Exception
113     {
114         _host=host;
115         _port=port;
116         _protocol=protocol;
117         _timeout=timeoutMS;
118     }
119 
120     private void open() throws Exception
121     {
122         WebSocketClient client = new WebSocketClient(__client);
123         client.setProtocol(_protocol);
124         client.setMaxIdleTime(_timeout);
125         client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
126     }
127 
128     public void ping(byte opcode,byte[] data,int fragment) throws Exception
129     {
130         _starts.add(System.nanoTime());
131 
132         int off=0;
133         int len=data.length;
134         if (fragment>0&& len>fragment)
135             len=fragment;
136         __messagesSent++;
137         while(off<data.length)
138         {                    
139             __framesSent++;
140             byte flags= (byte)(off+len==data.length?0x8:0);
141             byte op=(byte)(off==0?opcode:WebSocketConnectionD10.OP_CONTINUATION);
142 
143             if (_verbose)                
144                 System.err.printf("%s#addFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(op),TypeUtil.toHexString(data,off,len));
145 
146             _connection.sendFrame(flags,op,data,off,len);
147 
148             off+=len;
149             if(data.length-off>len)
150                 len=data.length-off;
151             if (fragment>0&& len>fragment)
152                 len=fragment;
153         }
154     }
155 
156     public void disconnect() throws Exception
157     {
158         if (_connection!=null)
159             _connection.disconnect();
160     }
161     
162 
163     private static void usage(String[] args)
164     {
165         System.err.println("ERROR: "+Arrays.asList(args));
166         System.err.println("USAGE: java -cp CLASSPATH "+TestClient.class+" [ OPTIONS ]");
167         System.err.println("  -h|--host HOST  (default localhost)");
168         System.err.println("  -p|--port PORT  (default 8080)");
169         System.err.println("  -b|--binary");
170         System.err.println("  -v|--verbose");
171         System.err.println("  -c|--count n    (default 10)");
172         System.err.println("  -s|--size n     (default 64)");
173         System.err.println("  -f|--fragment n (default 4000) ");
174         System.err.println("  -P|--protocol echo|echo-assemble|echo-fragment|echo-broadcast");
175         System.err.println("  -C|--clients n  (default 1) ");
176         System.err.println("  -d|--delay n    (default 1000ms) ");
177         System.exit(1);
178     }
179     
180     public static void main(String[] args) throws Exception
181     {
182         __client.start();
183 
184         String host="localhost";
185         int port=8080;
186         String protocol=null;
187         int count=10;
188         int size=64;
189         int fragment=4000;
190         boolean binary=false;
191         int clients=1;
192         int delay=1000;
193 
194         for (int i=0;i<args.length;i++)
195         {
196             String a=args[i];
197             if ("-p".equals(a)||"--port".equals(a))
198                 port=Integer.parseInt(args[++i]);
199             else if ("-h".equals(a)||"--host".equals(a))
200                 port=Integer.parseInt(args[++i]);
201             else if ("-c".equals(a)||"--count".equals(a))
202                 count=Integer.parseInt(args[++i]);
203             else if ("-s".equals(a)||"--size".equals(a))
204                 size=Integer.parseInt(args[++i]);
205             else if ("-f".equals(a)||"--fragment".equals(a))
206                 fragment=Integer.parseInt(args[++i]);
207             else if ("-P".equals(a)||"--protocol".equals(a))
208                 protocol=args[++i];
209             else if ("-v".equals(a)||"--verbose".equals(a))
210                 _verbose=true;
211             else if ("-b".equals(a)||"--binary".equals(a))
212                 binary=true;
213             else if ("-C".equals(a)||"--clients".equals(a))
214                 clients=Integer.parseInt(args[++i]);
215             else if ("-d".equals(a)||"--delay".equals(a))
216                 delay=Integer.parseInt(args[++i]);
217             else if ("-q".equals(a)||"--quiet".equals(a))
218                 __quiet=true;
219             else if (a.startsWith("-"))
220                 usage(args);
221         }
222 
223 
224         TestClient[] client = new TestClient[clients];
225         
226         try
227         {
228             __start=System.currentTimeMillis();
229             for (int i=0;i<clients;i++)
230             {
231                 client[i]=new TestClient(host,port,protocol==null?null:protocol,60000);
232                 client[i].open();
233             }
234 
235             System.out.println("Jetty WebSocket PING "+host+":"+port+
236                     " ("+ new InetSocketAddress(host,port)+") "+clients+" clients");
237             
238             
239             for (int p=0;p<count;p++)
240             {
241                 long next = System.currentTimeMillis()+delay;
242                 
243                 byte opcode=binary?WebSocketConnectionD10.OP_BINARY:WebSocketConnectionD10.OP_TEXT;
244                 
245                 byte data[]=null;
246 
247                 if (opcode==WebSocketConnectionD10.OP_TEXT)
248                 {
249                     StringBuilder b = new StringBuilder();
250                     while (b.length()<size)
251                         b.append('A'+__random.nextInt(26));
252                     data=b.toString().getBytes(StringUtil.__UTF8);
253                 }
254                 else
255                 {             
256                     data= new byte[size];
257                     __random.nextBytes(data);
258                 }
259 
260                 for (int i=0;i<clients;i++)
261                     client[i].ping(opcode,data,opcode==WebSocketConnectionD10.OP_PING?-1:fragment);
262                 
263                 while(System.currentTimeMillis()<next)
264                     Thread.sleep(10);
265             }
266         }
267         finally
268         {
269             for (int i=0;i<clients;i++)
270                 if (client[i]!=null)
271                     client[i].disconnect();
272             
273             long duration=System.currentTimeMillis()-__start;
274             System.out.println("--- "+host+" websocket ping statistics using "+clients+" connection"+(clients>1?"s":"")+" ---");
275             System.out.println(__framesSent+" frames transmitted, "+__framesReceived+" received, "+
276                     __messagesSent+" messages transmitted, "+__messagesReceived+" received, "+
277                     "time "+duration+"ms "+ (1000L*__messagesReceived.get()/duration)+" req/s");
278             System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get()/1000000.0,__messagesReceived.get()==0?0.0:(__totalTime.get()/__messagesReceived.get()/1000000.0),__maxDuration.get()/1000000.0);
279             
280             __client.stop();
281         }
282 
283     }
284     
285     
286     
287 }