1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.ByteBuffer;
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 import javax.net.ssl.SSLEngine;
21 import javax.net.ssl.SSLEngineResult;
22 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
23 import javax.net.ssl.SSLException;
24 import javax.net.ssl.SSLSession;
25
26 import org.eclipse.jetty.io.AbstractConnection;
27 import org.eclipse.jetty.io.AsyncEndPoint;
28 import org.eclipse.jetty.io.Buffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.log.Logger;
33 import org.eclipse.jetty.util.thread.Timeout.Task;
34
35
36
37
38
39
40
41
42
43
44 public class SslConnection extends AbstractConnection implements AsyncConnection
45 {
46 static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio.ssl");
47
48 private static final NIOBuffer __ZERO_BUFFER=new IndirectNIOBuffer(0);
49
50 private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
51 private final SSLEngine _engine;
52 private final SSLSession _session;
53 private AsyncConnection _connection;
54 private final SslEndPoint _sslEndPoint = new SslEndPoint();
55 private int _allocations;
56 private SslBuffers _buffers;
57 private NIOBuffer _inbound;
58 private NIOBuffer _unwrapBuf;
59 private NIOBuffer _outbound;
60 private AsyncEndPoint _aEndp;
61 private boolean _allowRenegotiate=true;
62 private boolean _handshook;
63 private boolean _ishut;
64 private boolean _oshut;
65 private final AtomicBoolean _progressed = new AtomicBoolean();
66
67
68
69
70 private static class SslBuffers
71 {
72 final NIOBuffer _in;
73 final NIOBuffer _out;
74 final NIOBuffer _unwrap;
75
76 SslBuffers(int packetSize, int appSize)
77 {
78 _in=new IndirectNIOBuffer(packetSize);
79 _out=new IndirectNIOBuffer(packetSize);
80 _unwrap=new IndirectNIOBuffer(appSize);
81 }
82 }
83
84
/**
 * Creates an SSL connection over the given endpoint, using the current time as its timestamp.
 *
 * @param engine the SSL engine that performs handshaking, wrapping and unwrapping
 * @param endp the endpoint carrying the encrypted bytes
 */
public SslConnection(SSLEngine engine,EndPoint endp)
{
    this(engine, endp, System.currentTimeMillis());
}
89
90
/**
 * Creates an SSL connection over the given endpoint.
 *
 * @param engine the SSL engine that performs handshaking, wrapping and unwrapping
 * @param endp the endpoint carrying the encrypted bytes; it is cast to AsyncEndPoint
 * @param timeStamp timestamp passed to the superclass constructor
 */
public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp)
{
    super(endp, timeStamp);
    this._engine = engine;
    this._session = engine.getSession();
    this._aEndp = (AsyncEndPoint)endp;
}
98
99
100
101
102
103 public boolean isAllowRenegotiate()
104 {
105 return _allowRenegotiate;
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119 public void setAllowRenegotiate(boolean allowRenegotiate)
120 {
121 _allowRenegotiate = allowRenegotiate;
122 }
123
124
125 private void allocateBuffers()
126 {
127 synchronized (this)
128 {
129 if (_allocations++==0)
130 {
131 if (_buffers==null)
132 {
133 _buffers=__buffers.get();
134 if (_buffers==null)
135 _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2);
136 _inbound=_buffers._in;
137 _outbound=_buffers._out;
138 _unwrapBuf=_buffers._unwrap;
139 __buffers.set(null);
140 }
141 }
142 }
143 }
144
145
146 private void releaseBuffers()
147 {
148 synchronized (this)
149 {
150 if (--_allocations==0)
151 {
152 if (_buffers!=null &&
153 _inbound.length()==0 &&
154 _outbound.length()==0 &&
155 _unwrapBuf.length()==0)
156 {
157 _inbound=null;
158 _outbound=null;
159 _unwrapBuf=null;
160 __buffers.set(_buffers);
161 _buffers=null;
162 }
163 }
164 }
165 }
166
167
168 public Connection handle() throws IOException
169 {
170 try
171 {
172 allocateBuffers();
173
174 boolean progress=true;
175
176 while (progress)
177 {
178 progress=false;
179
180
181 if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
182 {
183 progress=process(null,null);
184 }
185 else
186 {
187
188 AsyncConnection next = (AsyncConnection)_connection.handle();
189 if (next!=_connection && next!=null)
190 {
191 _connection=next;
192 progress=true;
193 }
194 }
195
196 LOG.debug("{} handle {} progress=",_session,this, progress);
197 }
198 }
199 finally
200 {
201 releaseBuffers();
202
203 if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
204 {
205 _ishut=true;
206 try
207 {
208 _connection.onInputShutdown();
209 }
210 catch(Throwable x)
211 {
212 LOG.warn("onInputShutdown failed", x);
213 try{_sslEndPoint.close();}
214 catch(IOException e2){LOG.ignore(e2);}
215 }
216 }
217 }
218
219 return this;
220 }
221
222
223 public boolean isIdle()
224 {
225 return false;
226 }
227
228
229 public boolean isSuspended()
230 {
231 return false;
232 }
233
234
235 public void onClose()
236 {
237 }
238
239
240 @Override
241 public void onIdleExpired()
242 {
243 try
244 {
245 _sslEndPoint.shutdownOutput();
246 }
247 catch (IOException e)
248 {
249 LOG.warn(e);
250 super.onIdleExpired();
251 }
252 }
253
254
255 public void onInputShutdown() throws IOException
256 {
257
258 }
259
260
/**
 * Runs the SSL engine for as long as it makes progress. Each pass moves raw bytes
 * between the endpoint and the inbound/outbound packet buffers, then wraps, unwraps or
 * runs delegated tasks according to the engine's handshake status.
 *
 * @param toFill buffer to receive unwrapped data, or null to unwrap into the internal
 *        unwrap buffer
 * @param toFlush buffer holding data to be wrapped, or null when there is nothing to send
 * @return true if any pass made progress, or if previously unwrapped data was copied
 *         into toFill
 * @throws IOException propagated from wrap/unwrap, from the engine, or from closing or
 *         shutting down the endpoint
 * @throws IllegalStateException if the engine reports the FINISHED handshake status
 */
private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException
{
    boolean some_progress=false;
    try
    {
        // Paired with releaseBuffers() in the finally block below.
        allocateBuffers();
        if (toFill==null)
        {
            // No caller buffer: unwrap into the internal buffer, making room first.
            _unwrapBuf.compact();
            toFill=_unwrapBuf;
        }
        else if (toFill.capacity()<_session.getApplicationBufferSize())
        {
            // Caller's buffer is smaller than the session's application buffer size:
            // unwrap into the internal buffer via a nested call, then copy over what fits.
            boolean progress=process(null,toFlush);
            if (_unwrapBuf!=null && _unwrapBuf.hasContent())
            {
                _unwrapBuf.skip(toFill.put(_unwrapBuf));
                return true;
            }
            else
                return progress;
        }
        else if (_unwrapBuf!=null && _unwrapBuf.hasContent())
        {
            // Data unwrapped earlier is still pending: hand that over before unwrapping more.
            _unwrapBuf.skip(toFill.put(_unwrapBuf));
            return true;
        }


        // wrap() always needs a source buffer; use the shared empty one when nothing is to be sent.
        if (toFlush==null)
            toFlush=__ZERO_BUFFER;

        boolean progress=true;

        while (progress)
        {
            progress=false;
            int filled=0,flushed=0;

            try
            {
                // Read whatever raw bytes are available into the inbound packet buffer.
                if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0)
                    progress = true;

                // Write any pending wrapped bytes out to the endpoint.
                if (_outbound.hasContent() && (flushed=_endp.flush(_outbound))>0)
                    progress = true;
            }
            catch (Exception e)
            {
                // Endpoint fill/flush failures are logged at debug level and not rethrown.
                LOG.debug(e.toString());
                LOG.ignore(e);
            }
            LOG.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.length(),flushed,_outbound.length());


            switch(_engine.getHandshakeStatus())
            {
                case FINISHED:
                    throw new IllegalStateException();

                case NOT_HANDSHAKING:
                {
                    // Encrypt application data if there is any and the outbound buffer has room.
                    if (toFlush.hasContent() && _outbound.space()>0 && wrap(toFlush))
                        progress=true;

                    // Decrypt received data if there is any and the target has room.
                    if (toFill.space()>0 && _inbound.hasContent() && unwrap(toFill))
                        progress=true;
                }
                break;

                case NEED_TASK:
                {
                    // Run every delegated task on the calling thread.
                    Runnable task;
                    while ((task=_engine.getDelegatedTask())!=null)
                    {
                        progress=true;
                        task.run();
                    }
















                }
                break;

                case NEED_WRAP:
                {
                    // A handshake requested after the first one finished is a renegotiation:
                    // close the endpoint if that is not allowed.
                    if (_handshook && !_allowRenegotiate)
                        _endp.close();
                    else if (wrap(toFlush))
                        progress=true;
                }
                break;

                case NEED_UNWRAP:
                {
                    // Same renegotiation guard as NEED_WRAP.
                    if (_handshook && !_allowRenegotiate)
                        _endp.close();
                    else if (unwrap(toFill))
                        progress=true;
                }
                break;
            }

            // Propagate shutdown in both directions once the relevant buffer is drained.
            if (!_inbound.hasContent() && _endp.isInputShutdown())
                _engine.closeInbound();
            if (!_outbound.hasContent() && _engine.isOutboundDone())
                _endp.shutdownOutput();

            some_progress|=progress;
        }
    }
    finally
    {
        releaseBuffers();
        // Record progress for SslEndPoint.hasProgressed(); the early returns above leave
        // some_progress false and so do not set this flag themselves.
        if (some_progress)
            _progressed.set(true);
    }
    return some_progress;
}
400
401 private synchronized boolean wrap(final Buffer buffer) throws IOException
402 {
403 ByteBuffer bbuf=extractByteBuffer(buffer);
404 final SSLEngineResult result;
405
406 synchronized(bbuf)
407 {
408 _outbound.compact();
409 ByteBuffer out_buffer=_outbound.getByteBuffer();
410 synchronized(out_buffer)
411 {
412 try
413 {
414 bbuf.position(buffer.getIndex());
415 bbuf.limit(buffer.putIndex());
416 out_buffer.position(_outbound.putIndex());
417 out_buffer.limit(out_buffer.capacity());
418 result=_engine.wrap(bbuf,out_buffer);
419 if (LOG.isDebugEnabled())
420 LOG.debug("{} wrap {} {} consumed={} produced={}",
421 _session,
422 result.getStatus(),
423 result.getHandshakeStatus(),
424 result.bytesConsumed(),
425 result.bytesProduced());
426
427
428 buffer.skip(result.bytesConsumed());
429 _outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced());
430 }
431 catch(SSLException e)
432 {
433 LOG.warn(_endp+":",e);
434 _endp.close();
435 throw e;
436 }
437 finally
438 {
439 out_buffer.position(0);
440 out_buffer.limit(out_buffer.capacity());
441 bbuf.position(0);
442 bbuf.limit(bbuf.capacity());
443 }
444 }
445 }
446
447 switch(result.getStatus())
448 {
449 case BUFFER_UNDERFLOW:
450 throw new IllegalStateException();
451
452 case BUFFER_OVERFLOW:
453 break;
454
455 case OK:
456 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
457 _handshook=true;
458 break;
459
460 case CLOSED:
461 LOG.debug("wrap CLOSE {} {}",this,result);
462 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
463 _endp.close();
464 break;
465
466 default:
467 LOG.warn("{} wrap default {}",_session,result);
468 throw new IOException(result.toString());
469 }
470
471 return result.bytesConsumed()>0 || result.bytesProduced()>0;
472 }
473
474 private synchronized boolean unwrap(final Buffer buffer) throws IOException
475 {
476 if (!_inbound.hasContent())
477 return false;
478
479 ByteBuffer bbuf=extractByteBuffer(buffer);
480 final SSLEngineResult result;
481
482 synchronized(bbuf)
483 {
484 ByteBuffer in_buffer=_inbound.getByteBuffer();
485 synchronized(in_buffer)
486 {
487 try
488 {
489 bbuf.position(buffer.putIndex());
490 bbuf.limit(buffer.capacity());
491 in_buffer.position(_inbound.getIndex());
492 in_buffer.limit(_inbound.putIndex());
493
494 result=_engine.unwrap(in_buffer,bbuf);
495 if (LOG.isDebugEnabled())
496 LOG.debug("{} unwrap {} {} consumed={} produced={}",
497 _session,
498 result.getStatus(),
499 result.getHandshakeStatus(),
500 result.bytesConsumed(),
501 result.bytesProduced());
502
503 _inbound.skip(result.bytesConsumed());
504 _inbound.compact();
505 buffer.setPutIndex(buffer.putIndex()+result.bytesProduced());
506 }
507 catch(SSLException e)
508 {
509 LOG.warn(_endp+":"+e);
510 LOG.debug(e);
511 if (_endp.isOpen())
512 _endp.close();
513 throw e;
514 }
515 finally
516 {
517 in_buffer.position(0);
518 in_buffer.limit(in_buffer.capacity());
519 bbuf.position(0);
520 bbuf.limit(bbuf.capacity());
521 }
522 }
523 }
524
525 switch(result.getStatus())
526 {
527 case BUFFER_UNDERFLOW:
528 break;
529
530 case BUFFER_OVERFLOW:
531 LOG.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound.toDetailString(),buffer.toDetailString());
532 break;
533
534 case OK:
535 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
536 _handshook=true;
537 break;
538
539 case CLOSED:
540 LOG.debug("unwrap CLOSE {} {}",this,result);
541 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
542 _endp.close();
543 break;
544
545 default:
546 LOG.warn("{} wrap default {}",_session,result);
547 throw new IOException(result.toString());
548 }
549
550
551
552
553 return result.bytesConsumed()>0 || result.bytesProduced()>0;
554 }
555
556
557
558 private ByteBuffer extractByteBuffer(Buffer buffer)
559 {
560 if (buffer.buffer() instanceof NIOBuffer)
561 return ((NIOBuffer)buffer.buffer()).getByteBuffer();
562 return ByteBuffer.wrap(buffer.array());
563 }
564
565
566 public AsyncEndPoint getSslEndPoint()
567 {
568 return _sslEndPoint;
569 }
570
571 public String toString()
572 {
573 Buffer i=_inbound;
574 Buffer o=_outbound;
575 Buffer u=_unwrapBuf;
576
577 return super.toString()+"|"+_engine.getHandshakeStatus()+" i/u/o="+(i==null?0:i.length())+"/"+(u==null?0:u.length())+"/"+(o==null?0:o.length());
578 }
579
580
581
582 public class SslEndPoint implements AsyncEndPoint
583 {
584 public SSLEngine getSslEngine()
585 {
586 return _engine;
587 }
588
589 public void shutdownOutput() throws IOException
590 {
591 synchronized (SslConnection.this)
592 {
593 LOG.debug("{} ssl endp.oshut {}",_session,this);
594 _engine.closeOutbound();
595 _oshut=true;
596 }
597 flush();
598 }
599
600 public boolean isOutputShutdown()
601 {
602 synchronized (SslConnection.this)
603 {
604 return _oshut||!isOpen()||_engine.isOutboundDone();
605 }
606 }
607
608 public void shutdownInput() throws IOException
609 {
610 LOG.debug("{} ssl endp.ishut!",_session);
611
612
613 }
614
615 public boolean isInputShutdown()
616 {
617 synchronized (SslConnection.this)
618 {
619 return _endp.isInputShutdown() &&
620 !(_unwrapBuf!=null&&_unwrapBuf.hasContent()) &&
621 !(_inbound!=null&&_inbound.hasContent());
622 }
623 }
624
625 public void close() throws IOException
626 {
627 LOG.debug("{} ssl endp.close",_session);
628 _endp.close();
629 }
630
631 public int fill(Buffer buffer) throws IOException
632 {
633 int size=buffer.length();
634 process(buffer,null);
635
636 int filled=buffer.length()-size;
637
638 if (filled==0 && isInputShutdown())
639 return -1;
640 return filled;
641 }
642
643 public int flush(Buffer buffer) throws IOException
644 {
645 int size = buffer.length();
646 process(null, buffer);
647 return size-buffer.length();
648 }
649
650 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
651 {
652 if (header!=null && header.hasContent())
653 return flush(header);
654 if (buffer!=null && buffer.hasContent())
655 return flush(buffer);
656 if (trailer!=null && trailer.hasContent())
657 return flush(trailer);
658 return 0;
659 }
660
661 public boolean blockReadable(long millisecs) throws IOException
662 {
663 long now = System.currentTimeMillis();
664 long end=millisecs>0?(now+millisecs):Long.MAX_VALUE;
665
666 while (now<end)
667 {
668 if (process(null,null))
669 break;
670 _endp.blockReadable(end-now);
671 now = System.currentTimeMillis();
672 }
673
674 return now<end;
675 }
676
677 public boolean blockWritable(long millisecs) throws IOException
678 {
679 return _endp.blockWritable(millisecs);
680 }
681
682 public boolean isOpen()
683 {
684 return _endp.isOpen();
685 }
686
687 public Object getTransport()
688 {
689 return _endp;
690 }
691
692 public void flush() throws IOException
693 {
694 process(null,null);
695 }
696
697 public void asyncDispatch()
698 {
699 _aEndp.asyncDispatch();
700 }
701
702 public void scheduleWrite()
703 {
704 _aEndp.scheduleWrite();
705 }
706
707 public void onIdleExpired()
708 {
709 _aEndp.onIdleExpired();
710 }
711
712 public void setCheckForIdle(boolean check)
713 {
714 _aEndp.setCheckForIdle(check);
715 }
716
717 public boolean isCheckForIdle()
718 {
719 return _aEndp.isCheckForIdle();
720 }
721
722 public void scheduleTimeout(Task task, long timeoutMs)
723 {
724 _aEndp.scheduleTimeout(task,timeoutMs);
725 }
726
727 public void cancelTimeout(Task task)
728 {
729 _aEndp.cancelTimeout(task);
730 }
731
732 public boolean isWritable()
733 {
734 return _aEndp.isWritable();
735 }
736
737 public boolean hasProgressed()
738 {
739 return _progressed.getAndSet(false);
740 }
741
742 public String getLocalAddr()
743 {
744 return _aEndp.getLocalAddr();
745 }
746
747 public String getLocalHost()
748 {
749 return _aEndp.getLocalHost();
750 }
751
752 public int getLocalPort()
753 {
754 return _aEndp.getLocalPort();
755 }
756
757 public String getRemoteAddr()
758 {
759 return _aEndp.getRemoteAddr();
760 }
761
762 public String getRemoteHost()
763 {
764 return _aEndp.getRemoteHost();
765 }
766
767 public int getRemotePort()
768 {
769 return _aEndp.getRemotePort();
770 }
771
772 public boolean isBlocking()
773 {
774 return false;
775 }
776
777 public int getMaxIdleTime()
778 {
779 return _aEndp.getMaxIdleTime();
780 }
781
782 public void setMaxIdleTime(int timeMs) throws IOException
783 {
784 _aEndp.setMaxIdleTime(timeMs);
785 }
786
787 public Connection getConnection()
788 {
789 return _connection;
790 }
791
792 public void setConnection(Connection connection)
793 {
794 _connection=(AsyncConnection)connection;
795 }
796
797 public String toString()
798 {
799 int i;
800 int o;
801 int u;
802 synchronized(SslConnection.this)
803 {
804 i=_inbound==null?-1:_inbound.length();
805 o=_outbound==null?-1:_outbound.length();
806 u=_unwrapBuf==null?-1:_unwrapBuf.length();
807 }
808 return String.format("SSL:%s %s i/u/o=%d/%d/%d ishut=%b oshut=%b",
809 _endp,
810 _engine.getHandshakeStatus(),
811 i, u, o,
812 _ishut, _oshut);
813 }
814 }
815 }