1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.net.SocketException;
20 import java.nio.ByteBuffer;
21 import java.nio.channels.ByteChannel;
22 import java.nio.channels.GatheringByteChannel;
23 import java.nio.channels.SelectableChannel;
24 import java.nio.channels.SocketChannel;
25
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.util.StringUtil;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32
33
34
35
36
37 public class ChannelEndPoint implements EndPoint
38 {
39 private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
40
41 protected final ByteChannel _channel;
42 protected final ByteBuffer[] _gather2=new ByteBuffer[2];
43 protected final Socket _socket;
44 protected final InetSocketAddress _local;
45 protected final InetSocketAddress _remote;
46 protected volatile int _maxIdleTime;
47 private volatile boolean _ishut;
48 private volatile boolean _oshut;
49
50 public ChannelEndPoint(ByteChannel channel) throws IOException
51 {
52 super();
53 this._channel = channel;
54 _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
55 if (_socket!=null)
56 {
57 _local=(InetSocketAddress)_socket.getLocalSocketAddress();
58 _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
59 _maxIdleTime=_socket.getSoTimeout();
60 }
61 else
62 {
63 _local=_remote=null;
64 }
65 }
66
67 protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException
68 {
69 this._channel = channel;
70 _maxIdleTime=maxIdleTime;
71 _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
72 if (_socket!=null)
73 {
74 _local=(InetSocketAddress)_socket.getLocalSocketAddress();
75 _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
76 _socket.setSoTimeout(_maxIdleTime);
77 }
78 else
79 {
80 _local=_remote=null;
81 }
82 }
83
84 public boolean isBlocking()
85 {
86 return !(_channel instanceof SelectableChannel) || ((SelectableChannel)_channel).isBlocking();
87 }
88
89 public boolean blockReadable(long millisecs) throws IOException
90 {
91 return true;
92 }
93
94 public boolean blockWritable(long millisecs) throws IOException
95 {
96 return true;
97 }
98
99
100
101
102 public boolean isOpen()
103 {
104 return _channel.isOpen();
105 }
106
107
108
109
110
111 protected final void shutdownChannelInput() throws IOException
112 {
113 LOG.debug("ishut {}", this);
114 _ishut = true;
115 if (_channel.isOpen())
116 {
117 if (_socket != null)
118 {
119 try
120 {
121 if (!_socket.isInputShutdown())
122 {
123 _socket.shutdownInput();
124 }
125 }
126 catch (SocketException e)
127 {
128 LOG.debug(e.toString());
129 LOG.ignore(e);
130 }
131 finally
132 {
133 if (_oshut)
134 {
135 close();
136 }
137 }
138 }
139 }
140 }
141
142
143
144
145 public void shutdownInput() throws IOException
146 {
147 shutdownChannelInput();
148 }
149
150 protected final void shutdownChannelOutput() throws IOException
151 {
152 LOG.debug("oshut {}",this);
153 _oshut = true;
154 if (_channel.isOpen())
155 {
156 if (_socket != null)
157 {
158 try
159 {
160 if (!_socket.isOutputShutdown())
161 {
162 _socket.shutdownOutput();
163 }
164 }
165 catch (SocketException e)
166 {
167 LOG.debug(e.toString());
168 LOG.ignore(e);
169 }
170 finally
171 {
172 if (_ishut)
173 {
174 close();
175 }
176 }
177 }
178 }
179 }
180
181
182
183
184 public void shutdownOutput() throws IOException
185 {
186 shutdownChannelOutput();
187 }
188
189 public boolean isOutputShutdown()
190 {
191 return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown();
192 }
193
194 public boolean isInputShutdown()
195 {
196 return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown();
197 }
198
199
200
201
202 public void close() throws IOException
203 {
204 LOG.debug("close {}",this);
205 _channel.close();
206 }
207
208
209
210
211 public int fill(Buffer buffer) throws IOException
212 {
213 if (_ishut)
214 return -1;
215 Buffer buf = buffer.buffer();
216 int len=0;
217 if (buf instanceof NIOBuffer)
218 {
219 final NIOBuffer nbuf = (NIOBuffer)buf;
220 final ByteBuffer bbuf=nbuf.getByteBuffer();
221
222
223 try
224 {
225 synchronized(bbuf)
226 {
227 try
228 {
229 bbuf.position(buffer.putIndex());
230 len=_channel.read(bbuf);
231 }
232 finally
233 {
234 buffer.setPutIndex(bbuf.position());
235 bbuf.position(0);
236 }
237 }
238
239 if (len<0 && isOpen())
240 {
241 if (!isInputShutdown())
242 shutdownInput();
243 if (isOutputShutdown())
244 _channel.close();
245 }
246 }
247 catch (IOException x)
248 {
249 LOG.debug("Exception while filling", x);
250 try
251 {
252 if (_channel.isOpen())
253 _channel.close();
254 }
255 catch (Exception xx)
256 {
257 LOG.ignore(xx);
258 }
259
260 if (len>0)
261 throw x;
262 len=-1;
263 }
264 }
265 else
266 {
267 throw new IOException("Not Implemented");
268 }
269
270 return len;
271 }
272
273
274
275
276 public int flush(Buffer buffer) throws IOException
277 {
278 Buffer buf = buffer.buffer();
279 int len=0;
280 if (buf instanceof NIOBuffer)
281 {
282 final NIOBuffer nbuf = (NIOBuffer)buf;
283 final ByteBuffer bbuf=nbuf.getByteBuffer();
284
285
286 synchronized(bbuf)
287 {
288 try
289 {
290 bbuf.position(buffer.getIndex());
291 bbuf.limit(buffer.putIndex());
292 len=_channel.write(bbuf);
293 }
294 finally
295 {
296 if (len>0)
297 buffer.skip(len);
298 bbuf.position(0);
299 bbuf.limit(bbuf.capacity());
300 }
301 }
302 }
303 else if (buf instanceof RandomAccessFileBuffer)
304 {
305 len = ((RandomAccessFileBuffer)buf).writeTo(_channel,buffer.getIndex(),buffer.length());
306 if (len>0)
307 buffer.skip(len);
308 }
309 else if (buffer.array()!=null)
310 {
311 ByteBuffer b = ByteBuffer.wrap(buffer.array(), buffer.getIndex(), buffer.length());
312 len=_channel.write(b);
313 if (len>0)
314 buffer.skip(len);
315 }
316 else
317 {
318 throw new IOException("Not Implemented");
319 }
320 return len;
321 }
322
323
324
325
326 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
327 {
328 int length=0;
329
330 Buffer buf0 = header==null?null:header.buffer();
331 Buffer buf1 = buffer==null?null:buffer.buffer();
332
333 if (_channel instanceof GatheringByteChannel &&
334 header!=null && header.length()!=0 && buf0 instanceof NIOBuffer &&
335 buffer!=null && buffer.length()!=0 && buf1 instanceof NIOBuffer)
336 {
337 length = gatheringFlush(header,((NIOBuffer)buf0).getByteBuffer(),buffer,((NIOBuffer)buf1).getByteBuffer());
338 }
339 else
340 {
341
342 if (header!=null && header.length()>0)
343 length=flush(header);
344
345
346 if ((header==null || header.length()==0) &&
347 buffer!=null && buffer.length()>0)
348 length+=flush(buffer);
349
350
351 if ((header==null || header.length()==0) &&
352 (buffer==null || buffer.length()==0) &&
353 trailer!=null && trailer.length()>0)
354 length+=flush(trailer);
355 }
356
357 return length;
358 }
359
360 protected int gatheringFlush(Buffer header, ByteBuffer bbuf0, Buffer buffer, ByteBuffer bbuf1) throws IOException
361 {
362 int length;
363
364 synchronized(this)
365 {
366
367
368 synchronized(bbuf0)
369 {
370
371 synchronized(bbuf1)
372 {
373 try
374 {
375
376 bbuf0.position(header.getIndex());
377 bbuf0.limit(header.putIndex());
378 bbuf1.position(buffer.getIndex());
379 bbuf1.limit(buffer.putIndex());
380
381 _gather2[0]=bbuf0;
382 _gather2[1]=bbuf1;
383
384
385 length=(int)((GatheringByteChannel)_channel).write(_gather2);
386
387 int hl=header.length();
388 if (length>hl)
389 {
390 header.clear();
391 buffer.skip(length-hl);
392 }
393 else if (length>0)
394 {
395 header.skip(length);
396 }
397 }
398 finally
399 {
400
401 if (!header.isImmutable())
402 header.setGetIndex(bbuf0.position());
403 if (!buffer.isImmutable())
404 buffer.setGetIndex(bbuf1.position());
405
406 bbuf0.position(0);
407 bbuf1.position(0);
408 bbuf0.limit(bbuf0.capacity());
409 bbuf1.limit(bbuf1.capacity());
410 }
411 }
412 }
413 }
414 return length;
415 }
416
417
418
419
420
421 public ByteChannel getChannel()
422 {
423 return _channel;
424 }
425
426
427
428
429
430
431 public String getLocalAddr()
432 {
433 if (_socket==null)
434 return null;
435 if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
436 return StringUtil.ALL_INTERFACES;
437 return _local.getAddress().getHostAddress();
438 }
439
440
441
442
443
444 public String getLocalHost()
445 {
446 if (_socket==null)
447 return null;
448 if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
449 return StringUtil.ALL_INTERFACES;
450 return _local.getAddress().getCanonicalHostName();
451 }
452
453
454
455
456
457 public int getLocalPort()
458 {
459 if (_socket==null)
460 return 0;
461 if (_local==null)
462 return -1;
463 return _local.getPort();
464 }
465
466
467
468
469
470 public String getRemoteAddr()
471 {
472 if (_socket==null)
473 return null;
474 if (_remote==null)
475 return null;
476 return _remote.getAddress().getHostAddress();
477 }
478
479
480
481
482
483 public String getRemoteHost()
484 {
485 if (_socket==null)
486 return null;
487 if (_remote==null)
488 return null;
489 return _remote.getAddress().getCanonicalHostName();
490 }
491
492
493
494
495
496 public int getRemotePort()
497 {
498 if (_socket==null)
499 return 0;
500 return _remote==null?-1:_remote.getPort();
501 }
502
503
504
505
506
507 public Object getTransport()
508 {
509 return _channel;
510 }
511
512
513 public void flush()
514 throws IOException
515 {
516 }
517
518
519 public int getMaxIdleTime()
520 {
521 return _maxIdleTime;
522 }
523
524
525
526
527
528 public void setMaxIdleTime(int timeMs) throws IOException
529 {
530 if (_socket!=null && timeMs!=_maxIdleTime)
531 _socket.setSoTimeout(timeMs>0?timeMs:0);
532 _maxIdleTime=timeMs;
533 }
534 }