1 package org.eclipse.jetty.websocket;
2
3 import java.io.IOException;
4
5 import org.eclipse.jetty.io.AsyncEndPoint;
6 import org.eclipse.jetty.io.Buffer;
7 import org.eclipse.jetty.io.Connection;
8 import org.eclipse.jetty.io.EndPoint;
9 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
10 import org.eclipse.jetty.util.log.Log;
11 import org.eclipse.jetty.util.thread.Timeout;
12
13 public class WebSocketConnection implements Connection, WebSocket.Outbound
14 {
15 final IdleCheck _idle;
16 final EndPoint _endp;
17 final WebSocketParser _parser;
18 final WebSocketGenerator _generator;
19 final long _timestamp;
20 final WebSocket _websocket;
21
22 public WebSocketConnection(WebSocket websocket, EndPoint endpoint)
23 throws IOException
24 {
25 this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000);
26 }
27
28 public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime)
29 throws IOException
30 {
31
32 if (endpoint instanceof AsyncEndPoint)
33 ((AsyncEndPoint)endpoint).cancelIdle();
34
35 _endp = endpoint;
36 _endp.setMaxIdleTime(maxIdleTime);
37
38 _timestamp = timestamp;
39 _websocket = websocket;
40 _generator = new WebSocketGenerator(buffers, _endp);
41 _parser = new WebSocketParser(buffers, endpoint, new WebSocketParser.EventHandler()
42 {
43 public void onFrame(byte frame, String data)
44 {
45 try
46 {
47 _websocket.onMessage(frame,data);
48 }
49 catch(ThreadDeath th)
50 {
51 throw th;
52 }
53 catch(Throwable th)
54 {
55 Log.warn(th);
56 }
57 }
58
59 public void onFrame(byte frame, Buffer buffer)
60 {
61 try
62 {
63 byte[] array=buffer.array();
64
65 _websocket.onMessage(frame,array,buffer.getIndex(),buffer.length());
66 }
67 catch(ThreadDeath th)
68 {
69 throw th;
70 }
71 catch(Throwable th)
72 {
73 Log.warn(th);
74 }
75 }
76 });
77
78
79 if (_endp instanceof SelectChannelEndPoint)
80 {
81 final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
82 scep.cancelIdle();
83 _idle=new IdleCheck()
84 {
85 public void access(EndPoint endp)
86 {
87 scep.scheduleIdle();
88 }
89 };
90 scep.scheduleIdle();
91 }
92 else
93 {
94 _idle = new IdleCheck()
95 {
96 public void access(EndPoint endp)
97 {}
98 };
99 }
100 }
101
102 public Connection handle() throws IOException
103 {
104 boolean progress=true;
105
106 try
107 {
108 while (progress)
109 {
110 int flushed=_generator.flush();
111 int filled=_parser.parseNext();
112
113 progress = flushed>0 || filled>0;
114
115 if (filled<0 || flushed<0)
116 {
117 _endp.close();
118 break;
119 }
120 }
121 }
122 catch(IOException e)
123 {
124 e.printStackTrace();
125 throw e;
126 }
127 finally
128 {
129 if (_endp.isOpen())
130 {
131 _idle.access(_endp);
132 checkWriteable();
133 }
134 else
135
136 _websocket.onDisconnect();
137 }
138 return this;
139 }
140
141 public boolean isOpen()
142 {
143 return _endp!=null&&_endp.isOpen();
144 }
145
146 public boolean isIdle()
147 {
148 return _parser.isBufferEmpty() && _generator.isBufferEmpty();
149 }
150
151 public boolean isSuspended()
152 {
153 return false;
154 }
155
156 public long getTimeStamp()
157 {
158 return _timestamp;
159 }
160
161 public void sendMessage(String content) throws IOException
162 {
163 sendMessage(WebSocket.SENTINEL_FRAME,content);
164 }
165
166 public void sendMessage(byte frame, String content) throws IOException
167 {
168 _generator.addFrame(frame,content,_endp.getMaxIdleTime());
169 _generator.flush();
170 checkWriteable();
171 _idle.access(_endp);
172 }
173
174 public void sendMessage(byte frame, byte[] content) throws IOException
175 {
176 sendMessage(frame, content, 0, content.length);
177 }
178
179 public void sendMessage(byte frame, byte[] content, int offset, int length) throws IOException
180 {
181 _generator.addFrame(frame,content,offset,length,_endp.getMaxIdleTime());
182 _generator.flush();
183 checkWriteable();
184 _idle.access(_endp);
185 }
186
187 public void disconnect()
188 {
189 try
190 {
191 _generator.flush(_endp.getMaxIdleTime());
192 _endp.close();
193 }
194 catch(IOException e)
195 {
196 Log.ignore(e);
197 }
198 }
199
200 public void fill(Buffer buffer)
201 {
202 _parser.fill(buffer);
203 }
204
205
206 private void checkWriteable()
207 {
208 if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
209 ((AsyncEndPoint)_endp).scheduleWrite();
210 }
211
212 private interface IdleCheck
213 {
214 void access(EndPoint endp);
215 }
216 }