View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.extensions.mux;
20  
21  import java.net.InetSocketAddress;
22  import java.util.concurrent.Future;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.eclipse.jetty.util.log.Log;
26  import org.eclipse.jetty.util.log.Logger;
27  import org.eclipse.jetty.websocket.api.StatusCode;
28  import org.eclipse.jetty.websocket.api.SuspendToken;
29  import org.eclipse.jetty.websocket.api.WebSocketException;
30  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
31  import org.eclipse.jetty.websocket.api.WriteCallback;
32  import org.eclipse.jetty.websocket.api.extensions.Frame;
33  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
34  import org.eclipse.jetty.websocket.common.CloseInfo;
35  import org.eclipse.jetty.websocket.common.ConnectionState;
36  import org.eclipse.jetty.websocket.common.LogicalConnection;
37  import org.eclipse.jetty.websocket.common.WebSocketFrame;
38  import org.eclipse.jetty.websocket.common.WebSocketSession;
39  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
40  import org.eclipse.jetty.websocket.common.io.IOState;
41  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
42  
43  /**
44   * MuxChannel, acts as WebSocketConnection for specific sub-channel.
45   */
46  public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken, ConnectionStateListener
47  {
48      private static final Logger LOG = Log.getLogger(MuxChannel.class);
49  
50      private final long channelId;
51      private final Muxer muxer;
52      private final AtomicBoolean inputClosed;
53      private final AtomicBoolean outputClosed;
54      private final AtomicBoolean suspendToken;
55      private IOState ioState;
56      private WebSocketPolicy policy;
57      private WebSocketSession session;
58      private IncomingFrames incoming;
59      private String subProtocol;
60  
61      public MuxChannel(long channelId, Muxer muxer)
62      {
63          this.channelId = channelId;
64          this.muxer = muxer;
65          this.policy = muxer.getPolicy().clonePolicy();
66  
67          this.suspendToken = new AtomicBoolean(false);
68          this.ioState = new IOState();
69          this.ioState.addListener(this);
70  
71          this.inputClosed = new AtomicBoolean(false);
72          this.outputClosed = new AtomicBoolean(false);
73      }
74  
75      @Override
76      public void close()
77      {
78          close(StatusCode.NORMAL,null);
79      }
80  
81      @Override
82      public void close(int statusCode, String reason)
83      {
84          CloseInfo close = new CloseInfo(statusCode,reason);
85          // TODO: disconnect callback?
86          outgoingFrame(close.asFrame(),null);
87      }
88  
89      @Override
90      public void disconnect()
91      {
92          // TODO: disconnect the virtual end-point?
93      }
94  
95      public long getChannelId()
96      {
97          return channelId;
98      }
99  
100     @Override
101     public IOState getIOState()
102     {
103         // TODO Auto-generated method stub
104         return null;
105     }
106 
107     @Override
108     public InetSocketAddress getLocalAddress()
109     {
110         // TODO Auto-generated method stub
111         return null;
112     }
113 
114     @Override
115     public long getMaxIdleTimeout()
116     {
117         // TODO Auto-generated method stub
118         return 0;
119     }
120 
121     @Override
122     public WebSocketPolicy getPolicy()
123     {
124         return policy;
125     }
126 
127     @Override
128     public InetSocketAddress getRemoteAddress()
129     {
130         return muxer.getRemoteAddress();
131     }
132 
133     @Override
134     public WebSocketSession getSession()
135     {
136         return session;
137     }
138 
139     /**
140      * Incoming exceptions from Muxer.
141      */
142     @Override
143     public void incomingError(WebSocketException e)
144     {
145         incoming.incomingError(e);
146     }
147 
148     /**
149      * Incoming frames from Muxer
150      */
151     @Override
152     public void incomingFrame(Frame frame)
153     {
154         incoming.incomingFrame(frame);
155     }
156 
157     public boolean isActive()
158     {
159         return (ioState.isOpen());
160     }
161 
162     @Override
163     public boolean isOpen()
164     {
165         return isActive() && muxer.isOpen();
166     }
167 
168     @Override
169     public boolean isReading()
170     {
171         return true;
172     }
173 
174     public void onClose()
175     {
176     }
177 
178     @Override
179     public void onConnectionStateChange(ConnectionState state)
180     {
181         // TODO Auto-generated method stub
182 
183     }
184 
185     public void onOpen()
186     {
187         this.ioState.onOpened();
188     }
189 
190     /**
191      * Internal
192      * 
193      * @param frame the frame to write
194      * @return the future for the network write of the frame
195      */
196     private Future<Void> outgoingAsyncFrame(WebSocketFrame frame)
197     {
198         FutureWriteCallback future = new FutureWriteCallback();
199         outgoingFrame(frame,future);
200         return future;
201     }
202 
203     /**
204      * Frames destined for the Muxer
205      */
206     @Override
207     public void outgoingFrame(Frame frame, WriteCallback callback)
208     {
209         muxer.output(channelId,frame,callback);
210     }
211 
212     @Override
213     public void resume()
214     {
215         if (suspendToken.getAndSet(false))
216         {
217             // TODO: Start reading again. (how?)
218         }
219     }
220 
221     @Override
222     public void setMaxIdleTimeout(long ms)
223     {
224         // TODO Auto-generated method stub
225 
226     }
227 
228     @Override
229     public void setNextIncomingFrames(IncomingFrames incoming)
230     {
231         this.incoming = incoming;
232     }
233 
234     @Override
235     public void setSession(WebSocketSession session)
236     {
237         this.session = session;
238         // session.setOutgoing(this);
239     }
240 
241     public void setSubProtocol(String subProtocol)
242     {
243         this.subProtocol = subProtocol;
244     }
245 
246     @Override
247     public SuspendToken suspend()
248     {
249         suspendToken.set(true);
250         // TODO: how to suspend reading?
251         return this;
252     }
253 }