View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.StringUtil;
39  import org.eclipse.jetty.util.component.ContainerLifeCycle;
40  import org.eclipse.jetty.util.component.Dumpable;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  import org.eclipse.jetty.util.thread.Scheduler;
44  import org.eclipse.jetty.websocket.api.BatchMode;
45  import org.eclipse.jetty.websocket.api.CloseException;
46  import org.eclipse.jetty.websocket.api.CloseStatus;
47  import org.eclipse.jetty.websocket.api.StatusCode;
48  import org.eclipse.jetty.websocket.api.SuspendToken;
49  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
50  import org.eclipse.jetty.websocket.api.WriteCallback;
51  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
52  import org.eclipse.jetty.websocket.api.extensions.Frame;
53  import org.eclipse.jetty.websocket.common.CloseInfo;
54  import org.eclipse.jetty.websocket.common.ConnectionState;
55  import org.eclipse.jetty.websocket.common.Generator;
56  import org.eclipse.jetty.websocket.common.LogicalConnection;
57  import org.eclipse.jetty.websocket.common.Parser;
58  import org.eclipse.jetty.websocket.common.WebSocketSession;
59  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
60  
61  /**
62   * Provides the implementation of {@link LogicalConnection} within the
63   * framework of the new {@link Connection} framework of {@code jetty-io}.
64   */
65  public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
66  {
67      private class Flusher extends FrameFlusher
68      {
69          private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
70          {
71              super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8);
72          }
73  
74          @Override
75          protected void onFailure(Throwable x)
76          {
77              if (ioState.wasAbnormalClose())
78              {
79                  LOG.ignore(x);
80                  return;
81              }
82  
83              LOG.debug("Write flush failure",x);
84  
85              // Unable to write? can't notify other side of close, so disconnect.
86              // This is an ABNORMAL closure
87              String reason = "Websocket write failure";
88  
89              if (x instanceof EOFException)
90              {
91                  reason = "EOF";
92                  Throwable cause = x.getCause();
93                  if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
94                  {
95                      reason = "EOF: " + cause.getMessage();
96                  }
97              }
98              else
99              {
100                 if (StringUtil.isNotBlank(x.getMessage()))
101                 {
102                     reason = x.getMessage();
103                 }
104             }
105 
106             // Abnormal Close
107             reason = CloseStatus.trimMaxReasonLength(reason);
108             session.notifyError(x);
109             session.notifyClose(StatusCode.NO_CLOSE,reason);
110 
111             disconnect(); // disconnect endpoint & connection
112         }
113     }
114 
115     public class OnDisconnectCallback implements WriteCallback
116     {
117         @Override
118         public void writeFailed(Throwable x)
119         {
120             disconnect();
121         }
122 
123         @Override
124         public void writeSuccess()
125         {
126             disconnect();
127         }
128     }
129 
130     public static class Stats
131     {
132         private AtomicLong countFillInterestedEvents = new AtomicLong(0);
133         private AtomicLong countOnFillableEvents = new AtomicLong(0);
134         private AtomicLong countFillableErrors = new AtomicLong(0);
135 
136         public long getFillableErrorCount()
137         {
138             return countFillableErrors.get();
139         }
140 
141         public long getFillInterestedCount()
142         {
143             return countFillInterestedEvents.get();
144         }
145 
146         public long getOnFillableCount()
147         {
148             return countOnFillableEvents.get();
149         }
150     }
151 
152     private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
153 
154     /**
155      * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
156      */
157     private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
158 
159     private final ByteBufferPool bufferPool;
160     private final Scheduler scheduler;
161     private final Generator generator;
162     private final Parser parser;
163     private final WebSocketPolicy policy;
164     private final AtomicBoolean suspendToken;
165     private final FrameFlusher flusher;
166     private WebSocketSession session;
167     private List<ExtensionConfig> extensions;
168     private boolean isFilling;
169     private IOState ioState;
170     private Stats stats = new Stats();
171 
172     public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
173     {
174         super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
175         this.policy = policy;
176         this.bufferPool = bufferPool;
177         this.generator = new Generator(policy,bufferPool);
178         this.parser = new Parser(policy,bufferPool);
179         this.scheduler = scheduler;
180         this.extensions = new ArrayList<>();
181         this.suspendToken = new AtomicBoolean(false);
182         this.ioState = new IOState();
183         this.ioState.addListener(this);
184         this.flusher = new Flusher(bufferPool,generator,endp);
185         this.setInputBufferSize(policy.getInputBufferSize());
186         this.setMaxIdleTimeout(policy.getIdleTimeout());
187     }
188 
189     @Override
190     public Executor getExecutor()
191     {
192         return super.getExecutor();
193     }
194 
195     @Override
196     public void close()
197     {
198         close(StatusCode.NORMAL,null);
199     }
200 
201     /**
202      * Close the connection.
203      * <p>
204      * This can result in a close handshake over the network, or a simple local abnormal close
205      * 
206      * @param statusCode
207      *            the WebSocket status code.
208      * @param reason
209      *            the (optional) reason string. (null is allowed)
210      * @see StatusCode
211      */
212     @Override
213     public void close(int statusCode, String reason)
214     {
215         CloseInfo close = new CloseInfo(statusCode,reason);
216         if (statusCode == StatusCode.ABNORMAL)
217         {
218             flusher.close(); // TODO this makes the IdleTimeoutTest pass, but I'm dubious it is the correct way
219             ioState.onAbnormalClose(close);
220         }
221         else
222         {
223             ioState.onCloseLocal(close);
224         }
225     }
226 
227 
228     @Override
229     public void disconnect()
230     {
231         LOG.debug("{} disconnect()",policy.getBehavior());
232         flusher.close();
233         disconnect(false);
234     }
235 
236     private void disconnect(boolean onlyOutput)
237     {
238         EndPoint endPoint = getEndPoint();
239         // We need to gently close first, to allow
240         // SSL close alerts to be sent by Jetty
241         LOG.debug("Shutting down output {}",endPoint);
242         endPoint.shutdownOutput();
243         if (!onlyOutput)
244         {
245             LOG.debug("Closing {}",endPoint);
246             endPoint.close();
247         }
248     }
249 
250     protected void execute(Runnable task)
251     {
252         try
253         {
254             getExecutor().execute(task);
255         }
256         catch (RejectedExecutionException e)
257         {
258             LOG.debug("Job not dispatched: {}",task);
259         }
260     }
261 
262     @Override
263     public void fillInterested()
264     {
265         stats.countFillInterestedEvents.incrementAndGet();
266         super.fillInterested();
267     }
268 
269     @Override
270     public ByteBufferPool getBufferPool()
271     {
272         return bufferPool;
273     }
274 
275     /**
276      * Get the list of extensions in use.
277      * <p>
278      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
279      * 
280      * @return the list of negotiated extensions in use.
281      */
282     public List<ExtensionConfig> getExtensions()
283     {
284         return extensions;
285     }
286 
287     public Generator getGenerator()
288     {
289         return generator;
290     }
291 
292     @Override
293     public long getIdleTimeout()
294     {
295         return getEndPoint().getIdleTimeout();
296     }
297 
298     @Override
299     public IOState getIOState()
300     {
301         return ioState;
302     }
303 
304     @Override
305     public long getMaxIdleTimeout()
306     {
307         return getEndPoint().getIdleTimeout();
308     }
309 
310     public Parser getParser()
311     {
312         return parser;
313     }
314 
315     @Override
316     public WebSocketPolicy getPolicy()
317     {
318         return this.policy;
319     }
320 
321     @Override
322     public InetSocketAddress getRemoteAddress()
323     {
324         return getEndPoint().getRemoteAddress();
325     }
326 
327     public Scheduler getScheduler()
328     {
329         return scheduler;
330     }
331 
332     @Override
333     public WebSocketSession getSession()
334     {
335         return session;
336     }
337 
338     public Stats getStats()
339     {
340         return stats;
341     }
342 
343     @Override
344     public boolean isOpen()
345     {
346         return getIOState().isOpen() && getEndPoint().isOpen();
347     }
348 
349     @Override
350     public boolean isReading()
351     {
352         return isFilling;
353     }
354 
355     /**
356      * Physical connection disconnect.
357      * <p>
358      * Not related to WebSocket close handshake.
359      */
360     @Override
361     public void onClose()
362     {
363         super.onClose();
364         flusher.close();
365     }
366 
367     @Override
368     public void onConnectionStateChange(ConnectionState state)
369     {
370         LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
371         switch (state)
372         {
373             case OPEN:
374                 LOG.debug("fillInterested");
375                 fillInterested();
376                 break;
377             case CLOSED:
378                 if (ioState.wasAbnormalClose())
379                 {
380                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
381                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
382                     outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(), BatchMode.OFF);
383                 }
384                 else
385                 {
386                     // Just disconnect
387                     this.disconnect();
388                 }
389                 break;
390             case CLOSING:
391                 CloseInfo close = ioState.getCloseInfo();
392                 // append close frame
393                 outgoingFrame(close.asFrame(),new OnDisconnectCallback(), BatchMode.OFF);
394             default:
395                 break;
396         }
397     }
398 
399     @Override
400     public void onFillable()
401     {
402         LOG.debug("{} onFillable()",policy.getBehavior());
403         stats.countOnFillableEvents.incrementAndGet();
404         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
405         boolean readMore = false;
406         try
407         {
408             isFilling = true;
409             readMore = (read(buffer) != -1);
410         }
411         finally
412         {
413             bufferPool.release(buffer);
414         }
415 
416         if (readMore && (suspendToken.get() == false))
417         {
418             fillInterested();
419         }
420         else
421         {
422             isFilling = false;
423         }
424     }
425 
426     @Override
427     protected void onFillInterestedFailed(Throwable cause)
428     {
429         LOG.ignore(cause);
430         stats.countFillInterestedEvents.incrementAndGet();
431         super.onFillInterestedFailed(cause);
432     }
433 
434     @Override
435     public void onOpen()
436     {
437         super.onOpen();
438         this.ioState.onOpened();
439     }
440 
441     @Override
442     protected boolean onReadTimeout()
443     {
444         LOG.debug("{} Read Timeout",policy.getBehavior());
445 
446         IOState state = getIOState();
447         if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
448         {
449             // close already initiated, extra timeouts not relevant
450             // allow underlying connection and endpoint to disconnect on its own
451             return true;
452         }
453 
454         // Initiate close - politely send close frame.
455         session.notifyError(new SocketTimeoutException("Timeout on Read"));
456         // This is an Abnormal Close condition
457         close(StatusCode.ABNORMAL,"Idle Timeout");
458 
459         return false;
460     }
461 
462     /**
463      * Frame from API, User, or Internal implementation destined for network.
464      */
465     @Override
466     public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
467     {
468         if (LOG.isDebugEnabled())
469         {
470             LOG.debug("outgoingFrame({}, {})",frame,callback);
471         }
472 
473         flusher.enqueue(frame,callback, batchMode);
474     }
475 
476     private int read(ByteBuffer buffer)
477     {
478         EndPoint endPoint = getEndPoint();
479         try
480         {
481             while (true) // TODO: should this honor the LogicalConnection.suspend() ?
482             {
483                 int filled = endPoint.fill(buffer);
484                 if (filled == 0)
485                 {
486                     return 0;
487                 }
488                 else if (filled < 0)
489                 {
490                     LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
491                     ioState.onReadEOF();
492                     return -1;
493                 }
494                 else
495                 {
496                     if (LOG.isDebugEnabled())
497                     {
498                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
499                     }
500                     parser.parse(buffer);
501                     // TODO: has the end user application already consumed what it was given?
502                 }
503             }
504         }
505         catch (IOException e)
506         {
507             LOG.warn(e);
508             close(StatusCode.PROTOCOL,e.getMessage());
509             return -1;
510         }
511         catch (CloseException e)
512         {
513             LOG.warn(e);
514             close(e.getStatusCode(),e.getMessage());
515             return -1;
516         }
517     }
518 
519     @Override
520     public void resume()
521     {
522         if (suspendToken.getAndSet(false))
523         {
524             fillInterested();
525         }
526     }
527 
528     /**
529      * Get the list of extensions in use.
530      * <p>
531      * This list is negotiated during the WebSocket Upgrade Request/Response handshake.
532      * 
533      * @param extensions
534      *            the list of negotiated extensions in use.
535      */
536     public void setExtensions(List<ExtensionConfig> extensions)
537     {
538         this.extensions = extensions;
539     }
540 
541     @Override
542     public void setInputBufferSize(int inputBufferSize)
543     {
544         if (inputBufferSize < MIN_BUFFER_SIZE)
545         {
546             throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
547         }
548         super.setInputBufferSize(inputBufferSize);
549     }
550 
551     @Override
552     public void setMaxIdleTimeout(long ms)
553     {
554         getEndPoint().setIdleTimeout(ms);
555     }
556 
557     @Override
558     public void setSession(WebSocketSession session)
559     {
560         this.session = session;
561     }
562 
563     @Override
564     public SuspendToken suspend()
565     {
566         suspendToken.set(true);
567         return this;
568     }
569 
570     @Override
571     public String dump()
572     {
573         return ContainerLifeCycle.dump(this);
574     }
575 
576     @Override
577     public void dump(Appendable out, String indent) throws IOException
578     {
579         out.append(toString()).append(System.lineSeparator());
580     }
581 
582     @Override
583     public String toString()
584     {
585         return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser);
586     }
587 
588 }