View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.websocket.api.BatchMode;
44  import org.eclipse.jetty.websocket.api.CloseException;
45  import org.eclipse.jetty.websocket.api.StatusCode;
46  import org.eclipse.jetty.websocket.api.SuspendToken;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.WriteCallback;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50  import org.eclipse.jetty.websocket.api.extensions.Frame;
51  import org.eclipse.jetty.websocket.common.CloseInfo;
52  import org.eclipse.jetty.websocket.common.ConnectionState;
53  import org.eclipse.jetty.websocket.common.Generator;
54  import org.eclipse.jetty.websocket.common.LogicalConnection;
55  import org.eclipse.jetty.websocket.common.Parser;
56  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57  
58  /**
59   * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
60   */
61  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable 
62  {
63      private class Flusher extends FrameFlusher
64      {
65          private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
66          {
67              super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
68          }
69  
70          @Override
71          protected void onFailure(Throwable x)
72          {
73              notifyError(x);
74  
75              if (ioState.wasAbnormalClose())
76              {
77                  LOG.ignore(x);
78                  return;
79              }
80  
81              if (LOG.isDebugEnabled())
82                  LOG.debug("Write flush failure",x);
83              ioState.onWriteFailure(x);
84          }
85      }
86  
87      public class OnDisconnectCallback implements WriteCallback
88      {
89          private final boolean outputOnly;
90  
91          public OnDisconnectCallback(boolean outputOnly)
92          {
93              this.outputOnly = outputOnly;
94          }
95  
96          @Override
97          public void writeFailed(Throwable x)
98          {
99              disconnect(outputOnly);
100         }
101 
102         @Override
103         public void writeSuccess()
104         {
105             disconnect(outputOnly);
106         }
107     }
108 
109     public class OnCloseLocalCallback implements WriteCallback
110     {
111         private final WriteCallback callback;
112         private final CloseInfo close;
113 
114         public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
115         {
116             this.callback = callback;
117             this.close = close;
118         }
119 
120         public OnCloseLocalCallback(CloseInfo close)
121         {
122             this(null,close);
123         }
124 
125         @Override
126         public void writeFailed(Throwable x)
127         {
128             try
129             {
130                 if (callback != null)
131                 {
132                     callback.writeFailed(x);
133                 }
134             }
135             finally
136             {
137                 onLocalClose();
138             }
139         }
140 
141         @Override
142         public void writeSuccess()
143         {
144             try
145             {
146                 if (callback != null)
147                 {
148                     callback.writeSuccess();
149                 }
150             }
151             finally
152             {
153                 onLocalClose();
154             }
155         }
156 
157         private void onLocalClose()
158         {
159             if (LOG_CLOSE.isDebugEnabled())
160                 LOG_CLOSE.debug("Local Close Confirmed {}",close);
161             if (close.isAbnormal())
162             {
163                 ioState.onAbnormalClose(close);
164             }
165             else
166             {
167                 ioState.onCloseLocal(close);
168             }
169         }
170     }
171 
172     public static class Stats
173     {
174         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
175         private AtomicLong countOnFillableEvents = new AtomicLong(0);
176         private AtomicLong countFillableErrors = new AtomicLong(0);
177 
178         public long getFillableErrorCount()
179         {
180             return countFillableErrors.get();
181         }
182 
183         public long getFillInterestedCount()
184         {
185             return countFillInterestedEvents.get();
186         }
187 
188         public long getOnFillableCount()
189         {
190             return countOnFillableEvents.get();
191         }
192     }
193     
194     private static enum ReadMode
195     {
196         PARSE,
197         DISCARD,
198         EOF
199     }
200 
201     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
202     private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
203     private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
204 
205     /**
206      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
207      */
208     private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
209 
210     private final ByteBufferPool bufferPool;
211     private final Scheduler scheduler;
212     private final Generator generator;
213     private final Parser parser;
214     private final WebSocketPolicy policy;
215     private final AtomicBoolean suspendToken;
216     private final FrameFlusher flusher;
217     private List<ExtensionConfig> extensions;
218     private boolean isFilling;
219     private ByteBuffer prefillBuffer;
220     private ReadMode readMode = ReadMode.PARSE;
221     private IOState ioState;
222     private Stats stats = new Stats();
223 
224     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
225     {
226         super(endp,executor);
227         this.policy = policy;
228         this.bufferPool = bufferPool;
229         this.generator = new Generator(policy,bufferPool);
230         this.parser = new Parser(policy,bufferPool);
231         this.scheduler = scheduler;
232         this.extensions = new ArrayList<>();
233         this.suspendToken = new AtomicBoolean(false);
234         this.ioState = new IOState();
235         this.ioState.addListener(this);
236         this.flusher = new Flusher(bufferPool,generator,endp);
237         this.setInputBufferSize(policy.getInputBufferSize());
238         this.setMaxIdleTimeout(policy.getIdleTimeout());
239     }
240 
241     @Override
242     public Executor getExecutor()
243     {
244         return super.getExecutor();
245     }
246 
247     /**
248      * Close without a close code or reason
249      */
250     @Override
251     public void close()
252     {
253         if(LOG_CLOSE.isDebugEnabled())
254             LOG_CLOSE.debug(".close()");
255         CloseInfo close = new CloseInfo();
256         this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
257     }
258 
259     /**
260      * Close the connection.
261      * <p>                    fillInterested();
262 
263      * This can result in a close handshake over the network, or a simple local abnormal close
264      * 
265      * @param statusCode
266      *            the WebSocket status code.
267      * @param reason
268      *            the (optional) reason string. (null is allowed)
269      * @see StatusCode
270      */
271     @Override
272     public void close(int statusCode, String reason)
273     {
274         if (LOG_CLOSE.isDebugEnabled())
275             LOG_CLOSE.debug("close({},{})",statusCode,reason);
276         CloseInfo close = new CloseInfo(statusCode,reason);
277         this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
278     }
279 
280     @Override
281     public void disconnect()
282     {
283         if (LOG_CLOSE.isDebugEnabled())
284             LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
285         disconnect(false);
286     }
287 
288     private void disconnect(boolean onlyOutput)
289     {
290         if (LOG_CLOSE.isDebugEnabled())
291             LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
292         // close FrameFlusher, we cannot write anymore at this point.
293         flusher.close();
294         EndPoint endPoint = getEndPoint();
295         // We need to gently close first, to allow
296         // SSL close alerts to be sent by Jetty
297         if (LOG_CLOSE.isDebugEnabled())
298             LOG_CLOSE.debug("Shutting down output {}",endPoint);
299         endPoint.shutdownOutput();
300         if (!onlyOutput)
301         {
302             if (LOG_CLOSE.isDebugEnabled())
303                 LOG_CLOSE.debug("Closing {}",endPoint);
304             endPoint.close();
305         }
306     }
307 
308     protected void execute(Runnable task)
309     {
310         try
311         {
312             getExecutor().execute(task);
313         }
314         catch (RejectedExecutionException e)
315         {
316             if (LOG.isDebugEnabled())
317                 LOG.debug("Job not dispatched: {}",task);
318         }
319     }
320 
321     @Override
322     public void fillInterested()
323     {
324         stats.countFillInterestedEvents.incrementAndGet();
325         super.fillInterested();
326     }
327 
328     @Override
329     public ByteBufferPool getBufferPool()
330     {
331         return bufferPool;
332     }
333 
334     /**
335      * Get the list of extensions in use.
336      * <p>
337      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
338      * 
339      * @return the list of negotiated extensions in use.
340      */
341     public List<ExtensionConfig> getExtensions()
342     {
343         return extensions;
344     }
345 
346     public Generator getGenerator()
347     {
348         return generator;
349     }
350 
351     @Override
352     public long getIdleTimeout()
353     {
354         return getEndPoint().getIdleTimeout();
355     }
356 
357     @Override
358     public IOState getIOState()
359     {
360         return ioState;
361     }
362 
363     @Override
364     public long getMaxIdleTimeout()
365     {
366         return getEndPoint().getIdleTimeout();
367     }
368 
369     public Parser getParser()
370     {
371         return parser;
372     }
373 
374     @Override
375     public WebSocketPolicy getPolicy()
376     {
377         return this.policy;
378     }
379 
380     @Override
381     public InetSocketAddress getRemoteAddress()
382     {
383         return getEndPoint().getRemoteAddress();
384     }
385 
386     public Scheduler getScheduler()
387     {
388         return scheduler;
389     }
390 
391     public Stats getStats()
392     {
393         return stats;
394     }
395 
396     @Override
397     public boolean isOpen()
398     {
399         return getIOState().isOpen() && getEndPoint().isOpen();
400     }
401 
402     @Override
403     public boolean isReading()
404     {
405         return isFilling;
406     }
407 
408     /**
409      * Physical connection disconnect.
410      * <p>
411      * Not related to WebSocket close handshake.
412      */
413     @Override
414     public void onClose()
415     {
416         if (LOG.isDebugEnabled())
417             LOG.debug("{} onClose()",policy.getBehavior());
418         super.onClose();
419         ioState.onDisconnected();
420         flusher.close();
421     }
422 
423     @Override
424     public void onConnectionStateChange(ConnectionState state)
425     {
426         if (LOG_CLOSE.isDebugEnabled())
427             LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
428         
429         switch (state)
430         {
431             case OPEN:
432                 if (BufferUtil.hasContent(prefillBuffer))
433                 {
434                     if (LOG.isDebugEnabled())
435                     {
436                         LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
437                     }
438                     parser.parse(prefillBuffer);
439                 }
440                 if (LOG.isDebugEnabled())
441                 {
442                     LOG.debug("OPEN: normal fillInterested");
443                 }
444                 // TODO: investigate what happens if a failure occurs during prefill, and an attempt to write close fails,
445                 // should a fill interested occur? or just a quick disconnect?
446                 fillInterested();
447                 break;
448             case CLOSED:
449                 if (LOG_CLOSE.isDebugEnabled())
450                     LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
451                 if (ioState.wasAbnormalClose())
452                 {
453                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
454                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
455                     outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
456                 }
457                 else
458                 {
459                     // Just disconnect
460                     this.disconnect(false);
461                 }
462                 break;
463             case CLOSING:
464                 if (LOG_CLOSE.isDebugEnabled())
465                     LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
466                 // First occurrence of .onCloseLocal or .onCloseRemote use
467                 if (ioState.wasRemoteCloseInitiated())
468                 {
469                     CloseInfo close = ioState.getCloseInfo();
470                     // reply to close handshake from remote
471                     outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
472                 }
473             default:
474                 break;
475         }
476     }
477 
478     @Override
479     public void onFillable()
480     {
481         if (LOG.isDebugEnabled())
482             LOG.debug("{} onFillable()",policy.getBehavior());
483         stats.countOnFillableEvents.incrementAndGet();
484         
485         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
486         
487         try
488         {
489             isFilling = true;
490 
491             if(readMode == ReadMode.PARSE)
492             {
493                 readMode = readParse(buffer);
494             } 
495             else
496             {
497                 readMode = readDiscard(buffer);
498             }
499         }
500         finally
501         {
502             bufferPool.release(buffer);
503         }
504 
505         if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
506         {
507             fillInterested();
508         }
509         else
510         {
511             isFilling = false;
512         }
513     }
514 
515     
516 
517     @Override
518     protected void onFillInterestedFailed(Throwable cause)
519     {
520         LOG.ignore(cause);
521         stats.countFillInterestedEvents.incrementAndGet();
522         super.onFillInterestedFailed(cause);
523     }
524 
525     /**
526      * Extra bytes from the initial HTTP upgrade that need to
527      * be processed by the websocket parser before starting
528      * to read bytes from the connection
529      * @param prefilled the bytes of prefilled content encountered during upgrade
530      */
531     protected void setInitialBuffer(ByteBuffer prefilled)
532     {
533         if (LOG.isDebugEnabled())
534         {
535             LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
536         }
537         prefillBuffer = prefilled;
538     }
539     
540     private void notifyError(Throwable t)
541     {
542         getParser().getIncomingFramesHandler().incomingError(t);
543     }
544     
545     @Override
546     public void onOpen()
547     {
548         if(LOG_OPEN.isDebugEnabled())
549             LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
550         super.onOpen();
551         this.ioState.onOpened();
552     }
553 
554     /**
555      * Event for no activity on connection (read or write)
556      */
557     @Override
558     protected boolean onReadTimeout()
559     {
560         IOState state = getIOState();
561         ConnectionState cstate = state.getConnectionState();
562         if (LOG_CLOSE.isDebugEnabled())
563             LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
564 
565         if (cstate == ConnectionState.CLOSED)
566         {
567             if (LOG_CLOSE.isDebugEnabled())
568                 LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
569             // close already completed, extra timeouts not relevant
570             // allow underlying connection and endpoint to disconnect on its own
571             return true;
572         }
573 
574         try
575         {
576             notifyError(new SocketTimeoutException("Timeout on Read"));
577         }
578         finally
579         {
580             // This is an Abnormal Close condition
581             close(StatusCode.SHUTDOWN,"Idle Timeout");
582         }
583 
584         return false;
585     }
586 
587     /**
588      * Frame from API, User, or Internal implementation destined for network.
589      */
590     @Override
591     public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
592     {
593         if (LOG.isDebugEnabled())
594         {
595             LOG.debug("outgoingFrame({}, {})",frame,callback);
596         }
597 
598         flusher.enqueue(frame,callback,batchMode);
599     }
600 
601     private ReadMode readDiscard(ByteBuffer buffer)
602     {
603         EndPoint endPoint = getEndPoint();
604         try
605         {
606             while (true)
607             {
608                 int filled = endPoint.fill(buffer);
609                 if (filled == 0)
610                 {
611                     return ReadMode.DISCARD;
612                 }
613                 else if (filled < 0)
614                 {
615                     if (LOG_CLOSE.isDebugEnabled())
616                         LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
617                     return ReadMode.EOF;
618                 }
619                 else
620                 {
621                     if (LOG_CLOSE.isDebugEnabled())
622                         LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
623                 }
624             }
625         }
626         catch (IOException e)
627         {
628             LOG.ignore(e);
629             return ReadMode.EOF;
630         }
631         catch (Throwable t)
632         {
633             LOG.ignore(t);
634             return ReadMode.DISCARD;
635         }
636     }
637     
638     private ReadMode readParse(ByteBuffer buffer)
639     {
640         EndPoint endPoint = getEndPoint();
641         try
642         {
643             // Process the content from the Endpoint next
644             while(true)  // TODO: should this honor the LogicalConnection.suspend() ?
645             {
646                 int filled = endPoint.fill(buffer);
647                 if (filled < 0)
648                 {
649                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
650                     ioState.onReadFailure(new EOFException("Remote Read EOF"));
651                     return ReadMode.EOF;
652                 }
653                 else if (filled == 0)
654                 {
655                     // Done reading, wait for next onFillable
656                     return ReadMode.PARSE;
657                 }
658                 
659                 if (LOG.isDebugEnabled())
660                 {
661                     LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
662                 }
663                 parser.parse(buffer);
664             }
665         }
666         catch (IOException e)
667         {
668             LOG.warn(e);
669             close(StatusCode.PROTOCOL,e.getMessage());
670             return ReadMode.DISCARD;
671         }
672         catch (CloseException e)
673         {
674             LOG.debug(e);
675             close(e.getStatusCode(),e.getMessage());
676             return ReadMode.DISCARD;
677         }
678         catch (Throwable t)
679         {
680             LOG.warn(t);
681             close(StatusCode.ABNORMAL,t.getMessage());
682             // TODO: should probably only switch to discard if a non-ws-endpoint error
683             return ReadMode.DISCARD;
684         }
685     }
686     
687     @Override
688     public void resume()
689     {
690         if (suspendToken.getAndSet(false))
691         {
692             fillInterested();
693         }
694     }
695 
696     /**
697      * Get the list of extensions in use.
698      * <p>
699      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
700      * 
701      * @param extensions
702      *            the list of negotiated extensions in use.
703      */
704     public void setExtensions(List<ExtensionConfig> extensions)
705     {
706         this.extensions = extensions;
707     }
708 
709     @Override
710     public void setInputBufferSize(int inputBufferSize)
711     {
712         if (inputBufferSize < MIN_BUFFER_SIZE)
713         {
714             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
715         }
716         super.setInputBufferSize(inputBufferSize);
717     }
718 
719     @Override
720     public void setMaxIdleTimeout(long ms)
721     {
722         getEndPoint().setIdleTimeout(ms);
723     }
724 
725     @Override
726     public SuspendToken suspend()
727     {
728         suspendToken.set(true);
729         return this;
730     }
731 
732     @Override
733     public String dump()
734     {
735         return ContainerLifeCycle.dump(this);
736     }
737 
738     @Override
739     public void dump(Appendable out, String indent) throws IOException
740     {
741         out.append(toString()).append(System.lineSeparator());
742     }
743 
744     @Override
745     public String toString()
746     {
747         return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
748     }
749 
750     /**
751      * Extra bytes from the initial HTTP upgrade that need to
752      * be processed by the websocket parser before starting
753      * to read bytes from the connection
754      */
755     @Override
756     public void onUpgradeTo(ByteBuffer prefilled)
757     {
758         setInitialBuffer(prefilled);
759     }
760 }