View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.websocket.api.BatchMode;
44  import org.eclipse.jetty.websocket.api.CloseException;
45  import org.eclipse.jetty.websocket.api.StatusCode;
46  import org.eclipse.jetty.websocket.api.SuspendToken;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.WriteCallback;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50  import org.eclipse.jetty.websocket.api.extensions.Frame;
51  import org.eclipse.jetty.websocket.common.CloseInfo;
52  import org.eclipse.jetty.websocket.common.ConnectionState;
53  import org.eclipse.jetty.websocket.common.Generator;
54  import org.eclipse.jetty.websocket.common.LogicalConnection;
55  import org.eclipse.jetty.websocket.common.Parser;
56  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57  
/**
 * Provides the implementation of {@link LogicalConnection} on top of the {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
 */
61  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
62  {
    // Flipped to true by the first close(CloseInfo) call so that only one close frame is enqueued; isOpen() reports its negation.
    private final AtomicBoolean closed = new AtomicBoolean();
64  
65      private class Flusher extends FrameFlusher
66      {
67          private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
68          {
69              super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
70          }
71  
72          @Override
73          protected void onFailure(Throwable x)
74          {
75              notifyError(x);
76  
77              if (ioState.wasAbnormalClose())
78              {
79                  LOG.ignore(x);
80                  return;
81              }
82  
83              if (LOG.isDebugEnabled())
84                  LOG.debug("Write flush failure",x);
85              ioState.onWriteFailure(x);
86          }
87      }
88  
89      public class OnDisconnectCallback implements WriteCallback
90      {
91          private final boolean outputOnly;
92  
93          public OnDisconnectCallback(boolean outputOnly)
94          {
95              this.outputOnly = outputOnly;
96          }
97  
98          @Override
99          public void writeFailed(Throwable x)
100         {
101             disconnect(outputOnly);
102         }
103 
104         @Override
105         public void writeSuccess()
106         {
107             disconnect(outputOnly);
108         }
109     }
110 
111     public class OnCloseLocalCallback implements WriteCallback
112     {
113         private final WriteCallback callback;
114         private final CloseInfo close;
115 
116         public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
117         {
118             this.callback = callback;
119             this.close = close;
120         }
121 
122         public OnCloseLocalCallback(CloseInfo close)
123         {
124             this(null,close);
125         }
126 
127         @Override
128         public void writeFailed(Throwable x)
129         {
130             try
131             {
132                 if (callback != null)
133                 {
134                     callback.writeFailed(x);
135                 }
136             }
137             finally
138             {
139                 onLocalClose();
140             }
141         }
142 
143         @Override
144         public void writeSuccess()
145         {
146             try
147             {
148                 if (callback != null)
149                 {
150                     callback.writeSuccess();
151                 }
152             }
153             finally
154             {
155                 onLocalClose();
156             }
157         }
158 
159         private void onLocalClose()
160         {
161             if (LOG_CLOSE.isDebugEnabled())
162                 LOG_CLOSE.debug("Local Close Confirmed {}",close);
163             if (close.isAbnormal())
164             {
165                 ioState.onAbnormalClose(close);
166             }
167             else
168             {
169                 ioState.onCloseLocal(close);
170             }
171         }
172     }
173 
174     public static class Stats
175     {
176         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
177         private AtomicLong countOnFillableEvents = new AtomicLong(0);
178         private AtomicLong countFillableErrors = new AtomicLong(0);
179 
180         public long getFillableErrorCount()
181         {
182             return countFillableErrors.get();
183         }
184 
185         public long getFillInterestedCount()
186         {
187             return countFillInterestedEvents.get();
188         }
189 
190         public long getOnFillableCount()
191         {
192             return countOnFillableEvents.get();
193         }
194     }
195 
    /**
     * How {@code onFillable()} treats incoming bytes.
     */
    private static enum ReadMode
    {
        // bytes read from the endpoint are handed to the parser (readParse)
        PARSE,
        // bytes are read and thrown away (readDiscard); entered after readParse hit an error and requested a close
        DISCARD,
        // end of stream (or a read I/O error while discarding); onFillable() no longer re-registers fill interest
        EOF
    }
202 
    // General logger, plus two dedicated loggers (class name suffixed with "_OPEN" / "_CLOSE") used for open and close tracing.
    private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
    private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
    private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");

    /**
     * The minimum buffer size is the maximum framing header size (not including payload).
     */
    private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;

    private final ByteBufferPool bufferPool;
    private final Scheduler scheduler;
    private final Generator generator;
    private final Parser parser;
    private final WebSocketPolicy policy;
    // true while reading is suspended: set by suspend(), cleared by resume()
    private final AtomicBoolean suspendToken;
    private final FrameFlusher flusher;
    // "localHost:localPort->remoteHost:remotePort", built once in the constructor
    private final String id;
    private List<ExtensionConfig> extensions;
    // set in onFillable(), reported by isReading()
    private boolean isFilling;
    // bytes left over from the HTTP upgrade; parsed when the connection state becomes OPEN
    private ByteBuffer prefillBuffer;
    // selects between readParse() and readDiscard() in onFillable()
    private ReadMode readMode = ReadMode.PARSE;
    private IOState ioState;
    private Stats stats = new Stats();
226 
    /**
     * Creates the connection and wires up its generator, parser, I/O state and frame flusher.
     *
     * @param endp the endpoint this connection reads from and writes to; its local and remote addresses are used to build the connection id
     * @param executor the executor handed to the underlying {@link AbstractConnection}
     * @param scheduler the scheduler exposed through {@link #getScheduler()}
     * @param policy the policy supplying buffer sizes and the idle timeout
     * @param bufferPool the pool used by the generator, parser, flusher and for read buffers
     */
    public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
    {
        super(endp,executor);
        // id format: localHost:localPort->remoteHost:remotePort
        this.id = String.format("%s:%d->%s:%d",
                endp.getLocalAddress().getAddress().getHostAddress(),
                endp.getLocalAddress().getPort(),
                endp.getRemoteAddress().getAddress().getHostAddress(),
                endp.getRemoteAddress().getPort());
        this.policy = policy;
        this.bufferPool = bufferPool;
        this.generator = new Generator(policy,bufferPool);
        this.parser = new Parser(policy,bufferPool);
        this.scheduler = scheduler;
        this.extensions = new ArrayList<>();
        this.suspendToken = new AtomicBoolean(false);
        this.ioState = new IOState();
        // this connection reacts to its own state changes via onConnectionStateChange()
        this.ioState.addListener(this);
        // must come after policy is assigned: the Flusher constructor reads getPolicy()
        this.flusher = new Flusher(bufferPool,generator,endp);
        this.setInputBufferSize(policy.getInputBufferSize());
        this.setMaxIdleTimeout(policy.getIdleTimeout());
    }
248 
249     @Override
250     public Executor getExecutor()
251     {
252         return super.getExecutor();
253     }
254 
255     /**
256      * Close without a close code or reason
257      */
258     @Override
259     public void close()
260     {
261         if (LOG_CLOSE.isDebugEnabled())
262             LOG_CLOSE.debug("close()");
263         close(new CloseInfo());
264     }
265 
266     /**
267      * Close the connection.
268      * <p>                    fillInterested();
269 
270      * This can result in a close handshake over the network, or a simple local abnormal close
271      *
272      * @param statusCode
273      *            the WebSocket status code.
274      * @param reason
275      *            the (optional) reason string. (null is allowed)
276      * @see StatusCode
277      */
278     @Override
279     public void close(int statusCode, String reason)
280     {
281         if (LOG_CLOSE.isDebugEnabled())
282             LOG_CLOSE.debug("close({},{})", statusCode, reason);
283         close(new CloseInfo(statusCode, reason));
284     }
285 
286     private void close(CloseInfo closeInfo)
287     {
288         if (closed.compareAndSet(false, true))
289             outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
290     }
291 
292     @Override
293     public void disconnect()
294     {
295         if (LOG_CLOSE.isDebugEnabled())
296             LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
297         disconnect(false);
298     }
299 
300     private void disconnect(boolean onlyOutput)
301     {
302         if (LOG_CLOSE.isDebugEnabled())
303             LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
304         // close FrameFlusher, we cannot write anymore at this point.
305         flusher.close();
306         EndPoint endPoint = getEndPoint();
307         // We need to gently close first, to allow
308         // SSL close alerts to be sent by Jetty
309         if (LOG_CLOSE.isDebugEnabled())
310             LOG_CLOSE.debug("Shutting down output {}",endPoint);
311         endPoint.shutdownOutput();
312         if (!onlyOutput)
313         {
314             if (LOG_CLOSE.isDebugEnabled())
315                 LOG_CLOSE.debug("Closing {}",endPoint);
316             endPoint.close();
317         }
318     }
319 
320     protected void execute(Runnable task)
321     {
322         try
323         {
324             getExecutor().execute(task);
325         }
326         catch (RejectedExecutionException e)
327         {
328             if (LOG.isDebugEnabled())
329                 LOG.debug("Job not dispatched: {}",task);
330         }
331     }
332 
333     @Override
334     public void fillInterested()
335     {
336         stats.countFillInterestedEvents.incrementAndGet();
337         super.fillInterested();
338     }
339 
340     @Override
341     public ByteBufferPool getBufferPool()
342     {
343         return bufferPool;
344     }
345 
346     /**
347      * Get the list of extensions in use.
348      * <p>
349      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
350      *
351      * @return the list of negotiated extensions in use.
352      */
353     public List<ExtensionConfig> getExtensions()
354     {
355         return extensions;
356     }
357 
358     public Generator getGenerator()
359     {
360         return generator;
361     }
362 
363     @Override
364     public String getId()
365     {
366         return id;
367     }
368 
369     @Override
370     public long getIdleTimeout()
371     {
372         return getEndPoint().getIdleTimeout();
373     }
374 
375     @Override
376     public IOState getIOState()
377     {
378         return ioState;
379     }
380 
381     @Override
382     public long getMaxIdleTimeout()
383     {
384         return getEndPoint().getIdleTimeout();
385     }
386 
387     public Parser getParser()
388     {
389         return parser;
390     }
391 
392     @Override
393     public WebSocketPolicy getPolicy()
394     {
395         return this.policy;
396     }
397 
398     @Override
399     public InetSocketAddress getRemoteAddress()
400     {
401         return getEndPoint().getRemoteAddress();
402     }
403 
404     public Scheduler getScheduler()
405     {
406         return scheduler;
407     }
408 
409     public Stats getStats()
410     {
411         return stats;
412     }
413 
414     @Override
415     public boolean isOpen()
416     {
417         return !closed.get();
418     }
419 
420     @Override
421     public boolean isReading()
422     {
423         return isFilling;
424     }
425 
426     /**
427      * Physical connection disconnect.
428      * <p>
429      * Not related to WebSocket close handshake.
430      */
431     @Override
432     public void onClose()
433     {
434         if (LOG.isDebugEnabled())
435             LOG.debug("{} onClose()",policy.getBehavior());
436         super.onClose();
437         ioState.onDisconnected();
438         flusher.close();
439     }
440 
441     @Override
442     public void onConnectionStateChange(ConnectionState state)
443     {
444         if (LOG_CLOSE.isDebugEnabled())
445             LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
446 
447         switch (state)
448         {
449             case OPEN:
450                 if (BufferUtil.hasContent(prefillBuffer))
451                 {
452                     if (LOG.isDebugEnabled())
453                     {
454                         LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
455                     }
456                     parser.parse(prefillBuffer);
457                 }
458                 if (LOG.isDebugEnabled())
459                 {
460                     LOG.debug("OPEN: normal fillInterested");
461                 }
462                 // TODO: investigate what happens if a failure occurs during prefill, and an attempt to write close fails,
463                 // should a fill interested occur? or just a quick disconnect?
464                 fillInterested();
465                 break;
466             case CLOSED:
467                 if (LOG_CLOSE.isDebugEnabled())
468                     LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
469                 if (ioState.wasAbnormalClose())
470                 {
471                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
472                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
473                     outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
474                 }
475                 else
476                 {
477                     // Just disconnect
478                     this.disconnect(false);
479                 }
480                 break;
481             case CLOSING:
482                 if (LOG_CLOSE.isDebugEnabled())
483                     LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
484                 // First occurrence of .onCloseLocal or .onCloseRemote use
485                 if (ioState.wasRemoteCloseInitiated())
486                 {
487                     CloseInfo close = ioState.getCloseInfo();
488                     // reply to close handshake from remote
489                     outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
490                 }
491             default:
492                 break;
493         }
494     }
495 
496     @Override
497     public void onFillable()
498     {
499         if (LOG.isDebugEnabled())
500             LOG.debug("{} onFillable()",policy.getBehavior());
501         stats.countOnFillableEvents.incrementAndGet();
502 
503         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
504 
505         try
506         {
507             isFilling = true;
508 
509             if(readMode == ReadMode.PARSE)
510             {
511                 readMode = readParse(buffer);
512             }
513             else
514             {
515                 readMode = readDiscard(buffer);
516             }
517         }
518         finally
519         {
520             bufferPool.release(buffer);
521         }
522 
523         if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
524         {
525             fillInterested();
526         }
527         else
528         {
529             isFilling = false;
530         }
531     }
532 
533     @Override
534     protected void onFillInterestedFailed(Throwable cause)
535     {
536         LOG.ignore(cause);
537         stats.countFillInterestedEvents.incrementAndGet();
538         super.onFillInterestedFailed(cause);
539     }
540 
541     /**
542      * Extra bytes from the initial HTTP upgrade that need to
543      * be processed by the websocket parser before starting
544      * to read bytes from the connection
545      * @param prefilled the bytes of prefilled content encountered during upgrade
546      */
547     protected void setInitialBuffer(ByteBuffer prefilled)
548     {
549         if (LOG.isDebugEnabled())
550         {
551             LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
552         }
553         prefillBuffer = prefilled;
554     }
555 
556     private void notifyError(Throwable t)
557     {
558         getParser().getIncomingFramesHandler().incomingError(t);
559     }
560 
561     @Override
562     public void onOpen()
563     {
564         if(LOG_OPEN.isDebugEnabled())
565             LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
566         super.onOpen();
567         this.ioState.onOpened();
568     }
569 
570     /**
571      * Event for no activity on connection (read or write)
572      */
573     @Override
574     protected boolean onReadTimeout()
575     {
576         IOState state = getIOState();
577         ConnectionState cstate = state.getConnectionState();
578         if (LOG_CLOSE.isDebugEnabled())
579             LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
580 
581         if (cstate == ConnectionState.CLOSED)
582         {
583             if (LOG_CLOSE.isDebugEnabled())
584                 LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
585             // close already completed, extra timeouts not relevant
586             // allow underlying connection and endpoint to disconnect on its own
587             return true;
588         }
589 
590         try
591         {
592             notifyError(new SocketTimeoutException("Timeout on Read"));
593         }
594         finally
595         {
596             // This is an Abnormal Close condition
597             close(StatusCode.SHUTDOWN,"Idle Timeout");
598         }
599 
600         return false;
601     }
602 
603     /**
604      * Frame from API, User, or Internal implementation destined for network.
605      */
606     @Override
607     public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
608     {
609         if (LOG.isDebugEnabled())
610         {
611             LOG.debug("outgoingFrame({}, {})",frame,callback);
612         }
613 
614         flusher.enqueue(frame,callback,batchMode);
615     }
616 
617     private ReadMode readDiscard(ByteBuffer buffer)
618     {
619         EndPoint endPoint = getEndPoint();
620         try
621         {
622             while (true)
623             {
624                 int filled = endPoint.fill(buffer);
625                 if (filled == 0)
626                 {
627                     return ReadMode.DISCARD;
628                 }
629                 else if (filled < 0)
630                 {
631                     if (LOG_CLOSE.isDebugEnabled())
632                         LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
633                     return ReadMode.EOF;
634                 }
635                 else
636                 {
637                     if (LOG_CLOSE.isDebugEnabled())
638                         LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
639                 }
640             }
641         }
642         catch (IOException e)
643         {
644             LOG.ignore(e);
645             return ReadMode.EOF;
646         }
647         catch (Throwable t)
648         {
649             LOG.ignore(t);
650             return ReadMode.DISCARD;
651         }
652     }
653 
654     private ReadMode readParse(ByteBuffer buffer)
655     {
656         EndPoint endPoint = getEndPoint();
657         try
658         {
659             // Process the content from the Endpoint next
660             while(true)  // TODO: should this honor the LogicalConnection.suspend() ?
661             {
662                 int filled = endPoint.fill(buffer);
663                 if (filled < 0)
664                 {
665                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
666                     ioState.onReadFailure(new EOFException("Remote Read EOF"));
667                     return ReadMode.EOF;
668                 }
669                 else if (filled == 0)
670                 {
671                     // Done reading, wait for next onFillable
672                     return ReadMode.PARSE;
673                 }
674 
675                 if (LOG.isDebugEnabled())
676                 {
677                     LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
678                 }
679                 parser.parse(buffer);
680             }
681         }
682         catch (IOException e)
683         {
684             LOG.warn(e);
685             close(StatusCode.PROTOCOL,e.getMessage());
686             return ReadMode.DISCARD;
687         }
688         catch (CloseException e)
689         {
690             LOG.debug(e);
691             close(e.getStatusCode(),e.getMessage());
692             return ReadMode.DISCARD;
693         }
694         catch (Throwable t)
695         {
696             LOG.warn(t);
697             close(StatusCode.ABNORMAL,t.getMessage());
698             // TODO: should probably only switch to discard if a non-ws-endpoint error
699             return ReadMode.DISCARD;
700         }
701     }
702 
703     @Override
704     public void resume()
705     {
706         if (suspendToken.getAndSet(false))
707         {
708             fillInterested();
709         }
710     }
711 
712     /**
713      * Get the list of extensions in use.
714      * <p>
715      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
716      *
717      * @param extensions
718      *            the list of negotiated extensions in use.
719      */
720     public void setExtensions(List<ExtensionConfig> extensions)
721     {
722         this.extensions = extensions;
723     }
724 
725     @Override
726     public void setInputBufferSize(int inputBufferSize)
727     {
728         if (inputBufferSize < MIN_BUFFER_SIZE)
729         {
730             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
731         }
732         super.setInputBufferSize(inputBufferSize);
733     }
734 
735     @Override
736     public void setMaxIdleTimeout(long ms)
737     {
738         getEndPoint().setIdleTimeout(ms);
739     }
740 
741     @Override
742     public SuspendToken suspend()
743     {
744         suspendToken.set(true);
745         return this;
746     }
747 
748     @Override
749     public String dump()
750     {
751         return ContainerLifeCycle.dump(this);
752     }
753 
754     @Override
755     public void dump(Appendable out, String indent) throws IOException
756     {
757         out.append(toString()).append(System.lineSeparator());
758     }
759 
760     @Override
761     public String toString()
762     {
763         return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
764     }
765 
766     @Override
767     public int hashCode()
768     {
769         final int prime = 31;
770         int result = 1;
771 
772         EndPoint endp = getEndPoint();
773         if(endp != null)
774         {
775             result = prime * result + endp.getLocalAddress().hashCode();
776             result = prime * result + endp.getRemoteAddress().hashCode();
777         }
778         return result;
779     }
780 
781     @Override
782     public boolean equals(Object obj)
783     {
784         if (this == obj)
785             return true;
786         if (obj == null)
787             return false;
788         if (getClass() != obj.getClass())
789             return false;
790         AbstractWebSocketConnection other = (AbstractWebSocketConnection)obj;
791         EndPoint endp = getEndPoint();
792         EndPoint otherEndp = other.getEndPoint();
793         if (endp == null)
794         {
795             if (otherEndp != null)
796                 return false;
797         }
798         else if (!endp.equals(otherEndp))
799             return false;
800         return true;
801     }
802 
803     /**
804      * Extra bytes from the initial HTTP upgrade that need to
805      * be processed by the websocket parser before starting
806      * to read bytes from the connection
807      */
808     @Override
809     public void onUpgradeTo(ByteBuffer prefilled)
810     {
811         setInitialBuffer(prefilled);
812     }
813 }