1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.ByteBuffer;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import javax.net.ssl.SSLEngine;
20 import javax.net.ssl.SSLEngineResult;
21 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
22 import javax.net.ssl.SSLException;
23 import javax.net.ssl.SSLSession;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Timeout.Task;
33
34
35
36
37
38
39
40
41
42
43 public class SslConnection extends AbstractConnection implements AsyncConnection
44 {
45 static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio.ssl");
46
47 private static final NIOBuffer __ZERO_BUFFER=new IndirectNIOBuffer(0);
48
49 private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
50 private final SSLEngine _engine;
51 private final SSLSession _session;
52 private AsyncConnection _connection;
53 private final SslEndPoint _sslEndPoint;
54 private int _allocations;
55 private SslBuffers _buffers;
56 private NIOBuffer _inbound;
57 private NIOBuffer _unwrapBuf;
58 private NIOBuffer _outbound;
59 private AsyncEndPoint _aEndp;
60 private boolean _allowRenegotiate=true;
61 private boolean _handshook;
62 private boolean _ishut;
63 private boolean _oshut;
64 private final AtomicBoolean _progressed = new AtomicBoolean();
65
66
67
68
69 private static class SslBuffers
70 {
71 final NIOBuffer _in;
72 final NIOBuffer _out;
73 final NIOBuffer _unwrap;
74
75 SslBuffers(int packetSize, int appSize)
76 {
77 _in=new IndirectNIOBuffer(packetSize);
78 _out=new IndirectNIOBuffer(packetSize);
79 _unwrap=new IndirectNIOBuffer(appSize);
80 }
81 }
82
83
84 public SslConnection(SSLEngine engine,EndPoint endp)
85 {
86 this(engine,endp,System.currentTimeMillis());
87 }
88
89
90 public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp)
91 {
92 super(endp,timeStamp);
93 _engine=engine;
94 _session=_engine.getSession();
95 _aEndp=(AsyncEndPoint)endp;
96 _sslEndPoint = newSslEndPoint();
97 }
98
99
100 protected SslEndPoint newSslEndPoint()
101 {
102 return new SslEndPoint();
103 }
104
105
106
107
108
109 public boolean isAllowRenegotiate()
110 {
111 return _allowRenegotiate;
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125 public void setAllowRenegotiate(boolean allowRenegotiate)
126 {
127 _allowRenegotiate = allowRenegotiate;
128 }
129
130
131 private void allocateBuffers()
132 {
133 synchronized (this)
134 {
135 if (_allocations++==0)
136 {
137 if (_buffers==null)
138 {
139 _buffers=__buffers.get();
140 if (_buffers==null)
141 _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2);
142 _inbound=_buffers._in;
143 _outbound=_buffers._out;
144 _unwrapBuf=_buffers._unwrap;
145 __buffers.set(null);
146 }
147 }
148 }
149 }
150
151
152 private void releaseBuffers()
153 {
154 synchronized (this)
155 {
156 if (--_allocations==0)
157 {
158 if (_buffers!=null &&
159 _inbound.length()==0 &&
160 _outbound.length()==0 &&
161 _unwrapBuf.length()==0)
162 {
163 _inbound=null;
164 _outbound=null;
165 _unwrapBuf=null;
166 __buffers.set(_buffers);
167 _buffers=null;
168 }
169 }
170 }
171 }
172
173
174 public Connection handle() throws IOException
175 {
176 try
177 {
178 allocateBuffers();
179
180 boolean progress=true;
181
182 while (progress)
183 {
184 progress=false;
185
186
187 if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
188 {
189 progress=process(null,null);
190 }
191 else
192 {
193
194 AsyncConnection next = (AsyncConnection)_connection.handle();
195 if (next!=_connection && next!=null)
196 {
197 _connection=next;
198 progress=true;
199 }
200
201 }
202
203 LOG.debug("{} handle {} progress={}", _session, this, progress);
204 }
205 }
206 finally
207 {
208 releaseBuffers();
209
210 if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
211 {
212 _ishut=true;
213 try
214 {
215 _connection.onInputShutdown();
216 }
217 catch(Throwable x)
218 {
219 LOG.warn("onInputShutdown failed", x);
220 try{_sslEndPoint.close();}
221 catch(IOException e2){LOG.ignore(e2);}
222 }
223 }
224 }
225
226 return this;
227 }
228
229
230 public boolean isIdle()
231 {
232 return false;
233 }
234
235
236 public boolean isSuspended()
237 {
238 return false;
239 }
240
241
242 public void onClose()
243 {
244 }
245
246
247 @Override
248 public void onIdleExpired(long idleForMs)
249 {
250 try
251 {
252 LOG.debug("onIdleExpired {}ms on {}",idleForMs,this);
253 if (_endp.isOutputShutdown())
254 _sslEndPoint.close();
255 else
256 _sslEndPoint.shutdownOutput();
257 }
258 catch (IOException e)
259 {
260 LOG.warn(e);
261 super.onIdleExpired(idleForMs);
262 }
263 }
264
265
266 public void onInputShutdown() throws IOException
267 {
268
269 }
270
271
272 private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException
273 {
274 boolean some_progress=false;
275 try
276 {
277 allocateBuffers();
278 if (toFill==null)
279 {
280 _unwrapBuf.compact();
281 toFill=_unwrapBuf;
282 }
283 else if (toFill.capacity()<_session.getApplicationBufferSize())
284 {
285 boolean progress=process(null,toFlush);
286 if (_unwrapBuf!=null && _unwrapBuf.hasContent())
287 {
288 _unwrapBuf.skip(toFill.put(_unwrapBuf));
289 return true;
290 }
291 else
292 return progress;
293 }
294 else if (_unwrapBuf!=null && _unwrapBuf.hasContent())
295 {
296 _unwrapBuf.skip(toFill.put(_unwrapBuf));
297 return true;
298 }
299
300
301 if (toFlush==null)
302 toFlush=__ZERO_BUFFER;
303
304 boolean progress=true;
305
306 while (progress)
307 {
308 progress=false;
309 int filled=0,flushed=0;
310
311 try
312 {
313
314 if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0)
315 progress = true;
316
317
318 if (_outbound.hasContent() && (flushed=_endp.flush(_outbound))>0)
319 progress = true;
320 }
321 catch (IOException e)
322 {
323 _endp.close();
324 throw e;
325 }
326 finally
327 {
328 LOG.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.length(),flushed,_outbound.length());
329 }
330
331
332 switch(_engine.getHandshakeStatus())
333 {
334 case FINISHED:
335 throw new IllegalStateException();
336
337 case NOT_HANDSHAKING:
338 {
339
340 if (toFill.space()>0 && _inbound.hasContent() && unwrap(toFill))
341 progress=true;
342
343
344 if (toFlush.hasContent() && _outbound.space()>0 && wrap(toFlush))
345 progress=true;
346 }
347 break;
348
349 case NEED_TASK:
350 {
351
352 Runnable task;
353 while ((task=_engine.getDelegatedTask())!=null)
354 {
355 progress=true;
356 task.run();
357 }
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373 }
374 break;
375
376 case NEED_WRAP:
377 {
378
379 if (_handshook && !_allowRenegotiate)
380 _endp.close();
381 else if (wrap(toFlush))
382 progress=true;
383 }
384 break;
385
386 case NEED_UNWRAP:
387 {
388
389 if (_handshook && !_allowRenegotiate)
390 _endp.close();
391 else if (unwrap(toFill))
392 progress=true;
393 }
394 break;
395 }
396
397
398 if (_endp.isOpen() && _endp.isInputShutdown() && !_inbound.hasContent())
399 _engine.closeInbound();
400
401 if (_endp.isOpen() && _engine.isOutboundDone() && !_outbound.hasContent())
402 _endp.shutdownOutput();
403
404 some_progress|=progress;
405 }
406
407 if (toFill.hasContent())
408 _aEndp.asyncDispatch();
409 }
410 finally
411 {
412 releaseBuffers();
413 if (some_progress)
414 _progressed.set(true);
415 }
416 return some_progress;
417 }
418
419 private synchronized boolean wrap(final Buffer buffer) throws IOException
420 {
421 ByteBuffer bbuf=extractByteBuffer(buffer);
422 final SSLEngineResult result;
423
424 synchronized(bbuf)
425 {
426 _outbound.compact();
427 ByteBuffer out_buffer=_outbound.getByteBuffer();
428 synchronized(out_buffer)
429 {
430 try
431 {
432 bbuf.position(buffer.getIndex());
433 bbuf.limit(buffer.putIndex());
434 out_buffer.position(_outbound.putIndex());
435 out_buffer.limit(out_buffer.capacity());
436 result=_engine.wrap(bbuf,out_buffer);
437 if (LOG.isDebugEnabled())
438 LOG.debug("{} wrap {} {} consumed={} produced={}",
439 _session,
440 result.getStatus(),
441 result.getHandshakeStatus(),
442 result.bytesConsumed(),
443 result.bytesProduced());
444
445
446 buffer.skip(result.bytesConsumed());
447 _outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced());
448 }
449 catch(SSLException e)
450 {
451 LOG.warn(String.valueOf(_endp), e);
452 _endp.close();
453 throw e;
454 }
455 finally
456 {
457 out_buffer.position(0);
458 out_buffer.limit(out_buffer.capacity());
459 bbuf.position(0);
460 bbuf.limit(bbuf.capacity());
461 }
462 }
463 }
464
465 switch(result.getStatus())
466 {
467 case BUFFER_UNDERFLOW:
468 throw new IllegalStateException();
469
470 case BUFFER_OVERFLOW:
471 break;
472
473 case OK:
474 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
475 _handshook=true;
476 break;
477
478 case CLOSED:
479 LOG.debug("wrap CLOSE {} {}",this,result);
480 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
481 _endp.close();
482 break;
483
484 default:
485 LOG.warn("{} wrap default {}",_session,result);
486 throw new IOException(result.toString());
487 }
488
489 return result.bytesConsumed()>0 || result.bytesProduced()>0;
490 }
491
492 private synchronized boolean unwrap(final Buffer buffer) throws IOException
493 {
494 if (!_inbound.hasContent())
495 return false;
496
497 ByteBuffer bbuf=extractByteBuffer(buffer);
498 final SSLEngineResult result;
499
500 synchronized(bbuf)
501 {
502 ByteBuffer in_buffer=_inbound.getByteBuffer();
503 synchronized(in_buffer)
504 {
505 try
506 {
507 bbuf.position(buffer.putIndex());
508 bbuf.limit(buffer.capacity());
509 in_buffer.position(_inbound.getIndex());
510 in_buffer.limit(_inbound.putIndex());
511
512 result=_engine.unwrap(in_buffer,bbuf);
513 if (LOG.isDebugEnabled())
514 LOG.debug("{} unwrap {} {} consumed={} produced={}",
515 _session,
516 result.getStatus(),
517 result.getHandshakeStatus(),
518 result.bytesConsumed(),
519 result.bytesProduced());
520
521 _inbound.skip(result.bytesConsumed());
522 _inbound.compact();
523 buffer.setPutIndex(buffer.putIndex()+result.bytesProduced());
524 }
525 catch(SSLException e)
526 {
527 LOG.warn(String.valueOf(_endp), e);
528 _endp.close();
529 throw e;
530 }
531 finally
532 {
533 in_buffer.position(0);
534 in_buffer.limit(in_buffer.capacity());
535 bbuf.position(0);
536 bbuf.limit(bbuf.capacity());
537 }
538 }
539 }
540
541 switch(result.getStatus())
542 {
543 case BUFFER_UNDERFLOW:
544 break;
545
546 case BUFFER_OVERFLOW:
547 LOG.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound.toDetailString(),buffer.toDetailString());
548 break;
549
550 case OK:
551 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
552 _handshook=true;
553 break;
554
555 case CLOSED:
556 LOG.debug("unwrap CLOSE {} {}",this,result);
557 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
558 _endp.close();
559 break;
560
561 default:
562 LOG.warn("{} wrap default {}",_session,result);
563 throw new IOException(result.toString());
564 }
565
566
567
568
569 return result.bytesConsumed()>0 || result.bytesProduced()>0;
570 }
571
572
573
574 private ByteBuffer extractByteBuffer(Buffer buffer)
575 {
576 if (buffer.buffer() instanceof NIOBuffer)
577 return ((NIOBuffer)buffer.buffer()).getByteBuffer();
578 return ByteBuffer.wrap(buffer.array());
579 }
580
581
582 public AsyncEndPoint getSslEndPoint()
583 {
584 return _sslEndPoint;
585 }
586
587
588 public String toString()
589 {
590 return String.format("%s %s", super.toString(), _sslEndPoint);
591 }
592
593
594
595 public class SslEndPoint implements AsyncEndPoint
596 {
597 public SSLEngine getSslEngine()
598 {
599 return _engine;
600 }
601
602 public AsyncEndPoint getEndpoint()
603 {
604 return _aEndp;
605 }
606
607 public void shutdownOutput() throws IOException
608 {
609 synchronized (SslConnection.this)
610 {
611 LOG.debug("{} ssl endp.oshut {}",_session,this);
612 _engine.closeOutbound();
613 _oshut=true;
614 }
615 flush();
616 }
617
618 public boolean isOutputShutdown()
619 {
620 synchronized (SslConnection.this)
621 {
622 return _oshut||!isOpen()||_engine.isOutboundDone();
623 }
624 }
625
626 public void shutdownInput() throws IOException
627 {
628 LOG.debug("{} ssl endp.ishut!",_session);
629
630
631 }
632
633 public boolean isInputShutdown()
634 {
635 synchronized (SslConnection.this)
636 {
637 return _endp.isInputShutdown() &&
638 !(_unwrapBuf!=null&&_unwrapBuf.hasContent()) &&
639 !(_inbound!=null&&_inbound.hasContent());
640 }
641 }
642
643 public void close() throws IOException
644 {
645 LOG.debug("{} ssl endp.close",_session);
646 _endp.close();
647 }
648
649 public int fill(Buffer buffer) throws IOException
650 {
651 int size=buffer.length();
652 process(buffer, null);
653
654 int filled=buffer.length()-size;
655
656 if (filled==0 && isInputShutdown())
657 return -1;
658 return filled;
659 }
660
661 public int flush(Buffer buffer) throws IOException
662 {
663 int size = buffer.length();
664 process(null, buffer);
665 return size-buffer.length();
666 }
667
668 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
669 {
670 if (header!=null && header.hasContent())
671 return flush(header);
672 if (buffer!=null && buffer.hasContent())
673 return flush(buffer);
674 if (trailer!=null && trailer.hasContent())
675 return flush(trailer);
676 return 0;
677 }
678
679 public boolean blockReadable(long millisecs) throws IOException
680 {
681 long now = System.currentTimeMillis();
682 long end=millisecs>0?(now+millisecs):Long.MAX_VALUE;
683
684 while (now<end)
685 {
686 if (process(null,null))
687 break;
688 _endp.blockReadable(end-now);
689 now = System.currentTimeMillis();
690 }
691
692 return now<end;
693 }
694
695 public boolean blockWritable(long millisecs) throws IOException
696 {
697 return _endp.blockWritable(millisecs);
698 }
699
700 public boolean isOpen()
701 {
702 return _endp.isOpen();
703 }
704
705 public Object getTransport()
706 {
707 return _endp;
708 }
709
710 public void flush() throws IOException
711 {
712 process(null, null);
713 }
714
715 public void asyncDispatch()
716 {
717 _aEndp.asyncDispatch();
718 }
719
720 public void scheduleWrite()
721 {
722 _aEndp.scheduleWrite();
723 }
724
725 public void onIdleExpired(long idleForMs)
726 {
727 _aEndp.onIdleExpired(idleForMs);
728 }
729
730 public void setCheckForIdle(boolean check)
731 {
732 _aEndp.setCheckForIdle(check);
733 }
734
735 public boolean isCheckForIdle()
736 {
737 return _aEndp.isCheckForIdle();
738 }
739
740 public void scheduleTimeout(Task task, long timeoutMs)
741 {
742 _aEndp.scheduleTimeout(task,timeoutMs);
743 }
744
745 public void cancelTimeout(Task task)
746 {
747 _aEndp.cancelTimeout(task);
748 }
749
750 public boolean isWritable()
751 {
752 return _aEndp.isWritable();
753 }
754
755 public boolean hasProgressed()
756 {
757 return _progressed.getAndSet(false);
758 }
759
760 public String getLocalAddr()
761 {
762 return _aEndp.getLocalAddr();
763 }
764
765 public String getLocalHost()
766 {
767 return _aEndp.getLocalHost();
768 }
769
770 public int getLocalPort()
771 {
772 return _aEndp.getLocalPort();
773 }
774
775 public String getRemoteAddr()
776 {
777 return _aEndp.getRemoteAddr();
778 }
779
780 public String getRemoteHost()
781 {
782 return _aEndp.getRemoteHost();
783 }
784
785 public int getRemotePort()
786 {
787 return _aEndp.getRemotePort();
788 }
789
790 public boolean isBlocking()
791 {
792 return false;
793 }
794
795 public int getMaxIdleTime()
796 {
797 return _aEndp.getMaxIdleTime();
798 }
799
800 public void setMaxIdleTime(int timeMs) throws IOException
801 {
802 _aEndp.setMaxIdleTime(timeMs);
803 }
804
805 public Connection getConnection()
806 {
807 return _connection;
808 }
809
810 public void setConnection(Connection connection)
811 {
812 _connection=(AsyncConnection)connection;
813 }
814
815 public String toString()
816 {
817
818
819
820 Buffer inbound = _inbound;
821 Buffer outbound = _outbound;
822 Buffer unwrap = _unwrapBuf;
823 int i = inbound == null? -1 : inbound.length();
824 int o = outbound == null ? -1 : outbound.length();
825 int u = unwrap == null ? -1 : unwrap.length();
826 return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}",
827 _engine.getHandshakeStatus(),
828 i, o, u,
829 _ishut, _oshut,
830 _connection);
831 }
832
833 }
834 }