View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.mux;
20  
21  import java.net.InetSocketAddress;
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.Future;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.eclipse.jetty.io.ByteBufferPool;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.StatusCode;
30  import org.eclipse.jetty.websocket.api.SuspendToken;
31  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
32  import org.eclipse.jetty.websocket.api.WriteCallback;
33  import org.eclipse.jetty.websocket.api.extensions.Frame;
34  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
35  import org.eclipse.jetty.websocket.common.CloseInfo;
36  import org.eclipse.jetty.websocket.common.ConnectionState;
37  import org.eclipse.jetty.websocket.common.LogicalConnection;
38  import org.eclipse.jetty.websocket.common.WebSocketFrame;
39  import org.eclipse.jetty.websocket.common.WebSocketSession;
40  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
41  import org.eclipse.jetty.websocket.common.io.IOState;
42  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
43  
44  /**
45   * MuxChannel, acts as WebSocketConnection for specific sub-channel.
46   */
47  public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken, ConnectionStateListener
48  {
49      private static final Logger LOG = Log.getLogger(MuxChannel.class);
50  
51      private final long channelId;
52      private final Muxer muxer;
53      private final AtomicBoolean inputClosed;
54      private final AtomicBoolean outputClosed;
55      private final AtomicBoolean suspendToken;
56      private IOState ioState;
57      private WebSocketPolicy policy;
58      private WebSocketSession session;
59      private IncomingFrames incoming;
60      private String subProtocol;
61  
62      public MuxChannel(long channelId, Muxer muxer)
63      {
64          this.channelId = channelId;
65          this.muxer = muxer;
66          this.policy = muxer.getPolicy().clonePolicy();
67  
68          this.suspendToken = new AtomicBoolean(false);
69          this.ioState = new IOState();
70          this.ioState.addListener(this);
71  
72          this.inputClosed = new AtomicBoolean(false);
73          this.outputClosed = new AtomicBoolean(false);
74      }
75      
76      @Override
77      public Executor getExecutor()
78      {
79          // TODO Auto-generated method stub
80          return null;
81      }
82  
83      @Override
84      public void close()
85      {
86          close(StatusCode.NORMAL,null);
87      }
88  
89      @Override
90      public void close(int statusCode, String reason)
91      {
92          CloseInfo close = new CloseInfo(statusCode,reason);
93          // TODO: disconnect callback?
94          outgoingFrame(close.asFrame(),null);
95      }
96  
97      @Override
98      public void disconnect()
99      {
100         // TODO: disconnect the virtual end-point?
101     }
102 
103     @Override
104     public ByteBufferPool getBufferPool()
105     {
106         // TODO Auto-generated method stub
107         return null;
108     }
109 
110     public long getChannelId()
111     {
112         return channelId;
113     }
114 
115     @Override
116     public long getIdleTimeout()
117     {
118         // TODO Auto-generated method stub
119         return 0;
120     }
121 
122     @Override
123     public IOState getIOState()
124     {
125         // TODO Auto-generated method stub
126         return null;
127     }
128 
129     @Override
130     public InetSocketAddress getLocalAddress()
131     {
132         // TODO Auto-generated method stub
133         return null;
134     }
135 
136     @Override
137     public long getMaxIdleTimeout()
138     {
139         // TODO Auto-generated method stub
140         return 0;
141     }
142 
143     @Override
144     public WebSocketPolicy getPolicy()
145     {
146         return policy;
147     }
148 
149     @Override
150     public InetSocketAddress getRemoteAddress()
151     {
152         return muxer.getRemoteAddress();
153     }
154 
155     @Override
156     public WebSocketSession getSession()
157     {
158         return session;
159     }
160 
161     /**
162      * Incoming exceptions from Muxer.
163      */
164     @Override
165     public void incomingError(Throwable e)
166     {
167         incoming.incomingError(e);
168     }
169 
170     /**
171      * Incoming frames from Muxer
172      */
173     @Override
174     public void incomingFrame(Frame frame)
175     {
176         incoming.incomingFrame(frame);
177     }
178 
179     public boolean isActive()
180     {
181         return (ioState.isOpen());
182     }
183 
184     @Override
185     public boolean isOpen()
186     {
187         return isActive() && muxer.isOpen();
188     }
189 
190     @Override
191     public boolean isReading()
192     {
193         return true;
194     }
195 
196     public void onClose()
197     {
198     }
199 
200     @Override
201     public void onConnectionStateChange(ConnectionState state)
202     {
203         // TODO Auto-generated method stub
204 
205     }
206 
207     public void onOpen()
208     {
209         this.ioState.onOpened();
210     }
211 
212     /**
213      * Internal
214      * 
215      * @param frame the frame to write
216      * @return the future for the network write of the frame
217      */
218     private Future<Void> outgoingAsyncFrame(WebSocketFrame frame)
219     {
220         FutureWriteCallback future = new FutureWriteCallback();
221         outgoingFrame(frame,future);
222         return future;
223     }
224 
225     /**
226      * Frames destined for the Muxer
227      */
228     @Override
229     public void outgoingFrame(Frame frame, WriteCallback callback)
230     {
231         muxer.output(channelId,frame,callback);
232     }
233 
234     @Override
235     public void resume()
236     {
237         if (suspendToken.getAndSet(false))
238         {
239             // TODO: Start reading again. (how?)
240         }
241     }
242 
243     @Override
244     public void setMaxIdleTimeout(long ms)
245     {
246         // TODO Auto-generated method stub
247 
248     }
249 
250     @Override
251     public void setNextIncomingFrames(IncomingFrames incoming)
252     {
253         this.incoming = incoming;
254     }
255 
256     @Override
257     public void setSession(WebSocketSession session)
258     {
259         this.session = session;
260         // session.setOutgoing(this);
261     }
262 
263     public void setSubProtocol(String subProtocol)
264     {
265         this.subProtocol = subProtocol;
266     }
267 
268     @Override
269     public SuspendToken suspend()
270     {
271         suspendToken.set(true);
272         // TODO: how to suspend reading?
273         return this;
274     }
275 }