View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.websocket.api.BatchMode;
44  import org.eclipse.jetty.websocket.api.CloseException;
45  import org.eclipse.jetty.websocket.api.StatusCode;
46  import org.eclipse.jetty.websocket.api.SuspendToken;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.WriteCallback;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50  import org.eclipse.jetty.websocket.api.extensions.Frame;
51  import org.eclipse.jetty.websocket.common.CloseInfo;
52  import org.eclipse.jetty.websocket.common.ConnectionState;
53  import org.eclipse.jetty.websocket.common.Generator;
54  import org.eclipse.jetty.websocket.common.LogicalConnection;
55  import org.eclipse.jetty.websocket.common.Parser;
56  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57  
58  /**
59   * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
60   */
61  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable 
62  {
63      private class Flusher extends FrameFlusher
64      {
65          private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
66          {
67              super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
68          }
69  
70          @Override
71          protected void onFailure(Throwable x)
72          {
73              notifyError(x);
74  
75              if (ioState.wasAbnormalClose())
76              {
77                  LOG.ignore(x);
78                  return;
79              }
80  
81              if (LOG.isDebugEnabled())
82                  LOG.debug("Write flush failure",x);
83              ioState.onWriteFailure(x);
84          }
85      }
86  
87      public class OnDisconnectCallback implements WriteCallback
88      {
89          private final boolean outputOnly;
90  
91          public OnDisconnectCallback(boolean outputOnly)
92          {
93              this.outputOnly = outputOnly;
94          }
95  
96          @Override
97          public void writeFailed(Throwable x)
98          {
99              disconnect(outputOnly);
100         }
101 
102         @Override
103         public void writeSuccess()
104         {
105             disconnect(outputOnly);
106         }
107     }
108 
109     public class OnCloseLocalCallback implements WriteCallback
110     {
111         private final WriteCallback callback;
112         private final CloseInfo close;
113 
114         public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
115         {
116             this.callback = callback;
117             this.close = close;
118         }
119 
120         public OnCloseLocalCallback(CloseInfo close)
121         {
122             this(null,close);
123         }
124 
125         @Override
126         public void writeFailed(Throwable x)
127         {
128             try
129             {
130                 if (callback != null)
131                 {
132                     callback.writeFailed(x);
133                 }
134             }
135             finally
136             {
137                 onLocalClose();
138             }
139         }
140 
141         @Override
142         public void writeSuccess()
143         {
144             try
145             {
146                 if (callback != null)
147                 {
148                     callback.writeSuccess();
149                 }
150             }
151             finally
152             {
153                 onLocalClose();
154             }
155         }
156 
157         private void onLocalClose()
158         {
159             if (LOG_CLOSE.isDebugEnabled())
160                 LOG_CLOSE.debug("Local Close Confirmed {}",close);
161             if (close.isAbnormal())
162             {
163                 ioState.onAbnormalClose(close);
164             }
165             else
166             {
167                 ioState.onCloseLocal(close);
168             }
169         }
170     }
171 
172     public static class Stats
173     {
174         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
175         private AtomicLong countOnFillableEvents = new AtomicLong(0);
176         private AtomicLong countFillableErrors = new AtomicLong(0);
177 
178         public long getFillableErrorCount()
179         {
180             return countFillableErrors.get();
181         }
182 
183         public long getFillInterestedCount()
184         {
185             return countFillInterestedEvents.get();
186         }
187 
188         public long getOnFillableCount()
189         {
190             return countOnFillableEvents.get();
191         }
192     }
193     
194     private static enum ReadMode
195     {
196         PARSE,
197         DISCARD,
198         EOF
199     }
200 
201     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
202     private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
203     private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
204 
205     /**
206      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
207      */
208     private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
209 
210     private final ByteBufferPool bufferPool;
211     private final Scheduler scheduler;
212     private final Generator generator;
213     private final Parser parser;
214     private final WebSocketPolicy policy;
215     private final AtomicBoolean suspendToken;
216     private final FrameFlusher flusher;
217     private final String id;
218     private List<ExtensionConfig> extensions;
219     private boolean isFilling;
220     private ByteBuffer prefillBuffer;
221     private ReadMode readMode = ReadMode.PARSE;
222     private IOState ioState;
223     private Stats stats = new Stats();
224 
225     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
226     {
227         super(endp,executor);
228         this.id = String.format("%s:%d->%s:%d",
229                 endp.getLocalAddress().getAddress().getHostAddress(),
230                 endp.getLocalAddress().getPort(),
231                 endp.getRemoteAddress().getAddress().getHostAddress(),
232                 endp.getRemoteAddress().getPort());
233         this.policy = policy;
234         this.bufferPool = bufferPool;
235         this.generator = new Generator(policy,bufferPool);
236         this.parser = new Parser(policy,bufferPool);
237         this.scheduler = scheduler;
238         this.extensions = new ArrayList<>();
239         this.suspendToken = new AtomicBoolean(false);
240         this.ioState = new IOState();
241         this.ioState.addListener(this);
242         this.flusher = new Flusher(bufferPool,generator,endp);
243         this.setInputBufferSize(policy.getInputBufferSize());
244         this.setMaxIdleTimeout(policy.getIdleTimeout());
245     }
246 
247     @Override
248     public Executor getExecutor()
249     {
250         return super.getExecutor();
251     }
252 
253     /**
254      * Close without a close code or reason
255      */
256     @Override
257     public void close()
258     {
259         if(LOG_CLOSE.isDebugEnabled())
260             LOG_CLOSE.debug(".close()");
261         CloseInfo close = new CloseInfo();
262         this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
263     }
264 
265     /**
266      * Close the connection.
267      * <p>                    fillInterested();
268 
269      * This can result in a close handshake over the network, or a simple local abnormal close
270      * 
271      * @param statusCode
272      *            the WebSocket status code.
273      * @param reason
274      *            the (optional) reason string. (null is allowed)
275      * @see StatusCode
276      */
277     @Override
278     public void close(int statusCode, String reason)
279     {
280         if (LOG_CLOSE.isDebugEnabled())
281             LOG_CLOSE.debug("close({},{})",statusCode,reason);
282         CloseInfo close = new CloseInfo(statusCode,reason);
283         this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
284     }
285 
286     @Override
287     public void disconnect()
288     {
289         if (LOG_CLOSE.isDebugEnabled())
290             LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
291         disconnect(false);
292     }
293 
294     private void disconnect(boolean onlyOutput)
295     {
296         if (LOG_CLOSE.isDebugEnabled())
297             LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
298         // close FrameFlusher, we cannot write anymore at this point.
299         flusher.close();
300         EndPoint endPoint = getEndPoint();
301         // We need to gently close first, to allow
302         // SSL close alerts to be sent by Jetty
303         if (LOG_CLOSE.isDebugEnabled())
304             LOG_CLOSE.debug("Shutting down output {}",endPoint);
305         endPoint.shutdownOutput();
306         if (!onlyOutput)
307         {
308             if (LOG_CLOSE.isDebugEnabled())
309                 LOG_CLOSE.debug("Closing {}",endPoint);
310             endPoint.close();
311         }
312     }
313 
314     protected void execute(Runnable task)
315     {
316         try
317         {
318             getExecutor().execute(task);
319         }
320         catch (RejectedExecutionException e)
321         {
322             if (LOG.isDebugEnabled())
323                 LOG.debug("Job not dispatched: {}",task);
324         }
325     }
326 
327     @Override
328     public void fillInterested()
329     {
330         stats.countFillInterestedEvents.incrementAndGet();
331         super.fillInterested();
332     }
333 
334     @Override
335     public ByteBufferPool getBufferPool()
336     {
337         return bufferPool;
338     }
339 
340     /**
341      * Get the list of extensions in use.
342      * <p>
343      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
344      * 
345      * @return the list of negotiated extensions in use.
346      */
347     public List<ExtensionConfig> getExtensions()
348     {
349         return extensions;
350     }
351 
352     public Generator getGenerator()
353     {
354         return generator;
355     }
356     
357     @Override
358     public String getId()
359     {
360         return id;
361     }
362 
363     @Override
364     public long getIdleTimeout()
365     {
366         return getEndPoint().getIdleTimeout();
367     }
368 
369     @Override
370     public IOState getIOState()
371     {
372         return ioState;
373     }
374 
375     @Override
376     public long getMaxIdleTimeout()
377     {
378         return getEndPoint().getIdleTimeout();
379     }
380 
381     public Parser getParser()
382     {
383         return parser;
384     }
385 
386     @Override
387     public WebSocketPolicy getPolicy()
388     {
389         return this.policy;
390     }
391 
392     @Override
393     public InetSocketAddress getRemoteAddress()
394     {
395         return getEndPoint().getRemoteAddress();
396     }
397 
398     public Scheduler getScheduler()
399     {
400         return scheduler;
401     }
402 
403     public Stats getStats()
404     {
405         return stats;
406     }
407 
408     @Override
409     public boolean isOpen()
410     {
411         return getIOState().isOpen() && getEndPoint().isOpen();
412     }
413 
414     @Override
415     public boolean isReading()
416     {
417         return isFilling;
418     }
419 
420     /**
421      * Physical connection disconnect.
422      * <p>
423      * Not related to WebSocket close handshake.
424      */
425     @Override
426     public void onClose()
427     {
428         if (LOG.isDebugEnabled())
429             LOG.debug("{} onClose()",policy.getBehavior());
430         super.onClose();
431         ioState.onDisconnected();
432         flusher.close();
433     }
434 
435     @Override
436     public void onConnectionStateChange(ConnectionState state)
437     {
438         if (LOG_CLOSE.isDebugEnabled())
439             LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
440         
441         switch (state)
442         {
443             case OPEN:
444                 if (BufferUtil.hasContent(prefillBuffer))
445                 {
446                     if (LOG.isDebugEnabled())
447                     {
448                         LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
449                     }
450                     parser.parse(prefillBuffer);
451                 }
452                 if (LOG.isDebugEnabled())
453                 {
454                     LOG.debug("OPEN: normal fillInterested");
455                 }
456                 // TODO: investigate what happens if a failure occurs during prefill, and an attempt to write close fails,
457                 // should a fill interested occur? or just a quick disconnect?
458                 fillInterested();
459                 break;
460             case CLOSED:
461                 if (LOG_CLOSE.isDebugEnabled())
462                     LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
463                 if (ioState.wasAbnormalClose())
464                 {
465                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
466                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
467                     outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
468                 }
469                 else
470                 {
471                     // Just disconnect
472                     this.disconnect(false);
473                 }
474                 break;
475             case CLOSING:
476                 if (LOG_CLOSE.isDebugEnabled())
477                     LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
478                 // First occurrence of .onCloseLocal or .onCloseRemote use
479                 if (ioState.wasRemoteCloseInitiated())
480                 {
481                     CloseInfo close = ioState.getCloseInfo();
482                     // reply to close handshake from remote
483                     outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
484                 }
485             default:
486                 break;
487         }
488     }
489 
490     @Override
491     public void onFillable()
492     {
493         if (LOG.isDebugEnabled())
494             LOG.debug("{} onFillable()",policy.getBehavior());
495         stats.countOnFillableEvents.incrementAndGet();
496         
497         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
498         
499         try
500         {
501             isFilling = true;
502 
503             if(readMode == ReadMode.PARSE)
504             {
505                 readMode = readParse(buffer);
506             } 
507             else
508             {
509                 readMode = readDiscard(buffer);
510             }
511         }
512         finally
513         {
514             bufferPool.release(buffer);
515         }
516 
517         if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
518         {
519             fillInterested();
520         }
521         else
522         {
523             isFilling = false;
524         }
525     }
526 
527     
528 
529     @Override
530     protected void onFillInterestedFailed(Throwable cause)
531     {
532         LOG.ignore(cause);
533         stats.countFillInterestedEvents.incrementAndGet();
534         super.onFillInterestedFailed(cause);
535     }
536 
537     /**
538      * Extra bytes from the initial HTTP upgrade that need to
539      * be processed by the websocket parser before starting
540      * to read bytes from the connection
541      * @param prefilled the bytes of prefilled content encountered during upgrade
542      */
543     protected void setInitialBuffer(ByteBuffer prefilled)
544     {
545         if (LOG.isDebugEnabled())
546         {
547             LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
548         }
549         prefillBuffer = prefilled;
550     }
551     
552     private void notifyError(Throwable t)
553     {
554         getParser().getIncomingFramesHandler().incomingError(t);
555     }
556     
557     @Override
558     public void onOpen()
559     {
560         if(LOG_OPEN.isDebugEnabled())
561             LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
562         super.onOpen();
563         this.ioState.onOpened();
564     }
565 
566     /**
567      * Event for no activity on connection (read or write)
568      */
569     @Override
570     protected boolean onReadTimeout()
571     {
572         IOState state = getIOState();
573         ConnectionState cstate = state.getConnectionState();
574         if (LOG_CLOSE.isDebugEnabled())
575             LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
576 
577         if (cstate == ConnectionState.CLOSED)
578         {
579             if (LOG_CLOSE.isDebugEnabled())
580                 LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
581             // close already completed, extra timeouts not relevant
582             // allow underlying connection and endpoint to disconnect on its own
583             return true;
584         }
585 
586         try
587         {
588             notifyError(new SocketTimeoutException("Timeout on Read"));
589         }
590         finally
591         {
592             // This is an Abnormal Close condition
593             close(StatusCode.SHUTDOWN,"Idle Timeout");
594         }
595 
596         return false;
597     }
598 
599     /**
600      * Frame from API, User, or Internal implementation destined for network.
601      */
602     @Override
603     public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
604     {
605         if (LOG.isDebugEnabled())
606         {
607             LOG.debug("outgoingFrame({}, {})",frame,callback);
608         }
609 
610         flusher.enqueue(frame,callback,batchMode);
611     }
612 
613     private ReadMode readDiscard(ByteBuffer buffer)
614     {
615         EndPoint endPoint = getEndPoint();
616         try
617         {
618             while (true)
619             {
620                 int filled = endPoint.fill(buffer);
621                 if (filled == 0)
622                 {
623                     return ReadMode.DISCARD;
624                 }
625                 else if (filled < 0)
626                 {
627                     if (LOG_CLOSE.isDebugEnabled())
628                         LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
629                     return ReadMode.EOF;
630                 }
631                 else
632                 {
633                     if (LOG_CLOSE.isDebugEnabled())
634                         LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
635                 }
636             }
637         }
638         catch (IOException e)
639         {
640             LOG.ignore(e);
641             return ReadMode.EOF;
642         }
643         catch (Throwable t)
644         {
645             LOG.ignore(t);
646             return ReadMode.DISCARD;
647         }
648     }
649     
650     private ReadMode readParse(ByteBuffer buffer)
651     {
652         EndPoint endPoint = getEndPoint();
653         try
654         {
655             // Process the content from the Endpoint next
656             while(true)  // TODO: should this honor the LogicalConnection.suspend() ?
657             {
658                 int filled = endPoint.fill(buffer);
659                 if (filled < 0)
660                 {
661                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
662                     ioState.onReadFailure(new EOFException("Remote Read EOF"));
663                     return ReadMode.EOF;
664                 }
665                 else if (filled == 0)
666                 {
667                     // Done reading, wait for next onFillable
668                     return ReadMode.PARSE;
669                 }
670                 
671                 if (LOG.isDebugEnabled())
672                 {
673                     LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
674                 }
675                 parser.parse(buffer);
676             }
677         }
678         catch (IOException e)
679         {
680             LOG.warn(e);
681             close(StatusCode.PROTOCOL,e.getMessage());
682             return ReadMode.DISCARD;
683         }
684         catch (CloseException e)
685         {
686             LOG.debug(e);
687             close(e.getStatusCode(),e.getMessage());
688             return ReadMode.DISCARD;
689         }
690         catch (Throwable t)
691         {
692             LOG.warn(t);
693             close(StatusCode.ABNORMAL,t.getMessage());
694             // TODO: should probably only switch to discard if a non-ws-endpoint error
695             return ReadMode.DISCARD;
696         }
697     }
698     
699     @Override
700     public void resume()
701     {
702         if (suspendToken.getAndSet(false))
703         {
704             fillInterested();
705         }
706     }
707 
708     /**
709      * Get the list of extensions in use.
710      * <p>
711      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
712      * 
713      * @param extensions
714      *            the list of negotiated extensions in use.
715      */
716     public void setExtensions(List<ExtensionConfig> extensions)
717     {
718         this.extensions = extensions;
719     }
720 
721     @Override
722     public void setInputBufferSize(int inputBufferSize)
723     {
724         if (inputBufferSize < MIN_BUFFER_SIZE)
725         {
726             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
727         }
728         super.setInputBufferSize(inputBufferSize);
729     }
730 
731     @Override
732     public void setMaxIdleTimeout(long ms)
733     {
734         getEndPoint().setIdleTimeout(ms);
735     }
736 
737     @Override
738     public SuspendToken suspend()
739     {
740         suspendToken.set(true);
741         return this;
742     }
743 
744     @Override
745     public String dump()
746     {
747         return ContainerLifeCycle.dump(this);
748     }
749 
750     @Override
751     public void dump(Appendable out, String indent) throws IOException
752     {
753         out.append(toString()).append(System.lineSeparator());
754     }
755 
756     @Override
757     public String toString()
758     {
759         return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
760     }
761 
762     @Override
763     public int hashCode()
764     {
765         final int prime = 31;
766         int result = 1;
767         
768         EndPoint endp = getEndPoint();
769         if(endp != null)
770         {
771             result = prime * result + endp.getLocalAddress().hashCode();
772             result = prime * result + endp.getRemoteAddress().hashCode();
773         }
774         return result;
775     }
776 
777     @Override
778     public boolean equals(Object obj)
779     {
780         if (this == obj)
781             return true;
782         if (obj == null)
783             return false;
784         if (getClass() != obj.getClass())
785             return false;
786         AbstractWebSocketConnection other = (AbstractWebSocketConnection)obj;
787         EndPoint endp = getEndPoint();
788         EndPoint otherEndp = other.getEndPoint();
789         if (endp == null)
790         {
791             if (otherEndp != null)
792                 return false;
793         }
794         else if (!endp.equals(otherEndp))
795             return false;
796         return true;
797     }
798 
799     /**
800      * Extra bytes from the initial HTTP upgrade that need to
801      * be processed by the websocket parser before starting
802      * to read bytes from the connection
803      */
804     @Override
805     public void onUpgradeTo(ByteBuffer prefilled)
806     {
807         setInitialBuffer(prefilled);
808     }
809 }