1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io.nio;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import javax.net.ssl.SSLEngine;
25 import javax.net.ssl.SSLEngineResult;
26 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
27 import javax.net.ssl.SSLException;
28 import javax.net.ssl.SSLSession;
29
30 import org.eclipse.jetty.io.AbstractConnection;
31 import org.eclipse.jetty.io.AsyncEndPoint;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.Timeout.Task;
38
39
40
41
42
43
44
45
46
47
48 public class SslConnection extends AbstractConnection implements AsyncConnection
49 {
50 private final Logger _logger = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
51
52 private static final NIOBuffer __ZERO_BUFFER=new IndirectNIOBuffer(0);
53
54 private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
55 private final SSLEngine _engine;
56 private final SSLSession _session;
57 private AsyncConnection _connection;
58 private final SslEndPoint _sslEndPoint;
59 private int _allocations;
60 private SslBuffers _buffers;
61 private NIOBuffer _inbound;
62 private NIOBuffer _unwrapBuf;
63 private NIOBuffer _outbound;
64 private AsyncEndPoint _aEndp;
65 private boolean _allowRenegotiate=true;
66 private boolean _handshook;
67 private boolean _ishut;
68 private boolean _oshut;
69 private final AtomicBoolean _progressed = new AtomicBoolean();
70
71
72
73
74 private static class SslBuffers
75 {
76 final NIOBuffer _in;
77 final NIOBuffer _out;
78 final NIOBuffer _unwrap;
79
80 SslBuffers(int packetSize, int appSize)
81 {
82 _in=new IndirectNIOBuffer(packetSize);
83 _out=new IndirectNIOBuffer(packetSize);
84 _unwrap=new IndirectNIOBuffer(appSize);
85 }
86 }
87
88
89 public SslConnection(SSLEngine engine,EndPoint endp)
90 {
91 this(engine,endp,System.currentTimeMillis());
92 }
93
94
95 public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp)
96 {
97 super(endp,timeStamp);
98 _engine=engine;
99 _session=_engine.getSession();
100 _aEndp=(AsyncEndPoint)endp;
101 _sslEndPoint = newSslEndPoint();
102 }
103
104
105 protected SslEndPoint newSslEndPoint()
106 {
107 return new SslEndPoint();
108 }
109
110
111
112
113
114 public boolean isAllowRenegotiate()
115 {
116 return _allowRenegotiate;
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130 public void setAllowRenegotiate(boolean allowRenegotiate)
131 {
132 _allowRenegotiate = allowRenegotiate;
133 }
134
135
136 private void allocateBuffers()
137 {
138 synchronized (this)
139 {
140 if (_allocations++==0)
141 {
142 if (_buffers==null)
143 {
144 _buffers=__buffers.get();
145 if (_buffers==null)
146 _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2);
147 _inbound=_buffers._in;
148 _outbound=_buffers._out;
149 _unwrapBuf=_buffers._unwrap;
150 __buffers.set(null);
151 }
152 }
153 }
154 }
155
156
157 private void releaseBuffers()
158 {
159 synchronized (this)
160 {
161 if (--_allocations==0)
162 {
163 if (_buffers!=null &&
164 _inbound.length()==0 &&
165 _outbound.length()==0 &&
166 _unwrapBuf.length()==0)
167 {
168 _inbound=null;
169 _outbound=null;
170 _unwrapBuf=null;
171 __buffers.set(_buffers);
172 _buffers=null;
173 }
174 }
175 }
176 }
177
178
179 public Connection handle() throws IOException
180 {
181 try
182 {
183 allocateBuffers();
184
185 boolean progress=true;
186
187 while (progress)
188 {
189 progress=false;
190
191
192 if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
193 progress=process(null,null);
194
195
196 AsyncConnection next = (AsyncConnection)_connection.handle();
197 if (next!=_connection && next!=null)
198 {
199 _connection=next;
200 progress=true;
201 }
202
203 _logger.debug("{} handle {} progress={}", _session, this, progress);
204 }
205 }
206 finally
207 {
208 releaseBuffers();
209
210 if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
211 {
212 _ishut=true;
213 try
214 {
215 _connection.onInputShutdown();
216 }
217 catch(Throwable x)
218 {
219 _logger.warn("onInputShutdown failed", x);
220 try{_sslEndPoint.close();}
221 catch(IOException e2){
222 _logger.ignore(e2);}
223 }
224 }
225 }
226
227 return this;
228 }
229
230
231 public boolean isIdle()
232 {
233 return false;
234 }
235
236
237 public boolean isSuspended()
238 {
239 return false;
240 }
241
242
243 public void onClose()
244 {
245 Connection connection = _sslEndPoint.getConnection();
246 if (connection != null && connection != this)
247 connection.onClose();
248 }
249
250
251 @Override
252 public void onIdleExpired(long idleForMs)
253 {
254 try
255 {
256 _logger.debug("onIdleExpired {}ms on {}",idleForMs,this);
257 if (_endp.isOutputShutdown())
258 _sslEndPoint.close();
259 else
260 _sslEndPoint.shutdownOutput();
261 }
262 catch (IOException e)
263 {
264 _logger.warn(e);
265 super.onIdleExpired(idleForMs);
266 }
267 }
268
269
270 public void onInputShutdown() throws IOException
271 {
272
273 }
274
275
276 private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException
277 {
278 boolean some_progress=false;
279 try
280 {
281
282 allocateBuffers();
283
284
285 if (toFill==null)
286 {
287
288 _unwrapBuf.compact();
289 toFill=_unwrapBuf;
290 }
291
292 else if (toFill.capacity()<_session.getApplicationBufferSize())
293 {
294
295 boolean progress=process(null,toFlush);
296
297
298 if (_unwrapBuf!=null && _unwrapBuf.hasContent())
299 {
300
301 _unwrapBuf.skip(toFill.put(_unwrapBuf));
302 return true;
303 }
304 else
305
306 return progress;
307 }
308
309 else if (_unwrapBuf!=null && _unwrapBuf.hasContent())
310 {
311
312 _unwrapBuf.skip(toFill.put(_unwrapBuf));
313 return true;
314 }
315
316
317
318
319 if (toFlush==null)
320 toFlush=__ZERO_BUFFER;
321
322
323 boolean progress=true;
324 while (progress)
325 {
326 progress=false;
327
328
329 int filled=0,flushed=0;
330 try
331 {
332
333 if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0)
334 progress = true;
335
336
337 if (_outbound.hasContent() && (flushed=_endp.flush(_outbound))>0)
338 progress = true;
339 }
340 catch (IOException e)
341 {
342 _endp.close();
343 throw e;
344 }
345 finally
346 {
347 _logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.length(),flushed,_outbound.length());
348 }
349
350
351 switch(_engine.getHandshakeStatus())
352 {
353 case FINISHED:
354 throw new IllegalStateException();
355
356 case NOT_HANDSHAKING:
357 {
358
359 if (toFill.space()>0 && _inbound.hasContent() && unwrap(toFill))
360 progress=true;
361
362
363 if (toFlush.hasContent() && _outbound.space()>0 && wrap(toFlush))
364 progress=true;
365 }
366 break;
367
368 case NEED_TASK:
369 {
370
371 Runnable task;
372 while ((task=_engine.getDelegatedTask())!=null)
373 {
374 progress=true;
375 task.run();
376 }
377
378 }
379 break;
380
381 case NEED_WRAP:
382 {
383
384 if (_handshook && !_allowRenegotiate)
385 _endp.close();
386 else if (wrap(toFlush))
387 progress=true;
388 }
389 break;
390
391 case NEED_UNWRAP:
392 {
393
394 if (_handshook && !_allowRenegotiate)
395 _endp.close();
396 else if (!_inbound.hasContent()&&filled==-1)
397 {
398
399 _endp.shutdownInput();
400 }
401 else if (unwrap(toFill))
402 progress=true;
403 }
404 break;
405 }
406
407
408 if (_endp.isOpen() && _endp.isInputShutdown() && !_inbound.hasContent())
409 _engine.closeInbound();
410
411 if (_endp.isOpen() && _engine.isOutboundDone() && !_outbound.hasContent())
412 _endp.shutdownOutput();
413
414
415 some_progress|=progress;
416 }
417
418
419 if (toFill==_unwrapBuf && _unwrapBuf.hasContent() && !_connection.isSuspended())
420 _aEndp.dispatch();
421 }
422 finally
423 {
424 releaseBuffers();
425 if (some_progress)
426 _progressed.set(true);
427 }
428 return some_progress;
429 }
430
431 private synchronized boolean wrap(final Buffer buffer) throws IOException
432 {
433 ByteBuffer bbuf=extractByteBuffer(buffer);
434 final SSLEngineResult result;
435
436 synchronized(bbuf)
437 {
438 _outbound.compact();
439 ByteBuffer out_buffer=_outbound.getByteBuffer();
440 synchronized(out_buffer)
441 {
442 try
443 {
444 bbuf.position(buffer.getIndex());
445 bbuf.limit(buffer.putIndex());
446 out_buffer.position(_outbound.putIndex());
447 out_buffer.limit(out_buffer.capacity());
448 result=_engine.wrap(bbuf,out_buffer);
449 if (_logger.isDebugEnabled())
450 _logger.debug("{} wrap {} {} consumed={} produced={}",
451 _session,
452 result.getStatus(),
453 result.getHandshakeStatus(),
454 result.bytesConsumed(),
455 result.bytesProduced());
456
457
458 buffer.skip(result.bytesConsumed());
459 _outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced());
460 }
461 catch(SSLException e)
462 {
463 _logger.debug(String.valueOf(_endp), e);
464 _endp.close();
465 throw e;
466 }
467 finally
468 {
469 out_buffer.position(0);
470 out_buffer.limit(out_buffer.capacity());
471 bbuf.position(0);
472 bbuf.limit(bbuf.capacity());
473 }
474 }
475 }
476
477 switch(result.getStatus())
478 {
479 case BUFFER_UNDERFLOW:
480 throw new IllegalStateException();
481
482 case BUFFER_OVERFLOW:
483 break;
484
485 case OK:
486 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
487 _handshook=true;
488 break;
489
490 case CLOSED:
491 _logger.debug("wrap CLOSE {} {}",this,result);
492 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
493 _endp.close();
494 break;
495
496 default:
497 _logger.debug("{} wrap default {}",_session,result);
498 throw new IOException(result.toString());
499 }
500
501 return result.bytesConsumed()>0 || result.bytesProduced()>0;
502 }
503
504 private synchronized boolean unwrap(final Buffer buffer) throws IOException
505 {
506 if (!_inbound.hasContent())
507 return false;
508
509 ByteBuffer bbuf=extractByteBuffer(buffer);
510 final SSLEngineResult result;
511
512 synchronized(bbuf)
513 {
514 ByteBuffer in_buffer=_inbound.getByteBuffer();
515 synchronized(in_buffer)
516 {
517 try
518 {
519 bbuf.position(buffer.putIndex());
520 bbuf.limit(buffer.capacity());
521 in_buffer.position(_inbound.getIndex());
522 in_buffer.limit(_inbound.putIndex());
523
524 result=_engine.unwrap(in_buffer,bbuf);
525 if (_logger.isDebugEnabled())
526 _logger.debug("{} unwrap {} {} consumed={} produced={}",
527 _session,
528 result.getStatus(),
529 result.getHandshakeStatus(),
530 result.bytesConsumed(),
531 result.bytesProduced());
532
533 _inbound.skip(result.bytesConsumed());
534 _inbound.compact();
535 buffer.setPutIndex(buffer.putIndex()+result.bytesProduced());
536 }
537 catch(SSLException e)
538 {
539 _logger.debug(String.valueOf(_endp), e);
540 _endp.close();
541 throw e;
542 }
543 finally
544 {
545 in_buffer.position(0);
546 in_buffer.limit(in_buffer.capacity());
547 bbuf.position(0);
548 bbuf.limit(bbuf.capacity());
549 }
550 }
551 }
552
553 switch(result.getStatus())
554 {
555 case BUFFER_UNDERFLOW:
556 if (_endp.isInputShutdown())
557 _inbound.clear();
558 break;
559
560 case BUFFER_OVERFLOW:
561 if (_logger.isDebugEnabled()) _logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound.toDetailString(),buffer.toDetailString());
562 break;
563
564 case OK:
565 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
566 _handshook=true;
567 break;
568
569 case CLOSED:
570 _logger.debug("unwrap CLOSE {} {}",this,result);
571 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
572 _endp.close();
573 break;
574
575 default:
576 _logger.debug("{} wrap default {}",_session,result);
577 throw new IOException(result.toString());
578 }
579
580
581
582
583 return result.bytesConsumed()>0 || result.bytesProduced()>0;
584 }
585
586
587
588 private ByteBuffer extractByteBuffer(Buffer buffer)
589 {
590 if (buffer.buffer() instanceof NIOBuffer)
591 return ((NIOBuffer)buffer.buffer()).getByteBuffer();
592 return ByteBuffer.wrap(buffer.array());
593 }
594
595
596 public AsyncEndPoint getSslEndPoint()
597 {
598 return _sslEndPoint;
599 }
600
601
602 public String toString()
603 {
604 return String.format("%s %s", super.toString(), _sslEndPoint);
605 }
606
607
608
609 public class SslEndPoint implements AsyncEndPoint
610 {
611 public SSLEngine getSslEngine()
612 {
613 return _engine;
614 }
615
616 public AsyncEndPoint getEndpoint()
617 {
618 return _aEndp;
619 }
620
621 public void shutdownOutput() throws IOException
622 {
623 synchronized (SslConnection.this)
624 {
625 _logger.debug("{} ssl endp.oshut {}",_session,this);
626 _engine.closeOutbound();
627 _oshut=true;
628 }
629 flush();
630 }
631
632 public boolean isOutputShutdown()
633 {
634 synchronized (SslConnection.this)
635 {
636 return _oshut||!isOpen()||_engine.isOutboundDone();
637 }
638 }
639
640 public void shutdownInput() throws IOException
641 {
642 _logger.debug("{} ssl endp.ishut!",_session);
643
644
645 }
646
647 public boolean isInputShutdown()
648 {
649 synchronized (SslConnection.this)
650 {
651 return _endp.isInputShutdown() &&
652 !(_unwrapBuf!=null&&_unwrapBuf.hasContent()) &&
653 !(_inbound!=null&&_inbound.hasContent());
654 }
655 }
656
657 public void close() throws IOException
658 {
659 _logger.debug("{} ssl endp.close",_session);
660 _endp.close();
661 }
662
663 public int fill(Buffer buffer) throws IOException
664 {
665 int size=buffer.length();
666 process(buffer, null);
667
668 int filled=buffer.length()-size;
669
670 if (filled==0 && isInputShutdown())
671 return -1;
672 return filled;
673 }
674
675 public int flush(Buffer buffer) throws IOException
676 {
677 int size = buffer.length();
678 process(null, buffer);
679 return size-buffer.length();
680 }
681
682 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
683 {
684 if (header!=null && header.hasContent())
685 return flush(header);
686 if (buffer!=null && buffer.hasContent())
687 return flush(buffer);
688 if (trailer!=null && trailer.hasContent())
689 return flush(trailer);
690 return 0;
691 }
692
693 public boolean blockReadable(long millisecs) throws IOException
694 {
695 long now = System.currentTimeMillis();
696 long end=millisecs>0?(now+millisecs):Long.MAX_VALUE;
697
698 while (now<end)
699 {
700 if (process(null,null))
701 break;
702 _endp.blockReadable(end-now);
703 now = System.currentTimeMillis();
704 }
705
706 return now<end;
707 }
708
709 public boolean blockWritable(long millisecs) throws IOException
710 {
711 return _endp.blockWritable(millisecs);
712 }
713
714 public boolean isOpen()
715 {
716 return _endp.isOpen();
717 }
718
719 public Object getTransport()
720 {
721 return _endp;
722 }
723
724 public void flush() throws IOException
725 {
726 process(null, null);
727 }
728
729 public void dispatch()
730 {
731 _aEndp.dispatch();
732 }
733
734 public void asyncDispatch()
735 {
736 _aEndp.asyncDispatch();
737 }
738
739 public void scheduleWrite()
740 {
741 _aEndp.scheduleWrite();
742 }
743
744 public void onIdleExpired(long idleForMs)
745 {
746 _aEndp.onIdleExpired(idleForMs);
747 }
748
749 public void setCheckForIdle(boolean check)
750 {
751 _aEndp.setCheckForIdle(check);
752 }
753
754 public boolean isCheckForIdle()
755 {
756 return _aEndp.isCheckForIdle();
757 }
758
759 public void scheduleTimeout(Task task, long timeoutMs)
760 {
761 _aEndp.scheduleTimeout(task,timeoutMs);
762 }
763
764 public void cancelTimeout(Task task)
765 {
766 _aEndp.cancelTimeout(task);
767 }
768
769 public boolean isWritable()
770 {
771 return _aEndp.isWritable();
772 }
773
774 public boolean hasProgressed()
775 {
776 return _progressed.getAndSet(false);
777 }
778
779 public String getLocalAddr()
780 {
781 return _aEndp.getLocalAddr();
782 }
783
784 public String getLocalHost()
785 {
786 return _aEndp.getLocalHost();
787 }
788
789 public int getLocalPort()
790 {
791 return _aEndp.getLocalPort();
792 }
793
794 public String getRemoteAddr()
795 {
796 return _aEndp.getRemoteAddr();
797 }
798
799 public String getRemoteHost()
800 {
801 return _aEndp.getRemoteHost();
802 }
803
804 public int getRemotePort()
805 {
806 return _aEndp.getRemotePort();
807 }
808
809 public boolean isBlocking()
810 {
811 return false;
812 }
813
814 public int getMaxIdleTime()
815 {
816 return _aEndp.getMaxIdleTime();
817 }
818
819 public void setMaxIdleTime(int timeMs) throws IOException
820 {
821 _aEndp.setMaxIdleTime(timeMs);
822 }
823
824 public Connection getConnection()
825 {
826 return _connection;
827 }
828
829 public void setConnection(Connection connection)
830 {
831 _connection=(AsyncConnection)connection;
832 }
833
834 public String toString()
835 {
836
837
838
839 Buffer inbound = _inbound;
840 Buffer outbound = _outbound;
841 Buffer unwrap = _unwrapBuf;
842 int i = inbound == null? -1 : inbound.length();
843 int o = outbound == null ? -1 : outbound.length();
844 int u = unwrap == null ? -1 : unwrap.length();
845 return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}",
846 _engine.getHandshakeStatus(),
847 i, o, u,
848 _ishut, _oshut,
849 _connection);
850 }
851
852 }
853 }