View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.RejectedExecutionException;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.eclipse.jetty.io.AbstractConnection;
32  import org.eclipse.jetty.io.ByteBufferPool;
33  import org.eclipse.jetty.io.Connection;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.util.BufferUtil;
36  import org.eclipse.jetty.util.Callback;
37  import org.eclipse.jetty.util.ForkInvoker;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  import org.eclipse.jetty.util.thread.Scheduler;
41  import org.eclipse.jetty.websocket.api.CloseException;
42  import org.eclipse.jetty.websocket.api.StatusCode;
43  import org.eclipse.jetty.websocket.api.SuspendToken;
44  import org.eclipse.jetty.websocket.api.WebSocketException;
45  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
46  import org.eclipse.jetty.websocket.api.WriteCallback;
47  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
48  import org.eclipse.jetty.websocket.api.extensions.Frame;
49  import org.eclipse.jetty.websocket.common.CloseInfo;
50  import org.eclipse.jetty.websocket.common.ConnectionState;
51  import org.eclipse.jetty.websocket.common.Generator;
52  import org.eclipse.jetty.websocket.common.LogicalConnection;
53  import org.eclipse.jetty.websocket.common.Parser;
54  import org.eclipse.jetty.websocket.common.WebSocketSession;
55  
56  /**
57   * Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
58   */
59  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection
60  {
61      private class FlushCallback implements Callback
62      {
63          @Override
64          public void failed(Throwable x)
65          {
66              LOG.warn("Write flush failure",x);
67          }
68  
69          @Override
70          public void succeeded()
71          {
72              // Lets process the next set of bytes...
73              AbstractWebSocketConnection.this.complete(writeBytes);
74          }
75      }
76  
77      private class FlushInvoker extends ForkInvoker<Callback>
78      {
79          private FlushInvoker()
80          {
81              super(4);
82          }
83  
84          @Override
85          public void call(Callback callback)
86          {
87              flush();
88          }
89  
90          @Override
91          public void fork(final Callback callback)
92          {
93              execute(new Runnable()
94              {
95                  @Override
96                  public void run()
97                  {
98                      flush();
99                  }
100             });
101         }
102 
103         @Override
104         public String toString()
105         {
106             return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
107         }
108     }
109 
110     private class OnCloseCallback implements WriteCallback
111     {
112         @Override
113         public void writeFailed(Throwable x)
114         {
115             disconnect();
116         }
117 
118         @Override
119         public void writeSuccess()
120         {
121             onWriteWebSocketClose();
122         }
123     }
124 
125     public static class Stats {
126         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
127         private AtomicLong countOnFillableEvents = new AtomicLong(0);
128         private AtomicLong countFillableErrors = new AtomicLong(0);
129 
130         public long getFillableErrorCount() {
131             return countFillableErrors.get();
132         }
133 
134         public long getFillInterestedCount() {
135             return countFillInterestedEvents.get();
136         }
137 
138         public long getOnFillableCount() {
139             return countOnFillableEvents.get();
140         }
141     }
142 
143     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
144 
145     /**
146      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
147      */
148     private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
149 
150     private final ForkInvoker<Callback> invoker = new FlushInvoker();
151     private final ByteBufferPool bufferPool;
152     private final Scheduler scheduler;
153     private final Generator generator;
154     private final Parser parser;
155     private final WebSocketPolicy policy;
156     private final WriteBytesProvider writeBytes;
157     private final AtomicBoolean suspendToken;
158     private WebSocketSession session;
159     private List<ExtensionConfig> extensions;
160     private boolean flushing;
161     private boolean isFilling;
162     private IOState ioState;
163     private Stats stats = new Stats();
164 
165     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
166     {
167         super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
168         this.policy = policy;
169         this.bufferPool = bufferPool;
170         this.generator = new Generator(policy,bufferPool);
171         this.parser = new Parser(policy,bufferPool);
172         this.scheduler = scheduler;
173         this.extensions = new ArrayList<>();
174         this.suspendToken = new AtomicBoolean(false);
175         this.ioState = new IOState();
176         this.ioState.setState(ConnectionState.CONNECTING);
177         this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
178         this.setInputBufferSize(policy.getInputBufferSize());
179     }
180 
181     @Override
182     public void close()
183     {
184         close(StatusCode.NORMAL,null);
185     }
186 
187     @Override
188     public void close(int statusCode, String reason)
189     {
190         enqueClose(statusCode,reason);
191     }
192 
193     public void complete(final Callback callback)
194     {
195         LOG.debug("complete({})",callback);
196         synchronized (writeBytes)
197         {
198             flushing = false;
199         }
200 
201         if (!ioState.isOpen() || (callback == null))
202         {
203             return;
204         }
205 
206         invoker.invoke(callback);
207     }
208 
209     @Override
210     public void disconnect()
211     {
212         disconnect(false);
213     }
214 
215     public void disconnect(boolean onlyOutput)
216     {
217         ioState.setState(ConnectionState.CLOSED);
218         EndPoint endPoint = getEndPoint();
219         // We need to gently close first, to allow
220         // SSL close alerts to be sent by Jetty
221         LOG.debug("Shutting down output {}",endPoint);
222         endPoint.shutdownOutput();
223         if (!onlyOutput)
224         {
225             LOG.debug("Closing {}",endPoint);
226             endPoint.close();
227         }
228     }
229 
230     /**
231      * Enqueue a close frame.
232      * 
233      * @param statusCode
234      *            the WebSocket status code.
235      * @param reason
236      *            the (optional) reason string. (null is allowed)
237      * @see StatusCode
238      */
239     private void enqueClose(int statusCode, String reason)
240     {
241         synchronized (writeBytes)
242         {
243             // It is possible to get close events from many different sources.
244             // Make sure we only sent 1 over the network.
245             if (writeBytes.isClosed())
246             {
247                 // already sent the close
248                 return;
249             }
250         }
251         CloseInfo close = new CloseInfo(statusCode,reason);
252         // TODO: create DisconnectCallback?
253         outgoingFrame(close.asFrame(),new OnCloseCallback());
254     }
255 
256     private void execute(Runnable task)
257     {
258         try
259         {
260             getExecutor().execute(task);
261         }
262         catch (RejectedExecutionException e)
263         {
264             LOG.debug("Job not dispatched: {}",task);
265         }
266     }
267 
268     @Override
269     public void fillInterested()
270     {
271         stats.countFillInterestedEvents.incrementAndGet();
272         super.fillInterested();
273     }
274 
275     public void flush()
276     {
277         ByteBuffer buffer = null;
278 
279         synchronized (writeBytes)
280         {
281             if (writeBytes.isFailed())
282             {
283                 LOG.debug(".flush() - queue is in failed state");
284                 return;
285             }
286 
287             if (flushing)
288             {
289                 return;
290             }
291 
292             if (LOG.isDebugEnabled())
293             {
294                 LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
295             }
296 
297             if (!isOpen())
298             {
299                 // No longer have an open connection, drop them all.
300                 writeBytes.failAll(new WebSocketException("Connection closed"));
301                 return;
302             }
303 
304             buffer = writeBytes.getByteBuffer();
305 
306             if (buffer == null)
307             {
308                 return;
309             }
310 
311             flushing = true;
312 
313             if (LOG.isDebugEnabled())
314             {
315                 LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes);
316             }
317         }
318 
319         write(buffer);
320     }
321 
322     public ByteBufferPool getBufferPool()
323     {
324         return bufferPool;
325     }
326 
327     /**
328      * Get the list of extensions in use.
329      * <p>
330      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
331      * 
332      * @return the list of negotiated extensions in use.
333      */
334     public List<ExtensionConfig> getExtensions()
335     {
336         return extensions;
337     }
338 
339     public Generator getGenerator()
340     {
341         return generator;
342     }
343 
344     @Override
345     public IOState getIOState()
346     {
347         return ioState;
348     }
349 
350     public Parser getParser()
351     {
352         return parser;
353     }
354 
355     @Override
356     public WebSocketPolicy getPolicy()
357     {
358         return this.policy;
359     }
360 
361     @Override
362     public InetSocketAddress getRemoteAddress()
363     {
364         return getEndPoint().getRemoteAddress();
365     }
366 
367     public Scheduler getScheduler()
368     {
369         return scheduler;
370     }
371 
372     @Override
373     public WebSocketSession getSession()
374     {
375         return session;
376     }
377 
378     public Stats getStats()
379     {
380         return stats;
381     }
382 
383     @Override
384     public boolean isOpen()
385     {
386         return getIOState().isOpen() && getEndPoint().isOpen();
387     }
388 
389     @Override
390     public boolean isReading()
391     {
392         return isFilling;
393     }
394 
395     @Override
396     public void onClose()
397     {
398         super.onClose();
399         this.getIOState().setState(ConnectionState.CLOSED);
400     }
401 
402     @Override
403     public void onFillable()
404     {
405         LOG.debug("{} onFillable()",policy.getBehavior());
406         stats.countOnFillableEvents.incrementAndGet();
407         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
408         BufferUtil.clear(buffer);
409         boolean readMore = false;
410         try
411         {
412             isFilling = true;
413             readMore = (read(buffer) != -1);
414         }
415         finally
416         {
417             bufferPool.release(buffer);
418         }
419 
420         if (readMore && (suspendToken.get() == false))
421         {
422             fillInterested();
423         }
424         else
425         {
426             isFilling = false;
427         }
428     }
429 
430     @Override
431     protected void onFillInterestedFailed(Throwable cause)
432     {
433         LOG.ignore(cause);
434         stats.countFillInterestedEvents.incrementAndGet();
435         super.onFillInterestedFailed(cause);
436     }
437 
438     @Override
439     public void onOpen()
440     {
441         super.onOpen();
442         this.ioState.setState(ConnectionState.OPEN);
443         LOG.debug("fillInterested");
444         fillInterested();
445     }
446 
447     @Override
448     protected boolean onReadTimeout()
449     {
450         LOG.warn("Read Timeout");
451 
452         IOState state = getIOState();
453         if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
454         {
455             // close already initiated, extra timeouts not relevant
456             // allow udnerlying connection and endpoint to disconnect on its own
457             return true;
458         }
459 
460         // Initiate close - politely send close frame.
461         // Note: it is not possible in 100% of cases during read timeout to send this close frame.
462         session.close(StatusCode.NORMAL,"Idle Timeout");
463 
464         return false;
465     }
466 
467     public void onWriteWebSocketClose()
468     {
469         if (ioState.onCloseHandshake(false))
470         {
471             disconnect();
472         }
473     }
474 
475     /**
476      * Frame from API, User, or Internal implementation destined for network.
477      */
478     @Override
479     public void outgoingFrame(Frame frame, WriteCallback callback)
480     {
481         if (LOG.isDebugEnabled())
482         {
483             LOG.debug("outgoingFrame({}, {})",frame,callback);
484         }
485 
486         if (!isOpen())
487         {
488             return;
489         }
490 
491         synchronized (writeBytes)
492         {
493             writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
494         }
495 
496         flush();
497     }
498 
499     private int read(ByteBuffer buffer)
500     {
501         EndPoint endPoint = getEndPoint();
502         try
503         {
504             while (true)
505             {
506                 int filled = endPoint.fill(buffer);
507                 if (filled == 0)
508                 {
509                     return 0;
510                 }
511                 else if (filled < 0)
512                 {
513                     LOG.debug("read - EOF Reached");
514                     return -1;
515                 }
516                 else
517                 {
518                     if (LOG.isDebugEnabled())
519                     {
520                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
521                     }
522                     parser.parse(buffer);
523                 }
524             }
525         }
526         catch (IOException e)
527         {
528             LOG.warn(e);
529             enqueClose(StatusCode.PROTOCOL,e.getMessage());
530             return -1;
531         }
532         catch (CloseException e)
533         {
534             LOG.warn(e);
535             enqueClose(e.getStatusCode(),e.getMessage());
536             return -1;
537         }
538     }
539 
540     @Override
541     public void resume()
542     {
543         if (suspendToken.getAndSet(false))
544         {
545             fillInterested();
546         }
547     }
548 
549     /**
550      * Get the list of extensions in use.
551      * <p>
552      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
553      * 
554      * @param extensions
555      *            the list of negotiated extensions in use.
556      */
557     public void setExtensions(List<ExtensionConfig> extensions)
558     {
559         this.extensions = extensions;
560     }
561 
562     @Override
563     public void setInputBufferSize(int inputBufferSize)
564     {
565         if(inputBufferSize < MIN_BUFFER_SIZE) {
566             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
567         }
568         super.setInputBufferSize(inputBufferSize);
569     }
570 
571     @Override
572     public void setSession(WebSocketSession session)
573     {
574         this.session = session;
575     }
576 
577     @Override
578     public SuspendToken suspend()
579     {
580         suspendToken.set(true);
581         return this;
582     }
583 
584     @Override
585     public String toString()
586     {
587         return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
588     }
589 
590     private <C> void write(ByteBuffer buffer)
591     {
592         EndPoint endpoint = getEndPoint();
593 
594         if (!isOpen())
595         {
596             writeBytes.failAll(new IOException("Connection closed"));
597             return;
598         }
599 
600         try
601         {
602             endpoint.write(writeBytes,buffer);
603         }
604         catch (Throwable t)
605         {
606             writeBytes.failed(t);
607         }
608     }
609 }