View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.eclipse.jetty.io.AbstractConnection;
33  import org.eclipse.jetty.io.ByteBufferPool;
34  import org.eclipse.jetty.io.Connection;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.util.BufferUtil;
37  import org.eclipse.jetty.util.Callback;
38  import org.eclipse.jetty.util.ForkInvoker;
39  import org.eclipse.jetty.util.StringUtil;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.websocket.api.CloseException;
44  import org.eclipse.jetty.websocket.api.CloseStatus;
45  import org.eclipse.jetty.websocket.api.StatusCode;
46  import org.eclipse.jetty.websocket.api.SuspendToken;
47  import org.eclipse.jetty.websocket.api.WebSocketException;
48  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49  import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
50  import org.eclipse.jetty.websocket.api.WriteCallback;
51  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
52  import org.eclipse.jetty.websocket.api.extensions.Frame;
53  import org.eclipse.jetty.websocket.common.CloseInfo;
54  import org.eclipse.jetty.websocket.common.ConnectionState;
55  import org.eclipse.jetty.websocket.common.Generator;
56  import org.eclipse.jetty.websocket.common.LogicalConnection;
57  import org.eclipse.jetty.websocket.common.Parser;
58  import org.eclipse.jetty.websocket.common.WebSocketSession;
59  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
60  
61  /**
62   * Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
63   */
64  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
65  {
66      private class FlushCallback implements Callback
67      {
68          /**
69           * The Endpoint.write() failure path
70           */
71          @Override
72          public void failed(Throwable x)
73          {
74              LOG.debug("Write flush failure",x);
75  
76              // Unable to write? can't notify other side of close, so disconnect.
77              // This is an ABNORMAL closure
78              String reason = "Websocket write failure";
79  
80              if (x instanceof EOFException)
81              {
82                  reason = "EOF";
83                  Throwable cause = x.getCause();
84                  if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
85                  {
86                      reason = "EOF: " + cause.getMessage();
87                  }
88              }
89              else
90              {
91                  if (StringUtil.isNotBlank(x.getMessage()))
92                  {
93                      reason = x.getMessage();
94                  }
95              }
96  
97              // Abnormal Close
98              reason = CloseStatus.trimMaxReasonLength(reason);
99              session.incomingError(new WebSocketException(x)); // TODO: JSR-356 change to Throwable
100             session.notifyClose(StatusCode.NO_CLOSE,reason);
101 
102             disconnect(); // disconnect endpoint & connection
103         }
104 
105         @Override
106         public void succeeded()
107         {
108             AbstractWebSocketConnection.this.complete(writeBytes);
109         }
110     }
111 
112     private class FlushInvoker extends ForkInvoker<Callback>
113     {
114         private FlushInvoker()
115         {
116             super(4);
117         }
118 
119         @Override
120         public void call(Callback callback)
121         {
122             flush();
123         }
124 
125         @Override
126         public void fork(final Callback callback)
127         {
128             execute(new Runnable()
129             {
130                 @Override
131                 public void run()
132                 {
133                     flush();
134                 }
135             });
136         }
137 
138         @Override
139         public String toString()
140         {
141             return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
142         }
143     }
144     
145     public class OnDisconnectCallback implements WriteCallback
146     {
147         @Override
148         public void writeFailed(Throwable x)
149         {
150             disconnect();
151         }
152 
153         @Override
154         public void writeSuccess()
155         {
156             disconnect();
157         }
158     }
159 
160     public static class Stats
161     {
162         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
163         private AtomicLong countOnFillableEvents = new AtomicLong(0);
164         private AtomicLong countFillableErrors = new AtomicLong(0);
165 
166         public long getFillableErrorCount()
167         {
168             return countFillableErrors.get();
169         }
170 
171         public long getFillInterestedCount()
172         {
173             return countFillInterestedEvents.get();
174         }
175 
176         public long getOnFillableCount()
177         {
178             return countOnFillableEvents.get();
179         }
180     }
181 
182     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
183 
184     /**
185      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
186      */
187     private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
188 
189     private final ForkInvoker<Callback> invoker = new FlushInvoker();
190     private final ByteBufferPool bufferPool;
191     private final Scheduler scheduler;
192     private final Generator generator;
193     private final Parser parser;
194     private final WebSocketPolicy policy;
195     private final WriteBytesProvider writeBytes;
196     private final AtomicBoolean suspendToken;
197     private WebSocketSession session;
198     private List<ExtensionConfig> extensions;
199     private boolean flushing;
200     private boolean isFilling;
201     private IOState ioState;
202     private Stats stats = new Stats();
203 
204     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
205     {
206         super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
207         this.policy = policy;
208         this.bufferPool = bufferPool;
209         this.generator = new Generator(policy,bufferPool);
210         this.parser = new Parser(policy,bufferPool);
211         this.scheduler = scheduler;
212         this.extensions = new ArrayList<>();
213         this.suspendToken = new AtomicBoolean(false);
214         this.ioState = new IOState();
215         this.ioState.addListener(this);
216         this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
217         this.setInputBufferSize(policy.getInputBufferSize());
218     }
219 
220     @Override
221     public void close()
222     {
223         close(StatusCode.NORMAL,null);
224     }
225 
226     @Override
227     public void close(int statusCode, String reason)
228     {
229         enqueClose(statusCode,reason);
230     }
231 
232     public void complete(final Callback callback)
233     {
234         LOG.debug("complete({})",callback);
235         synchronized (writeBytes)
236         {
237             flushing = false;
238         }
239 
240         if (!ioState.isOpen() || (callback == null))
241         {
242             return;
243         }
244 
245         invoker.invoke(callback);
246     }
247 
248     @Override
249     public void disconnect()
250     {
251         LOG.debug("{} disconnect()", policy.getBehavior());
252         synchronized (writeBytes)
253         {
254             if (!writeBytes.isClosed())
255             {
256                 writeBytes.close();
257             }
258         }
259         disconnect(false);
260     }
261 
262     public void disconnect(boolean onlyOutput)
263     {
264         EndPoint endPoint = getEndPoint();
265         // We need to gently close first, to allow
266         // SSL close alerts to be sent by Jetty
267         LOG.debug("Shutting down output {}",endPoint);
268         endPoint.shutdownOutput();
269         if (!onlyOutput)
270         {
271             LOG.debug("Closing {}",endPoint);
272             endPoint.close();
273         }
274     }
275 
276     /**
277      * Enqueue a close frame.
278      * 
279      * @param statusCode
280      *            the WebSocket status code.
281      * @param reason
282      *            the (optional) reason string. (null is allowed)
283      * @see StatusCode
284      */
285     private void enqueClose(int statusCode, String reason)
286     {
287         CloseInfo close = new CloseInfo(statusCode,reason);
288         ioState.onCloseLocal(close);
289     }
290 
291     protected void execute(Runnable task)
292     {
293         try
294         {
295             getExecutor().execute(task);
296         }
297         catch (RejectedExecutionException e)
298         {
299             LOG.debug("Job not dispatched: {}",task);
300         }
301     }
302 
303     @Override
304     public void fillInterested()
305     {
306         stats.countFillInterestedEvents.incrementAndGet();
307         super.fillInterested();
308     }
309 
310     public void flush()
311     {
312         ByteBuffer buffer = null;
313 
314         synchronized (writeBytes)
315         {
316             if (flushing)
317             {
318                 return;
319             }
320 
321             if (LOG.isDebugEnabled())
322             {
323                 LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
324             }
325 
326             if (!isOpen())
327             {
328                 // No longer have an open connection, drop them all.
329                 writeBytes.failAll(new WebSocketException("Connection closed"));
330                 return;
331             }
332 
333             buffer = writeBytes.getByteBuffer();
334 
335             if (buffer == null)
336             {
337                 return;
338             }
339 
340             flushing = true;
341 
342             if (LOG.isDebugEnabled())
343             {
344                 LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes);
345             }
346         }
347 
348         write(buffer);
349     }
350 
351     public ByteBufferPool getBufferPool()
352     {
353         return bufferPool;
354     }
355 
356     /**
357      * Get the list of extensions in use.
358      * <p>
359      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
360      * 
361      * @return the list of negotiated extensions in use.
362      */
363     public List<ExtensionConfig> getExtensions()
364     {
365         return extensions;
366     }
367 
368     public Generator getGenerator()
369     {
370         return generator;
371     }
372 
373     @Override
374     public IOState getIOState()
375     {
376         return ioState;
377     }
378 
379     @Override
380     public long getMaxIdleTimeout()
381     {
382         return getEndPoint().getIdleTimeout();
383     }
384 
385     public Parser getParser()
386     {
387         return parser;
388     }
389 
390     @Override
391     public WebSocketPolicy getPolicy()
392     {
393         return this.policy;
394     }
395 
396     @Override
397     public InetSocketAddress getRemoteAddress()
398     {
399         return getEndPoint().getRemoteAddress();
400     }
401 
402     public Scheduler getScheduler()
403     {
404         return scheduler;
405     }
406 
407     @Override
408     public WebSocketSession getSession()
409     {
410         return session;
411     }
412 
413     public Stats getStats()
414     {
415         return stats;
416     }
417 
418     @Override
419     public boolean isOpen()
420     {
421         return getIOState().isOpen() && getEndPoint().isOpen();
422     }
423 
424     @Override
425     public boolean isReading()
426     {
427         return isFilling;
428     }
429 
430     /**
431      * Physical connection disconnect.
432      * <p>
433      * Not related to WebSocket close handshake.
434      */
435     @Override
436     public void onClose()
437     {
438         super.onClose();
439         writeBytes.close();
440     }
441 
442     @Override
443     public void onConnectionStateChange(ConnectionState state)
444     {
445         LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
446         switch (state)
447         {
448             case OPEN:
449                 LOG.debug("fillInterested");
450                 fillInterested();
451                 break;
452             case CLOSED:
453                 this.disconnect();
454                 break;
455             case CLOSING:
456                 CloseInfo close = ioState.getCloseInfo();
457                 // append close frame
458                 outgoingFrame(close.asFrame(),new OnDisconnectCallback());
459             default:
460                 break;
461         }
462     }
463 
464     @Override
465     public void onFillable()
466     {
467         LOG.debug("{} onFillable()",policy.getBehavior());
468         stats.countOnFillableEvents.incrementAndGet();
469         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
470         BufferUtil.clear(buffer);
471         boolean readMore = false;
472         try
473         {
474             isFilling = true;
475             readMore = (read(buffer) != -1);
476         }
477         finally
478         {
479             bufferPool.release(buffer);
480         }
481 
482         if (readMore && (suspendToken.get() == false))
483         {
484             fillInterested();
485         }
486         else
487         {
488             isFilling = false;
489         }
490     }
491 
492     @Override
493     protected void onFillInterestedFailed(Throwable cause)
494     {
495         LOG.ignore(cause);
496         stats.countFillInterestedEvents.incrementAndGet();
497         super.onFillInterestedFailed(cause);
498     }
499 
500     @Override
501     public void onOpen()
502     {
503         super.onOpen();
504         this.ioState.onOpened();
505     }
506 
507     @Override
508     protected boolean onReadTimeout()
509     {
510         LOG.debug("Read Timeout");
511 
512         IOState state = getIOState();
513         if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
514         {
515             // close already initiated, extra timeouts not relevant
516             // allow underlying connection and endpoint to disconnect on its own
517             return true;
518         }
519 
520         // Initiate close - politely send close frame.
521         session.incomingError(new WebSocketTimeoutException("Timeout on Read"));
522         close(StatusCode.SHUTDOWN,"Idle Timeout");
523 
524         return false;
525     }
526 
527     /**
528      * Frame from API, User, or Internal implementation destined for network.
529      */
530     @Override
531     public void outgoingFrame(Frame frame, WriteCallback callback)
532     {
533         if (LOG.isDebugEnabled())
534         {
535             LOG.debug("outgoingFrame({}, {})",frame,callback);
536         }
537 
538         writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
539 
540         flush();
541     }
542 
543     private int read(ByteBuffer buffer)
544     {
545         EndPoint endPoint = getEndPoint();
546         try
547         {
548             while (true)
549             {
550                 int filled = endPoint.fill(buffer);
551                 if (filled == 0)
552                 {
553                     return 0;
554                 }
555                 else if (filled < 0)
556                 {
557                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
558                     ioState.onReadEOF();
559                     return -1;
560                 }
561                 else
562                 {
563                     if (LOG.isDebugEnabled())
564                     {
565                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
566                     }
567                     parser.parse(buffer);
568                 }
569             }
570         }
571         catch (IOException e)
572         {
573             LOG.warn(e);
574             enqueClose(StatusCode.PROTOCOL,e.getMessage());
575             return -1;
576         }
577         catch (CloseException e)
578         {
579             LOG.warn(e);
580             enqueClose(e.getStatusCode(),e.getMessage());
581             return -1;
582         }
583     }
584 
585     @Override
586     public void resume()
587     {
588         if (suspendToken.getAndSet(false))
589         {
590             fillInterested();
591         }
592     }
593 
594     /**
595      * Get the list of extensions in use.
596      * <p>
597      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
598      * 
599      * @param extensions
600      *            the list of negotiated extensions in use.
601      */
602     public void setExtensions(List<ExtensionConfig> extensions)
603     {
604         this.extensions = extensions;
605     }
606 
607     @Override
608     public void setInputBufferSize(int inputBufferSize)
609     {
610         if (inputBufferSize < MIN_BUFFER_SIZE)
611         {
612             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
613         }
614         super.setInputBufferSize(inputBufferSize);
615     }
616 
617     @Override
618     public void setMaxIdleTimeout(long ms)
619     {
620         getEndPoint().setIdleTimeout(ms);
621     }
622 
623     @Override
624     public void setSession(WebSocketSession session)
625     {
626         this.session = session;
627     }
628 
629     @Override
630     public SuspendToken suspend()
631     {
632         suspendToken.set(true);
633         return this;
634     }
635 
636     @Override
637     public String toString()
638     {
639         return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
640     }
641 
642     private <C> void write(ByteBuffer buffer)
643     {
644         EndPoint endpoint = getEndPoint();
645 
646         if (!isOpen())
647         {
648             writeBytes.failAll(new IOException("Connection closed"));
649             return;
650         }
651 
652         try
653         {
654             endpoint.write(writeBytes,buffer);
655         }
656         catch (Throwable t)
657         {
658             writeBytes.failed(t);
659         }
660     }
661 }