1 package org.eclipse.jetty.websocket;
2
3 import java.net.InetSocketAddress;
4 import java.net.URI;
5 import java.util.Arrays;
6 import java.util.Random;
7 import java.util.concurrent.BlockingQueue;
8 import java.util.concurrent.CountDownLatch;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.atomic.AtomicInteger;
12 import java.util.concurrent.atomic.AtomicLong;
13
14 import org.eclipse.jetty.util.StringUtil;
15 import org.eclipse.jetty.util.TypeUtil;
16
17
18
19
20
21
22
23 public class TestClient implements WebSocket.OnFrame
24 {
25 private static WebSocketClient __client = new WebSocketClient();
26 private static boolean _verbose=false;
27
28 private static final Random __random = new Random();
29
30 private final String _host;
31 private final int _port;
32 private final String _protocol;
33 private final int _timeout;
34
35 private static boolean __quiet;
36 private static int __framesSent;
37 private static int __messagesSent;
38 private static AtomicInteger __framesReceived=new AtomicInteger();
39 private static AtomicInteger __messagesReceived=new AtomicInteger();
40
41 private static AtomicLong __totalTime=new AtomicLong();
42 private static AtomicLong __minDuration=new AtomicLong(Long.MAX_VALUE);
43 private static AtomicLong __maxDuration=new AtomicLong(Long.MIN_VALUE);
44 private static long __start;
45 private BlockingQueue<Long> _starts = new LinkedBlockingQueue<Long>();
46 int _messageBytes;
47 int _frames;
48 byte _opcode=-1;
49 private volatile WebSocket.FrameConnection _connection;
50 private final CountDownLatch _handshook = new CountDownLatch(1);
51
52
53 public void onOpen(Connection connection)
54 {
55 }
56
57 public void onClose(int closeCode, String message)
58 {
59 _handshook.countDown();
60 }
61
62 public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
63 {
64 try
65 {
66 if (_connection.isClose(opcode))
67 return false;
68
69 __framesReceived.incrementAndGet();
70 _frames++;
71 _messageBytes+=length;
72
73 if (_opcode==-1)
74 _opcode=opcode;
75
76 if (_connection.isControl(opcode) || _connection.isMessageComplete(flags))
77 {
78 int recv =__messagesReceived.incrementAndGet();
79 Long start=_starts.poll();
80
81 if (start!=null)
82 {
83 long duration = System.nanoTime()-start.longValue();
84 long max=__maxDuration.get();
85 while(duration>max && !__maxDuration.compareAndSet(max,duration))
86 max=__maxDuration.get();
87 long min=__minDuration.get();
88 while(duration<min && !__minDuration.compareAndSet(min,duration))
89 min=__minDuration.get();
90 __totalTime.addAndGet(duration);
91 if (!__quiet)
92 System.out.printf("%d bytes from %s: frames=%d req=%d time=%.1fms opcode=0x%s\n",_messageBytes,_host,_frames,recv,((double)duration/1000000.0),TypeUtil.toHexString(_opcode));
93 }
94 _frames=0;
95 _messageBytes=0;
96 _opcode=-1;
97 }
98 }
99 catch(Exception e)
100 {
101 e.printStackTrace();
102 }
103 return false;
104 }
105
106 public void onHandshake(FrameConnection connection)
107 {
108 _connection=connection;
109 _handshook.countDown();
110 }
111
112 public TestClient(String host, int port,String protocol, int timeoutMS) throws Exception
113 {
114 _host=host;
115 _port=port;
116 _protocol=protocol;
117 _timeout=timeoutMS;
118 }
119
120 private void open() throws Exception
121 {
122 WebSocketClient client = new WebSocketClient(__client);
123 client.setProtocol(_protocol);
124 client.setMaxIdleTime(_timeout);
125 client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
126 }
127
128 public void ping(byte opcode,byte[] data,int fragment) throws Exception
129 {
130 _starts.add(System.nanoTime());
131
132 int off=0;
133 int len=data.length;
134 if (fragment>0&& len>fragment)
135 len=fragment;
136 __messagesSent++;
137 while(off<data.length)
138 {
139 __framesSent++;
140 byte flags= (byte)(off+len==data.length?0x8:0);
141 byte op=(byte)(off==0?opcode:WebSocketConnectionD10.OP_CONTINUATION);
142
143 if (_verbose)
144 System.err.printf("%s#addFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(op),TypeUtil.toHexString(data,off,len));
145
146 _connection.sendFrame(flags,op,data,off,len);
147
148 off+=len;
149 if(data.length-off>len)
150 len=data.length-off;
151 if (fragment>0&& len>fragment)
152 len=fragment;
153 }
154 }
155
156 public void disconnect() throws Exception
157 {
158 if (_connection!=null)
159 _connection.disconnect();
160 }
161
162
163 private static void usage(String[] args)
164 {
165 System.err.println("ERROR: "+Arrays.asList(args));
166 System.err.println("USAGE: java -cp CLASSPATH "+TestClient.class+" [ OPTIONS ]");
167 System.err.println(" -h|--host HOST (default localhost)");
168 System.err.println(" -p|--port PORT (default 8080)");
169 System.err.println(" -b|--binary");
170 System.err.println(" -v|--verbose");
171 System.err.println(" -c|--count n (default 10)");
172 System.err.println(" -s|--size n (default 64)");
173 System.err.println(" -f|--fragment n (default 4000) ");
174 System.err.println(" -P|--protocol echo|echo-assemble|echo-fragment|echo-broadcast");
175 System.err.println(" -C|--clients n (default 1) ");
176 System.err.println(" -d|--delay n (default 1000ms) ");
177 System.exit(1);
178 }
179
180 public static void main(String[] args) throws Exception
181 {
182 __client.start();
183
184 String host="localhost";
185 int port=8080;
186 String protocol=null;
187 int count=10;
188 int size=64;
189 int fragment=4000;
190 boolean binary=false;
191 int clients=1;
192 int delay=1000;
193
194 for (int i=0;i<args.length;i++)
195 {
196 String a=args[i];
197 if ("-p".equals(a)||"--port".equals(a))
198 port=Integer.parseInt(args[++i]);
199 else if ("-h".equals(a)||"--host".equals(a))
200 port=Integer.parseInt(args[++i]);
201 else if ("-c".equals(a)||"--count".equals(a))
202 count=Integer.parseInt(args[++i]);
203 else if ("-s".equals(a)||"--size".equals(a))
204 size=Integer.parseInt(args[++i]);
205 else if ("-f".equals(a)||"--fragment".equals(a))
206 fragment=Integer.parseInt(args[++i]);
207 else if ("-P".equals(a)||"--protocol".equals(a))
208 protocol=args[++i];
209 else if ("-v".equals(a)||"--verbose".equals(a))
210 _verbose=true;
211 else if ("-b".equals(a)||"--binary".equals(a))
212 binary=true;
213 else if ("-C".equals(a)||"--clients".equals(a))
214 clients=Integer.parseInt(args[++i]);
215 else if ("-d".equals(a)||"--delay".equals(a))
216 delay=Integer.parseInt(args[++i]);
217 else if ("-q".equals(a)||"--quiet".equals(a))
218 __quiet=true;
219 else if (a.startsWith("-"))
220 usage(args);
221 }
222
223
224 TestClient[] client = new TestClient[clients];
225
226 try
227 {
228 __start=System.currentTimeMillis();
229 for (int i=0;i<clients;i++)
230 {
231 client[i]=new TestClient(host,port,protocol==null?null:protocol,60000);
232 client[i].open();
233 }
234
235 System.out.println("Jetty WebSocket PING "+host+":"+port+
236 " ("+ new InetSocketAddress(host,port)+") "+clients+" clients");
237
238
239 for (int p=0;p<count;p++)
240 {
241 long next = System.currentTimeMillis()+delay;
242
243 byte opcode=binary?WebSocketConnectionD10.OP_BINARY:WebSocketConnectionD10.OP_TEXT;
244
245 byte data[]=null;
246
247 if (opcode==WebSocketConnectionD10.OP_TEXT)
248 {
249 StringBuilder b = new StringBuilder();
250 while (b.length()<size)
251 b.append('A'+__random.nextInt(26));
252 data=b.toString().getBytes(StringUtil.__UTF8);
253 }
254 else
255 {
256 data= new byte[size];
257 __random.nextBytes(data);
258 }
259
260 for (int i=0;i<clients;i++)
261 client[i].ping(opcode,data,opcode==WebSocketConnectionD10.OP_PING?-1:fragment);
262
263 while(System.currentTimeMillis()<next)
264 Thread.sleep(10);
265 }
266 }
267 finally
268 {
269 for (int i=0;i<clients;i++)
270 if (client[i]!=null)
271 client[i].disconnect();
272
273 long duration=System.currentTimeMillis()-__start;
274 System.out.println("--- "+host+" websocket ping statistics using "+clients+" connection"+(clients>1?"s":"")+" ---");
275 System.out.println(__framesSent+" frames transmitted, "+__framesReceived+" received, "+
276 __messagesSent+" messages transmitted, "+__messagesReceived+" received, "+
277 "time "+duration+"ms "+ (1000L*__messagesReceived.get()/duration)+" req/s");
278 System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get()/1000000.0,__messagesReceived.get()==0?0.0:(__totalTime.get()/__messagesReceived.get()/1000000.0),__maxDuration.get()/1000000.0);
279
280 __client.stop();
281 }
282
283 }
284
285
286
287 }