View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.extensions.mux;
20  
21  import java.net.InetSocketAddress;
22  import java.util.concurrent.Future;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.eclipse.jetty.util.log.Log;
26  import org.eclipse.jetty.util.log.Logger;
27  import org.eclipse.jetty.websocket.api.StatusCode;
28  import org.eclipse.jetty.websocket.api.SuspendToken;
29  import org.eclipse.jetty.websocket.api.WebSocketException;
30  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
31  import org.eclipse.jetty.websocket.api.WriteCallback;
32  import org.eclipse.jetty.websocket.api.extensions.Frame;
33  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
34  import org.eclipse.jetty.websocket.common.CloseInfo;
35  import org.eclipse.jetty.websocket.common.ConnectionState;
36  import org.eclipse.jetty.websocket.common.LogicalConnection;
37  import org.eclipse.jetty.websocket.common.WebSocketFrame;
38  import org.eclipse.jetty.websocket.common.WebSocketSession;
39  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
40  import org.eclipse.jetty.websocket.common.io.IOState;
41  
42  /**
43   * MuxChannel, acts as WebSocketConnection for specific sub-channel.
44   */
45  public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken
46  {
47      private static final Logger LOG = Log.getLogger(MuxChannel.class);
48  
49      private final long channelId;
50      private final Muxer muxer;
51      private final AtomicBoolean inputClosed;
52      private final AtomicBoolean outputClosed;
53      private final AtomicBoolean suspendToken;
54      private IOState ioState;
55      private WebSocketPolicy policy;
56      private WebSocketSession session;
57      private IncomingFrames incoming;
58      private String subProtocol;
59  
60      public MuxChannel(long channelId, Muxer muxer)
61      {
62          this.channelId = channelId;
63          this.muxer = muxer;
64          this.policy = muxer.getPolicy().clonePolicy();
65  
66          this.suspendToken = new AtomicBoolean(false);
67          this.ioState = new IOState();
68          ioState.setState(ConnectionState.CONNECTING);
69  
70          this.inputClosed = new AtomicBoolean(false);
71          this.outputClosed = new AtomicBoolean(false);
72      }
73  
74      @Override
75      public void close()
76      {
77          close(StatusCode.NORMAL,null);
78      }
79  
80      @Override
81      public void close(int statusCode, String reason)
82      {
83          CloseInfo close = new CloseInfo(statusCode,reason);
84          // TODO: disconnect callback?
85          outgoingFrame(close.asFrame(),null);
86      }
87  
88      @Override
89      public void disconnect()
90      {
91          this.ioState.setState(ConnectionState.CLOSED);
92          // TODO: disconnect the virtual end-point?
93      }
94  
95      public long getChannelId()
96      {
97          return channelId;
98      }
99  
100     @Override
101     public IOState getIOState()
102     {
103         // TODO Auto-generated method stub
104         return null;
105     }
106 
107     @Override
108     public InetSocketAddress getLocalAddress()
109     {
110         // TODO Auto-generated method stub
111         return null;
112     }
113 
114     @Override
115     public WebSocketPolicy getPolicy()
116     {
117         return policy;
118     }
119 
120     @Override
121     public InetSocketAddress getRemoteAddress()
122     {
123         return muxer.getRemoteAddress();
124     }
125 
126     @Override
127     public WebSocketSession getSession()
128     {
129         return session;
130     }
131 
132     /**
133      * Incoming exceptions from Muxer.
134      */
135     @Override
136     public void incomingError(WebSocketException e)
137     {
138         incoming.incomingError(e);
139     }
140 
141     /**
142      * Incoming frames from Muxer
143      */
144     @Override
145     public void incomingFrame(Frame frame)
146     {
147         incoming.incomingFrame(frame);
148     }
149 
150     public boolean isActive()
151     {
152         return (ioState.isOpen());
153     }
154 
155     @Override
156     public boolean isOpen()
157     {
158         return isActive() && muxer.isOpen();
159     }
160 
161     @Override
162     public boolean isReading()
163     {
164         return true;
165     }
166 
167     public void onClose()
168     {
169         this.ioState.setState(ConnectionState.CLOSED);
170     }
171 
172     public void onOpen()
173     {
174         this.ioState.setState(ConnectionState.OPEN);
175     }
176 
177     /**
178      * Internal
179      * 
180      * @param frame the frame to write
181      * @return the future for the network write of the frame
182      */
183     private Future<Void> outgoingAsyncFrame(WebSocketFrame frame)
184     {
185         FutureWriteCallback future = new FutureWriteCallback();
186         outgoingFrame(frame,future);
187         return future;
188     }
189 
190     /**
191      * Frames destined for the Muxer
192      */
193     @Override
194     public void outgoingFrame(Frame frame, WriteCallback callback)
195     {
196         muxer.output(channelId,frame,callback);
197     }
198 
199     @Override
200     public void resume()
201     {
202         if (suspendToken.getAndSet(false))
203         {
204             // TODO: Start reading again. (how?)
205         }
206     }
207 
208     @Override
209     public void setNextIncomingFrames(IncomingFrames incoming)
210     {
211         this.incoming = incoming;
212     }
213 
214     @Override
215     public void setSession(WebSocketSession session)
216     {
217         this.session = session;
218         // session.setOutgoing(this);
219     }
220 
221     public void setSubProtocol(String subProtocol)
222     {
223         this.subProtocol = subProtocol;
224     }
225 
226     @Override
227     public SuspendToken suspend()
228     {
229         suspendToken.set(true);
230         // TODO: how to suspend reading?
231         return this;
232     }
233 }