View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.util.BufferUtil;
37  import org.eclipse.jetty.util.Callback;
38  import org.eclipse.jetty.util.ForkInvoker;
39  import org.eclipse.jetty.util.StringUtil;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.websocket.api.CloseException;
44  import org.eclipse.jetty.websocket.api.CloseStatus;
45  import org.eclipse.jetty.websocket.api.StatusCode;
46  import org.eclipse.jetty.websocket.api.SuspendToken;
47  import org.eclipse.jetty.websocket.api.WebSocketException;
48  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49  import org.eclipse.jetty.websocket.api.WriteCallback;
50  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
51  import org.eclipse.jetty.websocket.api.extensions.Frame;
52  import org.eclipse.jetty.websocket.common.CloseInfo;
53  import org.eclipse.jetty.websocket.common.ConnectionState;
54  import org.eclipse.jetty.websocket.common.Generator;
55  import org.eclipse.jetty.websocket.common.LogicalConnection;
56  import org.eclipse.jetty.websocket.common.Parser;
57  import org.eclipse.jetty.websocket.common.WebSocketSession;
58  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
59  
/**
 * Provides the implementation of {@link LogicalConnection} on top of the jetty-io {@link org.eclipse.jetty.io.Connection} framework.
 */
63  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
64  {
65      private class FlushCallback implements Callback
66      {
67          /**
68           * The Endpoint.write() failure path
69           */
70          @Override
71          public void failed(Throwable x)
72          {
73              if (ioState.wasAbnormalClose())
74              {
75                  LOG.ignore(x);
76                  return;
77              }
78  
79              LOG.debug("Write flush failure",x);
80  
81              // Unable to write? can't notify other side of close, so disconnect.
82              // This is an ABNORMAL closure
83              String reason = "Websocket write failure";
84  
85              if (x instanceof EOFException)
86              {
87                  reason = "EOF";
88                  Throwable cause = x.getCause();
89                  if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
90                  {
91                      reason = "EOF: " + cause.getMessage();
92                  }
93              }
94              else
95              {
96                  if (StringUtil.isNotBlank(x.getMessage()))
97                  {
98                      reason = x.getMessage();
99                  }
100             }
101 
102             // Abnormal Close
103             reason = CloseStatus.trimMaxReasonLength(reason);
104             session.notifyError(x);
105             session.notifyClose(StatusCode.NO_CLOSE,reason);
106 
107             disconnect(); // disconnect endpoint & connection
108         }
109 
110         @Override
111         public void succeeded()
112         {
113             AbstractWebSocketConnection.this.complete(writeBytes);
114         }
115     }
116 
117     private class FlushInvoker extends ForkInvoker<Callback>
118     {
119         private FlushInvoker()
120         {
121             super(4);
122         }
123 
124         @Override
125         public void call(Callback callback)
126         {
127             flush();
128         }
129 
130         @Override
131         public void fork(final Callback callback)
132         {
133             execute(new Runnable()
134             {
135                 @Override
136                 public void run()
137                 {
138                     flush();
139                 }
140             });
141         }
142 
143         @Override
144         public String toString()
145         {
146             return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
147         }
148     }
149 
150     public class OnDisconnectCallback implements WriteCallback
151     {
152         @Override
153         public void writeFailed(Throwable x)
154         {
155             disconnect();
156         }
157 
158         @Override
159         public void writeSuccess()
160         {
161             disconnect();
162         }
163     }
164 
165     public static class Stats
166     {
167         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
168         private AtomicLong countOnFillableEvents = new AtomicLong(0);
169         private AtomicLong countFillableErrors = new AtomicLong(0);
170 
171         public long getFillableErrorCount()
172         {
173             return countFillableErrors.get();
174         }
175 
176         public long getFillInterestedCount()
177         {
178             return countFillInterestedEvents.get();
179         }
180 
181         public long getOnFillableCount()
182         {
183             return countOnFillableEvents.get();
184         }
185     }
186 
187     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
188 
189     /**
190      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
191      */
192     private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
193 
194     private final ForkInvoker<Callback> invoker = new FlushInvoker();
195     private final ByteBufferPool bufferPool;
196     private final Scheduler scheduler;
197     private final Generator generator;
198     private final Parser parser;
199     private final WebSocketPolicy policy;
200     private final WriteBytesProvider writeBytes;
201     private final AtomicBoolean suspendToken;
202     private WebSocketSession session;
203     private List<ExtensionConfig> extensions;
204     private boolean flushing;
205     private boolean isFilling;
206     private IOState ioState;
207     private Stats stats = new Stats();
208 
209     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
210     {
211         super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
212         this.policy = policy;
213         this.bufferPool = bufferPool;
214         this.generator = new Generator(policy,bufferPool);
215         this.parser = new Parser(policy,bufferPool);
216         this.scheduler = scheduler;
217         this.extensions = new ArrayList<>();
218         this.suspendToken = new AtomicBoolean(false);
219         this.ioState = new IOState();
220         this.ioState.addListener(this);
221         this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
222         this.setInputBufferSize(policy.getInputBufferSize());
223     }
224 
225     @Override
226     public Executor getExecutor()
227     {
228         return super.getExecutor();
229     }
230 
231     @Override
232     public void close()
233     {
234         close(StatusCode.NORMAL,null);
235     }
236 
237     /**
238      * Close the connection.
239      * <p>
240      * This can result in a close handshake over the network, or a simple local abnormal close
241      * 
242      * @param statusCode
243      *            the WebSocket status code.
244      * @param reason
245      *            the (optional) reason string. (null is allowed)
246      * @see StatusCode
247      */
248     @Override
249     public void close(int statusCode, String reason)
250     {
251         CloseInfo close = new CloseInfo(statusCode,reason);
252         if (statusCode == StatusCode.ABNORMAL)
253         {
254             ioState.onAbnormalClose(close);
255         }
256         else
257         {
258             ioState.onCloseLocal(close);
259         }
260     }
261 
262     public void complete(final Callback callback)
263     {
264         LOG.debug("complete({})",callback);
265         synchronized (writeBytes)
266         {
267             flushing = false;
268         }
269 
270         if (!ioState.isOpen() || (callback == null))
271         {
272             return;
273         }
274 
275         invoker.invoke(callback);
276     }
277 
278     @Override
279     public void disconnect()
280     {
281         LOG.debug("{} disconnect()",policy.getBehavior());
282         synchronized (writeBytes)
283         {
284             if (!writeBytes.isClosed())
285             {
286                 writeBytes.close();
287             }
288         }
289         disconnect(false);
290     }
291 
292     private void disconnect(boolean onlyOutput)
293     {
294         EndPoint endPoint = getEndPoint();
295         // We need to gently close first, to allow
296         // SSL close alerts to be sent by Jetty
297         LOG.debug("Shutting down output {}",endPoint);
298         endPoint.shutdownOutput();
299         if (!onlyOutput)
300         {
301             LOG.debug("Closing {}",endPoint);
302             endPoint.close();
303         }
304     }
305 
306     protected void execute(Runnable task)
307     {
308         try
309         {
310             getExecutor().execute(task);
311         }
312         catch (RejectedExecutionException e)
313         {
314             LOG.debug("Job not dispatched: {}",task);
315         }
316     }
317 
318     @Override
319     public void fillInterested()
320     {
321         stats.countFillInterestedEvents.incrementAndGet();
322         super.fillInterested();
323     }
324 
325     public void flush()
326     {
327         List<ByteBuffer> buffers = null;
328 
329         synchronized (writeBytes)
330         {
331             if (flushing)
332             {
333                 LOG.debug("Actively flushing");
334                 return;
335             }
336 
337             if (LOG.isDebugEnabled())
338             {
339                 LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
340             }
341 
342             if (!isOpen())
343             {
344                 // No longer have an open connection, drop them all.
345                 writeBytes.failAll(new WebSocketException("Connection closed"));
346                 return;
347             }
348 
349             buffers = writeBytes.getByteBuffers();
350 
351             if ((buffers == null) || (buffers.size() <= 0))
352             {
353                 return;
354             }
355 
356             flushing = true;
357         }
358 
359         write(buffers);
360     }
361 
362     @Override
363     public ByteBufferPool getBufferPool()
364     {
365         return bufferPool;
366     }
367 
368     /**
369      * Get the list of extensions in use.
370      * <p>
371      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
372      * 
373      * @return the list of negotiated extensions in use.
374      */
375     public List<ExtensionConfig> getExtensions()
376     {
377         return extensions;
378     }
379 
380     public Generator getGenerator()
381     {
382         return generator;
383     }
384 
385     @Override
386     public long getIdleTimeout()
387     {
388         return getEndPoint().getIdleTimeout();
389     }
390 
391     @Override
392     public IOState getIOState()
393     {
394         return ioState;
395     }
396 
397     @Override
398     public long getMaxIdleTimeout()
399     {
400         return getEndPoint().getIdleTimeout();
401     }
402 
403     public Parser getParser()
404     {
405         return parser;
406     }
407 
408     @Override
409     public WebSocketPolicy getPolicy()
410     {
411         return this.policy;
412     }
413 
414     @Override
415     public InetSocketAddress getRemoteAddress()
416     {
417         return getEndPoint().getRemoteAddress();
418     }
419 
420     public Scheduler getScheduler()
421     {
422         return scheduler;
423     }
424 
425     @Override
426     public WebSocketSession getSession()
427     {
428         return session;
429     }
430 
431     public Stats getStats()
432     {
433         return stats;
434     }
435 
436     @Override
437     public boolean isOpen()
438     {
439         return getIOState().isOpen() && getEndPoint().isOpen();
440     }
441 
442     @Override
443     public boolean isReading()
444     {
445         return isFilling;
446     }
447 
448     /**
449      * Physical connection disconnect.
450      * <p>
451      * Not related to WebSocket close handshake.
452      */
453     @Override
454     public void onClose()
455     {
456         super.onClose();
457         writeBytes.close();
458     }
459 
460     @Override
461     public void onConnectionStateChange(ConnectionState state)
462     {
463         LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
464         switch (state)
465         {
466             case OPEN:
467                 LOG.debug("fillInterested");
468                 fillInterested();
469                 break;
470             case CLOSED:
471                 if (ioState.wasAbnormalClose())
472                 {
473                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
474                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
475                     outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback());
476                 }
477                 else
478                 {
479                     // Just disconnect
480                     this.disconnect();
481                 }
482                 break;
483             case CLOSING:
484                 CloseInfo close = ioState.getCloseInfo();
485                 // append close frame
486                 outgoingFrame(close.asFrame(),new OnDisconnectCallback());
487             default:
488                 break;
489         }
490     }
491 
492     @Override
493     public void onFillable()
494     {
495         LOG.debug("{} onFillable()",policy.getBehavior());
496         stats.countOnFillableEvents.incrementAndGet();
497         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
498         BufferUtil.clear(buffer);
499         boolean readMore = false;
500         try
501         {
502             isFilling = true;
503             readMore = (read(buffer) != -1);
504         }
505         finally
506         {
507             bufferPool.release(buffer);
508         }
509 
510         if (readMore && (suspendToken.get() == false))
511         {
512             fillInterested();
513         }
514         else
515         {
516             isFilling = false;
517         }
518     }
519 
520     @Override
521     protected void onFillInterestedFailed(Throwable cause)
522     {
523         LOG.ignore(cause);
524         stats.countFillInterestedEvents.incrementAndGet();
525         super.onFillInterestedFailed(cause);
526     }
527 
528     @Override
529     public void onOpen()
530     {
531         super.onOpen();
532         this.ioState.onOpened();
533     }
534 
535     @Override
536     protected boolean onReadTimeout()
537     {
538         LOG.debug("{} Read Timeout",policy.getBehavior());
539 
540         IOState state = getIOState();
541         if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
542         {
543             // close already initiated, extra timeouts not relevant
544             // allow underlying connection and endpoint to disconnect on its own
545             return true;
546         }
547 
548         // Initiate close - politely send close frame.
549         session.notifyError(new SocketTimeoutException("Timeout on Read"));
550         // This is an Abnormal Close condition
551         close(StatusCode.ABNORMAL,"Idle Timeout");
552 
553         return false;
554     }
555 
556     /**
557      * Frame from API, User, or Internal implementation destined for network.
558      */
559     @Override
560     public void outgoingFrame(Frame frame, WriteCallback callback)
561     {
562         if (LOG.isDebugEnabled())
563         {
564             LOG.debug("outgoingFrame({}, {})",frame,callback);
565         }
566 
567         writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
568 
569         flush();
570     }
571 
572     private int read(ByteBuffer buffer)
573     {
574         EndPoint endPoint = getEndPoint();
575         try
576         {
577             while (true) // TODO: should this honor the LogicalConnection.suspend() ?
578             {
579                 int filled = endPoint.fill(buffer);
580                 if (filled == 0)
581                 {
582                     return 0;
583                 }
584                 else if (filled < 0)
585                 {
586                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
587                     ioState.onReadEOF();
588                     return -1;
589                 }
590                 else
591                 {
592                     if (LOG.isDebugEnabled())
593                     {
594                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
595                     }
596                     parser.parse(buffer);
597                     // TODO: has the end user application already consumed what it was given?
598                 }
599             }
600         }
601         catch (IOException e)
602         {
603             LOG.warn(e);
604             close(StatusCode.PROTOCOL,e.getMessage());
605             return -1;
606         }
607         catch (CloseException e)
608         {
609             LOG.warn(e);
610             close(e.getStatusCode(),e.getMessage());
611             return -1;
612         }
613     }
614 
615     @Override
616     public void resume()
617     {
618         if (suspendToken.getAndSet(false))
619         {
620             fillInterested();
621         }
622     }
623 
624     /**
625      * Get the list of extensions in use.
626      * <p>
627      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
628      * 
629      * @param extensions
630      *            the list of negotiated extensions in use.
631      */
632     public void setExtensions(List<ExtensionConfig> extensions)
633     {
634         this.extensions = extensions;
635     }
636 
637     @Override
638     public void setInputBufferSize(int inputBufferSize)
639     {
640         if (inputBufferSize < MIN_BUFFER_SIZE)
641         {
642             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
643         }
644         super.setInputBufferSize(inputBufferSize);
645     }
646 
647     @Override
648     public void setMaxIdleTimeout(long ms)
649     {
650         getEndPoint().setIdleTimeout(ms);
651     }
652 
653     @Override
654     public void setSession(WebSocketSession session)
655     {
656         this.session = session;
657     }
658 
659     @Override
660     public SuspendToken suspend()
661     {
662         suspendToken.set(true);
663         return this;
664     }
665 
666     @Override
667     public String toString()
668     {
669         return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
670     }
671 
672     private <C> void write(List<ByteBuffer> buffer)
673     {
674         EndPoint endpoint = getEndPoint();
675 
676         try
677         {
678             int bufsize = buffer.size();
679             if (bufsize == 1)
680             {
681                 // simple case
682                 endpoint.write(writeBytes,buffer.get(0));
683             }
684             else
685             {
686                 // gathered writes case
687                 ByteBuffer bbarr[] = buffer.toArray(new ByteBuffer[bufsize]);
688                 endpoint.write(writeBytes,bbarr);
689             }
690         }
691         catch (Throwable t)
692         {
693             writeBytes.failed(t);
694         }
695     }
696 }