View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.util.BufferUtil;
37  import org.eclipse.jetty.util.component.ContainerLifeCycle;
38  import org.eclipse.jetty.util.component.Dumpable;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  import org.eclipse.jetty.util.thread.Scheduler;
42  import org.eclipse.jetty.websocket.api.BatchMode;
43  import org.eclipse.jetty.websocket.api.CloseException;
44  import org.eclipse.jetty.websocket.api.StatusCode;
45  import org.eclipse.jetty.websocket.api.SuspendToken;
46  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
47  import org.eclipse.jetty.websocket.api.WriteCallback;
48  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
49  import org.eclipse.jetty.websocket.api.extensions.Frame;
50  import org.eclipse.jetty.websocket.common.CloseInfo;
51  import org.eclipse.jetty.websocket.common.ConnectionState;
52  import org.eclipse.jetty.websocket.common.Generator;
53  import org.eclipse.jetty.websocket.common.LogicalConnection;
54  import org.eclipse.jetty.websocket.common.Parser;
55  import org.eclipse.jetty.websocket.common.WebSocketSession;
56  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57  
58  /**
59   * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of {@code jetty-io}.
60   */
61  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
62  {
63      private class Flusher extends FrameFlusher
64      {
65          private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
66          {
67              super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
68          }
69  
70          @Override
71          protected void onFailure(Throwable x)
72          {
73              session.notifyError(x);
74  
75              if (ioState.wasAbnormalClose())
76              {
77                  LOG.ignore(x);
78                  return;
79              }
80  
81              if (LOG.isDebugEnabled())
82                  LOG.debug("Write flush failure",x);
83              ioState.onWriteFailure(x);
84          }
85      }
86  
87      public class OnDisconnectCallback implements WriteCallback
88      {
89          private final boolean outputOnly;
90  
91          public OnDisconnectCallback(boolean outputOnly)
92          {
93              this.outputOnly = outputOnly;
94          }
95  
96          @Override
97          public void writeFailed(Throwable x)
98          {
99              disconnect(outputOnly);
100         }
101 
102         @Override
103         public void writeSuccess()
104         {
105             disconnect(outputOnly);
106         }
107     }
108 
109     public class OnCloseLocalCallback implements WriteCallback
110     {
111         private final WriteCallback callback;
112         private final CloseInfo close;
113 
114         public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
115         {
116             this.callback = callback;
117             this.close = close;
118         }
119 
120         public OnCloseLocalCallback(CloseInfo close)
121         {
122             this(null,close);
123         }
124 
125         @Override
126         public void writeFailed(Throwable x)
127         {
128             try
129             {
130                 if (callback != null)
131                 {
132                     callback.writeFailed(x);
133                 }
134             }
135             finally
136             {
137                 onLocalClose();
138             }
139         }
140 
141         @Override
142         public void writeSuccess()
143         {
144             try
145             {
146                 if (callback != null)
147                 {
148                     callback.writeSuccess();
149                 }
150             }
151             finally
152             {
153                 onLocalClose();
154             }
155         }
156 
157         private void onLocalClose()
158         {
159             if (LOG.isDebugEnabled())
160                 LOG.debug("Local Close Confirmed {}",close);
161             if (close.isAbnormal())
162             {
163                 ioState.onAbnormalClose(close);
164             }
165             else
166             {
167                 ioState.onCloseLocal(close);
168             }
169         }
170     }
171 
172     public static class Stats
173     {
174         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
175         private AtomicLong countOnFillableEvents = new AtomicLong(0);
176         private AtomicLong countFillableErrors = new AtomicLong(0);
177 
178         public long getFillableErrorCount()
179         {
180             return countFillableErrors.get();
181         }
182 
183         public long getFillInterestedCount()
184         {
185             return countFillInterestedEvents.get();
186         }
187 
188         public long getOnFillableCount()
189         {
190             return countOnFillableEvents.get();
191         }
192     }
193     
194     private static enum ReadMode
195     {
196         PARSE,
197         DISCARD,
198         EOF
199     }
200 
201     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
202 
203     /**
204      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
205      */
206     private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
207 
208     private final ByteBufferPool bufferPool;
209     private final Scheduler scheduler;
210     private final Generator generator;
211     private final Parser parser;
212     private final WebSocketPolicy policy;
213     private final AtomicBoolean suspendToken;
214     private final FrameFlusher flusher;
215     private WebSocketSession session;
216     private List<ExtensionConfig> extensions;
217     private boolean isFilling;
218     private ReadMode readMode = ReadMode.PARSE;
219     private IOState ioState;
220     private Stats stats = new Stats();
221 
222     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
223     {
224         super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
225         this.policy = policy;
226         this.bufferPool = bufferPool;
227         this.generator = new Generator(policy,bufferPool);
228         this.parser = new Parser(policy,bufferPool);
229         this.scheduler = scheduler;
230         this.extensions = new ArrayList<>();
231         this.suspendToken = new AtomicBoolean(false);
232         this.ioState = new IOState();
233         this.ioState.addListener(this);
234         this.flusher = new Flusher(bufferPool,generator,endp);
235         this.setInputBufferSize(policy.getInputBufferSize());
236         this.setMaxIdleTimeout(policy.getIdleTimeout());
237     }
238 
239     @Override
240     public Executor getExecutor()
241     {
242         return super.getExecutor();
243     }
244 
245     @Override
246     public void close()
247     {
248         close(StatusCode.NORMAL,null);
249     }
250 
251     /**
252      * Close the connection.
253      * <p>
254      * This can result in a close handshake over the network, or a simple local abnormal close
255      * 
256      * @param statusCode
257      *            the WebSocket status code.
258      * @param reason
259      *            the (optional) reason string. (null is allowed)
260      * @see StatusCode
261      */
262     @Override
263     public void close(int statusCode, String reason)
264     {
265         if (LOG.isDebugEnabled())
266             LOG.debug("close({},{})",statusCode,reason);
267         CloseInfo close = new CloseInfo(statusCode,reason);
268         this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
269     }
270 
271     @Override
272     public void disconnect()
273     {
274         disconnect(false);
275     }
276 
277     private void disconnect(boolean onlyOutput)
278     {
279         if (LOG.isDebugEnabled())
280             LOG.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
281         // close FrameFlusher, we cannot write anymore at this point.
282         flusher.close();
283         EndPoint endPoint = getEndPoint();
284         // We need to gently close first, to allow
285         // SSL close alerts to be sent by Jetty
286         if (LOG.isDebugEnabled())
287             LOG.debug("Shutting down output {}",endPoint);
288         endPoint.shutdownOutput();
289         if (!onlyOutput)
290         {
291             LOG.debug("Closing {}",endPoint);
292             endPoint.close();
293         }
294     }
295 
296     protected void execute(Runnable task)
297     {
298         try
299         {
300             getExecutor().execute(task);
301         }
302         catch (RejectedExecutionException e)
303         {
304             if (LOG.isDebugEnabled())
305                 LOG.debug("Job not dispatched: {}",task);
306         }
307     }
308 
309     @Override
310     public void fillInterested()
311     {
312         stats.countFillInterestedEvents.incrementAndGet();
313         super.fillInterested();
314     }
315 
316     @Override
317     public ByteBufferPool getBufferPool()
318     {
319         return bufferPool;
320     }
321 
322     /**
323      * Get the list of extensions in use.
324      * <p>
325      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
326      * 
327      * @return the list of negotiated extensions in use.
328      */
329     public List<ExtensionConfig> getExtensions()
330     {
331         return extensions;
332     }
333 
334     public Generator getGenerator()
335     {
336         return generator;
337     }
338 
339     @Override
340     public long getIdleTimeout()
341     {
342         return getEndPoint().getIdleTimeout();
343     }
344 
345     @Override
346     public IOState getIOState()
347     {
348         return ioState;
349     }
350 
351     @Override
352     public long getMaxIdleTimeout()
353     {
354         return getEndPoint().getIdleTimeout();
355     }
356 
357     public Parser getParser()
358     {
359         return parser;
360     }
361 
362     @Override
363     public WebSocketPolicy getPolicy()
364     {
365         return this.policy;
366     }
367 
368     @Override
369     public InetSocketAddress getRemoteAddress()
370     {
371         return getEndPoint().getRemoteAddress();
372     }
373 
374     public Scheduler getScheduler()
375     {
376         return scheduler;
377     }
378 
379     @Override
380     public WebSocketSession getSession()
381     {
382         return session;
383     }
384 
385     public Stats getStats()
386     {
387         return stats;
388     }
389 
390     @Override
391     public boolean isOpen()
392     {
393         return getIOState().isOpen() && getEndPoint().isOpen();
394     }
395 
396     @Override
397     public boolean isReading()
398     {
399         return isFilling;
400     }
401 
402     /**
403      * Physical connection disconnect.
404      * <p>
405      * Not related to WebSocket close handshake.
406      */
407     @Override
408     public void onClose()
409     {
410         if (LOG.isDebugEnabled())
411             LOG.debug("{} onClose()",policy.getBehavior());
412         super.onClose();
413         // ioState.onDisconnected();
414         flusher.close();
415     }
416 
417     @Override
418     public void onConnectionStateChange(ConnectionState state)
419     {
420         if (LOG.isDebugEnabled())
421             LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
422         switch (state)
423         {
424             case OPEN:
425                 if (LOG.isDebugEnabled())
426                     LOG.debug("fillInterested");
427                 fillInterested();
428                 break;
429             case CLOSED:
430                 if (ioState.wasAbnormalClose())
431                 {
432                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
433                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
434                     outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
435                 }
436                 else
437                 {
438                     // Just disconnect
439                     this.disconnect(false);
440                 }
441                 break;
442             case CLOSING:
443                 // First occurrence of .onCloseLocal or .onCloseRemote use
444                 if (ioState.wasRemoteCloseInitiated())
445                 {
446                     CloseInfo close = ioState.getCloseInfo();
447                     // reply to close handshake from remote
448                     outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
449                 }
450             default:
451                 break;
452         }
453     }
454 
455     @Override
456     public void onFillable()
457     {
458         if (LOG.isDebugEnabled())
459             LOG.debug("{} onFillable()",policy.getBehavior());
460         stats.countOnFillableEvents.incrementAndGet();
461         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
462         try
463         {
464             isFilling = true;
465 
466             if(readMode == ReadMode.PARSE)
467             {
468                 readMode = readParse(buffer);
469             } else
470             {
471                 readMode = readDiscard(buffer);
472             }
473         }
474         finally
475         {
476             bufferPool.release(buffer);
477         }
478 
479         if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
480         {
481             fillInterested();
482         }
483         else
484         {
485             isFilling = false;
486         }
487     }
488 
489     
490 
491     @Override
492     protected void onFillInterestedFailed(Throwable cause)
493     {
494         LOG.ignore(cause);
495         stats.countFillInterestedEvents.incrementAndGet();
496         super.onFillInterestedFailed(cause);
497     }
498 
499     @Override
500     public void onOpen()
501     {
502         super.onOpen();
503         this.ioState.onOpened();
504     }
505 
506     /**
507      * Event for no activity on connection (read or write)
508      */
509     @Override
510     protected boolean onReadTimeout()
511     {
512         IOState state = getIOState();
513         ConnectionState cstate = state.getConnectionState();
514         if (LOG.isDebugEnabled())
515             LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
516 
517         if (cstate == ConnectionState.CLOSED)
518         {
519             // close already completed, extra timeouts not relevant
520             // allow underlying connection and endpoint to disconnect on its own
521             return true;
522         }
523 
524         try
525         {
526             session.notifyError(new SocketTimeoutException("Timeout on Read"));
527         }
528         finally
529         {
530             // This is an Abnormal Close condition
531             close(StatusCode.SHUTDOWN,"Idle Timeout");
532         }
533 
534         return false;
535     }
536 
537     /**
538      * Frame from API, User, or Internal implementation destined for network.
539      */
540     @Override
541     public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
542     {
543         if (LOG.isDebugEnabled())
544         {
545             LOG.debug("outgoingFrame({}, {})",frame,callback);
546         }
547 
548         flusher.enqueue(frame,callback,batchMode);
549     }
550 
551     private ReadMode readDiscard(ByteBuffer buffer)
552     {
553         EndPoint endPoint = getEndPoint();
554         try
555         {
556             while (true)
557             {
558                 int filled = endPoint.fill(buffer);
559                 if (filled == 0)
560                 {
561                     return ReadMode.DISCARD;
562                 }
563                 else if (filled < 0)
564                 {
565                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
566                     return ReadMode.EOF;
567                 }
568                 else
569                 {
570                     if (LOG.isDebugEnabled())
571                     {
572                         LOG.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
573                     }
574                 }
575             }
576         }
577         catch (IOException e)
578         {
579             LOG.ignore(e);
580             return ReadMode.EOF;
581         }
582         catch (Throwable t)
583         {
584             LOG.ignore(t);
585             return ReadMode.DISCARD;
586         }
587     }
588     
589     private ReadMode readParse(ByteBuffer buffer)
590     {
591         EndPoint endPoint = getEndPoint();
592         try
593         {
594             while (true) // TODO: should this honor the LogicalConnection.suspend() ?
595             {
596                 int filled = endPoint.fill(buffer);
597                 if (filled == 0)
598                 {
599                     return ReadMode.PARSE;
600                 }
601                 else if (filled < 0)
602                 {
603                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
604                     ioState.onReadFailure(new EOFException("Remote Read EOF"));
605                     return ReadMode.EOF;
606                 }
607                 else
608                 {
609                     if (LOG.isDebugEnabled())
610                     {
611                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
612                     }
613                     parser.parse(buffer);
614                 }
615             }
616         }
617         catch (IOException e)
618         {
619             LOG.warn(e);
620             close(StatusCode.PROTOCOL,e.getMessage());
621             return ReadMode.DISCARD;
622         }
623         catch (CloseException e)
624         {
625             LOG.debug(e);
626             close(e.getStatusCode(),e.getMessage());
627             return ReadMode.DISCARD;
628         }
629         catch (Throwable t)
630         {
631             LOG.warn(t);
632             close(StatusCode.ABNORMAL,t.getMessage());
633             // TODO: should probably only switch to discard if a non-ws-endpoint error
634             return ReadMode.DISCARD;
635         }
636     }
637 
638     @Override
639     public void resume()
640     {
641         if (suspendToken.getAndSet(false))
642         {
643             fillInterested();
644         }
645     }
646 
647     /**
648      * Get the list of extensions in use.
649      * <p>
650      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
651      * 
652      * @param extensions
653      *            the list of negotiated extensions in use.
654      */
655     public void setExtensions(List<ExtensionConfig> extensions)
656     {
657         this.extensions = extensions;
658     }
659 
660     @Override
661     public void setInputBufferSize(int inputBufferSize)
662     {
663         if (inputBufferSize < MIN_BUFFER_SIZE)
664         {
665             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
666         }
667         super.setInputBufferSize(inputBufferSize);
668     }
669 
670     @Override
671     public void setMaxIdleTimeout(long ms)
672     {
673         getEndPoint().setIdleTimeout(ms);
674     }
675 
676     @Override
677     public void setSession(WebSocketSession session)
678     {
679         this.session = session;
680     }
681 
682     @Override
683     public SuspendToken suspend()
684     {
685         suspendToken.set(true);
686         return this;
687     }
688 
689     @Override
690     public String dump()
691     {
692         return ContainerLifeCycle.dump(this);
693     }
694 
695     @Override
696     public void dump(Appendable out, String indent) throws IOException
697     {
698         out.append(toString()).append(System.lineSeparator());
699     }
700 
701     @Override
702     public String toString()
703     {
704         return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser);
705     }
706 
707 }