View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2011 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.ByteBuffer;
18  import java.util.concurrent.atomic.AtomicBoolean;
19  import javax.net.ssl.SSLEngine;
20  import javax.net.ssl.SSLEngineResult;
21  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
22  import javax.net.ssl.SSLException;
23  import javax.net.ssl.SSLSession;
24  
25  import org.eclipse.jetty.io.AbstractConnection;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.Buffer;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Timeout.Task;
33  
34  /* ------------------------------------------------------------ */
35  /** SSL Connection.
36   * An AysyncConnection that acts as an interceptor between and EndPoint and another
37   * Connection, that implements TLS encryption using an {@link SSLEngine}.
38   * <p>
39   * The connector uses an {@link AsyncEndPoint} (like {@link SelectChannelEndPoint}) as
40   * it's source/sink of encrypted data.   It then provides {@link #getSslEndPoint()} to
41   * expose a source/sink of unencrypted data to another connection (eg HttpConnection).
42   */
43  public class SslConnection extends AbstractConnection implements AsyncConnection
44  {
45      private final Logger _logger = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
46  
47      private static final NIOBuffer __ZERO_BUFFER=new IndirectNIOBuffer(0);
48  
49      private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
50      private final SSLEngine _engine;
51      private final SSLSession _session;
52      private AsyncConnection _connection;
53      private final SslEndPoint _sslEndPoint;
54      private int _allocations;
55      private SslBuffers _buffers;
56      private NIOBuffer _inbound;
57      private NIOBuffer _unwrapBuf;
58      private NIOBuffer _outbound;
59      private AsyncEndPoint _aEndp;
60      private boolean _allowRenegotiate=true;
61      private boolean _handshook;
62      private boolean _ishut;
63      private boolean _oshut;
64      private final AtomicBoolean _progressed = new AtomicBoolean();
65  
66      /* ------------------------------------------------------------ */
67      /* this is a half baked buffer pool
68       */
69      private static class SslBuffers
70      {
71          final NIOBuffer _in;
72          final NIOBuffer _out;
73          final NIOBuffer _unwrap;
74  
75          SslBuffers(int packetSize, int appSize)
76          {
77              _in=new IndirectNIOBuffer(packetSize);
78              _out=new IndirectNIOBuffer(packetSize);
79              _unwrap=new IndirectNIOBuffer(appSize);
80          }
81      }
82  
83      /* ------------------------------------------------------------ */
84      public SslConnection(SSLEngine engine,EndPoint endp)
85      {
86          this(engine,endp,System.currentTimeMillis());
87      }
88  
89      /* ------------------------------------------------------------ */
90      public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp)
91      {
92          super(endp,timeStamp);
93          _engine=engine;
94          _session=_engine.getSession();
95          _aEndp=(AsyncEndPoint)endp;
96          _sslEndPoint = newSslEndPoint();
97      }
98  
99      /* ------------------------------------------------------------ */
100     protected SslEndPoint newSslEndPoint()
101     {
102         return new SslEndPoint();
103     }
104 
105     /* ------------------------------------------------------------ */
106     /**
107      * @return True if SSL re-negotiation is allowed (default false)
108      */
109     public boolean isAllowRenegotiate()
110     {
111         return _allowRenegotiate;
112     }
113 
114     /* ------------------------------------------------------------ */
115     /**
116      * Set if SSL re-negotiation is allowed. CVE-2009-3555 discovered
117      * a vulnerability in SSL/TLS with re-negotiation.  If your JVM
118      * does not have CVE-2009-3555 fixed, then re-negotiation should
119      * not be allowed.  CVE-2009-3555 was fixed in Sun java 1.6 with a ban
120      * of renegotiates in u19 and with RFC5746 in u22.
121      *
122      * @param allowRenegotiate
123      *            true if re-negotiation is allowed (default false)
124      */
125     public void setAllowRenegotiate(boolean allowRenegotiate)
126     {
127         _allowRenegotiate = allowRenegotiate;
128     }
129 
130     /* ------------------------------------------------------------ */
131     private void allocateBuffers()
132     {
133         synchronized (this)
134         {
135             if (_allocations++==0)
136             {
137                 if (_buffers==null)
138                 {
139                     _buffers=__buffers.get();
140                     if (_buffers==null)
141                         _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2);
142                     _inbound=_buffers._in;
143                     _outbound=_buffers._out;
144                     _unwrapBuf=_buffers._unwrap;
145                     __buffers.set(null);
146                 }
147             }
148         }
149     }
150 
151     /* ------------------------------------------------------------ */
152     private void releaseBuffers()
153     {
154         synchronized (this)
155         {
156             if (--_allocations==0)
157             {
158                 if (_buffers!=null &&
159                     _inbound.length()==0 &&
160                     _outbound.length()==0 &&
161                     _unwrapBuf.length()==0)
162                 {
163                     _inbound=null;
164                     _outbound=null;
165                     _unwrapBuf=null;
166                     __buffers.set(_buffers);
167                     _buffers=null;
168                 }
169             }
170         }
171     }
172 
173     /* ------------------------------------------------------------ */
174     public Connection handle() throws IOException
175     {
176         try
177         {
178             allocateBuffers();
179 
180             boolean progress=true;
181 
182             while (progress)
183             {
184                 progress=false;
185 
186                 // If we are handshook let the delegate connection
187                 if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
188                     progress=process(null,null);
189 
190                 // handle the delegate connection
191                 AsyncConnection next = (AsyncConnection)_connection.handle();
192                 if (next!=_connection && next!=null)
193                 {
194                     _connection=next;
195                     progress=true;
196                 }
197 
198                 _logger.debug("{} handle {} progress={}", _session, this, progress);
199             }
200         }
201         finally
202         {
203             releaseBuffers();
204 
205             if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
206             {
207                 _ishut=true;
208                 try
209                 {
210                     _connection.onInputShutdown();
211                 }
212                 catch(Throwable x)
213                 {
214                     _logger.warn("onInputShutdown failed", x);
215                     try{_sslEndPoint.close();}
216                     catch(IOException e2){
217                         _logger.ignore(e2);}
218                 }
219             }
220         }
221 
222         return this;
223     }
224 
225     /* ------------------------------------------------------------ */
226     public boolean isIdle()
227     {
228         return false;
229     }
230 
231     /* ------------------------------------------------------------ */
232     public boolean isSuspended()
233     {
234         return false;
235     }
236 
237     /* ------------------------------------------------------------ */
238     public void onClose()
239     {
240     }
241 
242     /* ------------------------------------------------------------ */
243     @Override
244     public void onIdleExpired(long idleForMs)
245     {
246         try
247         {
248             _logger.debug("onIdleExpired {}ms on {}",idleForMs,this);
249             if (_endp.isOutputShutdown())
250                 _sslEndPoint.close();
251             else
252                 _sslEndPoint.shutdownOutput();
253         }
254         catch (IOException e)
255         {
256             _logger.warn(e);
257             super.onIdleExpired(idleForMs);
258         }
259     }
260 
261     /* ------------------------------------------------------------ */
262     public void onInputShutdown() throws IOException
263     {
264 
265     }
266 
267     /* ------------------------------------------------------------ */
268     private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException
269     {
270         boolean some_progress=false;
271         try
272         {
273             // We need buffers to progress
274             allocateBuffers();
275 
276             // if we don't have a buffer to put received data into
277             if (toFill==null)
278             {
279                 // use the unwrapbuffer to hold received data.
280                 _unwrapBuf.compact();
281                 toFill=_unwrapBuf;
282             }
283             // Else if the fill buffer is too small for the SSL session
284             else if (toFill.capacity()<_session.getApplicationBufferSize())
285             {
286                 // fill to the temporary unwrapBuffer
287                 boolean progress=process(null,toFlush);
288 
289                 // if we received any data,
290                 if (_unwrapBuf!=null && _unwrapBuf.hasContent())
291                 {
292                     // transfer from temp buffer to fill buffer
293                     _unwrapBuf.skip(toFill.put(_unwrapBuf));
294                     return true;
295                 }
296                 else
297                     // return progress from recursive call
298                     return progress;
299             }
300             // Else if there is some temporary data
301             else if (_unwrapBuf!=null && _unwrapBuf.hasContent())
302             {
303                 // transfer from temp buffer to fill buffer
304                 _unwrapBuf.skip(toFill.put(_unwrapBuf));
305                 return true;
306             }
307 
308             // If we are here, we have a buffer ready into which we can put some read data.
309 
310             // If we have no data to flush, flush the empty buffer
311             if (toFlush==null)
312                 toFlush=__ZERO_BUFFER;
313 
314             // While we are making progress processing SSL engine
315             boolean progress=true;
316             while (progress)
317             {
318                 progress=false;
319 
320                 // Do any real IO
321                 int filled=0,flushed=0;
322                 try
323                 {
324                     // Read any available data
325                     if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0)
326                         progress = true;
327 
328                     // flush any output data
329                     if (_outbound.hasContent() && (flushed=_endp.flush(_outbound))>0)
330                         progress = true;
331                 }
332                 catch (IOException e)
333                 {
334                     _endp.close();
335                     throw e;
336                 }
337                 finally
338                 {
339                     _logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.length(),flushed,_outbound.length());
340                 }
341 
342                 // handle the current hand share status
343                 switch(_engine.getHandshakeStatus())
344                 {
345                     case FINISHED:
346                         throw new IllegalStateException();
347 
348                     case NOT_HANDSHAKING:
349                     {
350                         // Try unwrapping some application data
351                         if (toFill.space()>0 && _inbound.hasContent() && unwrap(toFill))
352                             progress=true;
353 
354                         // Try wrapping some application data
355                         if (toFlush.hasContent() && _outbound.space()>0 && wrap(toFlush))
356                             progress=true;
357                     }
358                     break;
359 
360                     case NEED_TASK:
361                     {
362                         // A task needs to be run, so run it!
363                         Runnable task;
364                         while ((task=_engine.getDelegatedTask())!=null)
365                         {
366                             progress=true;
367                             task.run();
368                         }
369 
370                     }
371                     break;
372 
373                     case NEED_WRAP:
374                     {
375                         // The SSL needs to send some handshake data to the other side
376                         if (_handshook && !_allowRenegotiate)
377                             _endp.close();
378                         else if (wrap(toFlush))
379                             progress=true;
380                     }
381                     break;
382 
383                     case NEED_UNWRAP:
384                     {
385                         // The SSL needs to receive some handshake data from the other side
386                         if (_handshook && !_allowRenegotiate)
387                             _endp.close();
388                         else if (!_inbound.hasContent()&&filled==-1)
389                         {
390                             // No more input coming
391                             _endp.shutdownInput();
392                         }
393                         else if (unwrap(toFill))
394                             progress=true;
395                     }
396                     break;
397                 }
398 
399                 // pass on ishut/oshut state
400                 if (_endp.isOpen() && _endp.isInputShutdown() && !_inbound.hasContent())
401                     _engine.closeInbound();
402 
403                 if (_endp.isOpen() && _engine.isOutboundDone() && !_outbound.hasContent())
404                     _endp.shutdownOutput();
405 
406                 // remember if any progress has been made
407                 some_progress|=progress;
408             }
409 
410             // If we are reading into the temp buffer and it has some content, then we should be dispatched.
411             if (toFill==_unwrapBuf && _unwrapBuf.hasContent())
412                 _aEndp.asyncDispatch();
413         }
414         finally
415         {
416             releaseBuffers();
417             if (some_progress)
418                 _progressed.set(true);
419         }
420         return some_progress;
421     }
422 
423     private synchronized boolean wrap(final Buffer buffer) throws IOException
424     {
425         ByteBuffer bbuf=extractByteBuffer(buffer);
426         final SSLEngineResult result;
427 
428         synchronized(bbuf)
429         {
430             _outbound.compact();
431             ByteBuffer out_buffer=_outbound.getByteBuffer();
432             synchronized(out_buffer)
433             {
434                 try
435                 {
436                     bbuf.position(buffer.getIndex());
437                     bbuf.limit(buffer.putIndex());
438                     out_buffer.position(_outbound.putIndex());
439                     out_buffer.limit(out_buffer.capacity());
440                     result=_engine.wrap(bbuf,out_buffer);
441                     if (_logger.isDebugEnabled())
442                         _logger.debug("{} wrap {} {} consumed={} produced={}",
443                             _session,
444                             result.getStatus(),
445                             result.getHandshakeStatus(),
446                             result.bytesConsumed(),
447                             result.bytesProduced());
448 
449 
450                     buffer.skip(result.bytesConsumed());
451                     _outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced());
452                 }
453                 catch(SSLException e)
454                 {
455                     _logger.debug(String.valueOf(_endp), e);
456                     _endp.close();
457                     throw e;
458                 }
459                 finally
460                 {
461                     out_buffer.position(0);
462                     out_buffer.limit(out_buffer.capacity());
463                     bbuf.position(0);
464                     bbuf.limit(bbuf.capacity());
465                 }
466             }
467         }
468 
469         switch(result.getStatus())
470         {
471             case BUFFER_UNDERFLOW:
472                 throw new IllegalStateException();
473 
474             case BUFFER_OVERFLOW:
475                 break;
476 
477             case OK:
478                 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
479                     _handshook=true;
480                 break;
481 
482             case CLOSED:
483                 _logger.debug("wrap CLOSE {} {}",this,result);
484                 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
485                     _endp.close();
486                 break;
487 
488             default:
489                 _logger.debug("{} wrap default {}",_session,result);
490             throw new IOException(result.toString());
491         }
492 
493         return result.bytesConsumed()>0 || result.bytesProduced()>0;
494     }
495 
496     private synchronized boolean unwrap(final Buffer buffer) throws IOException
497     {
498         if (!_inbound.hasContent())
499             return false;
500 
501         ByteBuffer bbuf=extractByteBuffer(buffer);
502         final SSLEngineResult result;
503 
504         synchronized(bbuf)
505         {
506             ByteBuffer in_buffer=_inbound.getByteBuffer();
507             synchronized(in_buffer)
508             {
509                 try
510                 {
511                     bbuf.position(buffer.putIndex());
512                     bbuf.limit(buffer.capacity());
513                     in_buffer.position(_inbound.getIndex());
514                     in_buffer.limit(_inbound.putIndex());
515 
516                     result=_engine.unwrap(in_buffer,bbuf);
517                     if (_logger.isDebugEnabled())
518                         _logger.debug("{} unwrap {} {} consumed={} produced={}",
519                             _session,
520                             result.getStatus(),
521                             result.getHandshakeStatus(),
522                             result.bytesConsumed(),
523                             result.bytesProduced());
524 
525                     _inbound.skip(result.bytesConsumed());
526                     _inbound.compact();
527                     buffer.setPutIndex(buffer.putIndex()+result.bytesProduced());
528                 }
529                 catch(SSLException e)
530                 {
531                     _logger.debug(String.valueOf(_endp), e);
532                     _endp.close();
533                     throw e;
534                 }
535                 finally
536                 {
537                     in_buffer.position(0);
538                     in_buffer.limit(in_buffer.capacity());
539                     bbuf.position(0);
540                     bbuf.limit(bbuf.capacity());
541                 }
542             }
543         }
544 
545         switch(result.getStatus())
546         {
547             case BUFFER_UNDERFLOW:
548                 if (_endp.isInputShutdown())
549                     _inbound.clear();
550                 break;
551 
552             case BUFFER_OVERFLOW:
553                 _logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound.toDetailString(),buffer.toDetailString());
554                 break;
555 
556             case OK:
557                 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
558                     _handshook=true;
559                 break;
560 
561             case CLOSED:
562                 _logger.debug("unwrap CLOSE {} {}",this,result);
563                 if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
564                     _endp.close();
565                 break;
566 
567             default:
568                 _logger.debug("{} wrap default {}",_session,result);
569             throw new IOException(result.toString());
570         }
571 
572         //if (LOG.isDebugEnabled() && result.bytesProduced()>0)
573         //    LOG.debug("{} unwrapped '{}'",_session,buffer);
574 
575         return result.bytesConsumed()>0 || result.bytesProduced()>0;
576     }
577 
578 
579     /* ------------------------------------------------------------ */
580     private ByteBuffer extractByteBuffer(Buffer buffer)
581     {
582         if (buffer.buffer() instanceof NIOBuffer)
583             return ((NIOBuffer)buffer.buffer()).getByteBuffer();
584         return ByteBuffer.wrap(buffer.array());
585     }
586 
587     /* ------------------------------------------------------------ */
588     public AsyncEndPoint getSslEndPoint()
589     {
590         return _sslEndPoint;
591     }
592 
593     /* ------------------------------------------------------------ */
594     public String toString()
595     {
596         return String.format("%s %s", super.toString(), _sslEndPoint);
597     }
598 
599     /* ------------------------------------------------------------ */
600     /* ------------------------------------------------------------ */
601     public class SslEndPoint implements AsyncEndPoint
602     {
603         public SSLEngine getSslEngine()
604         {
605             return _engine;
606         }
607 
608         public AsyncEndPoint getEndpoint()
609         {
610             return _aEndp;
611         }
612 
613         public void shutdownOutput() throws IOException
614         {
615             synchronized (SslConnection.this)
616             {
617                 _logger.debug("{} ssl endp.oshut {}",_session,this);
618                 _engine.closeOutbound();
619                 _oshut=true;
620             }
621             flush();
622         }
623 
624         public boolean isOutputShutdown()
625         {
626             synchronized (SslConnection.this)
627             {
628                 return _oshut||!isOpen()||_engine.isOutboundDone();
629             }
630         }
631 
632         public void shutdownInput() throws IOException
633         {
634             _logger.debug("{} ssl endp.ishut!",_session);
635             // We do not do a closeInput here, as SSL does not support half close.
636             // isInputShutdown works it out itself from buffer state and underlying endpoint state.
637         }
638 
639         public boolean isInputShutdown()
640         {
641             synchronized (SslConnection.this)
642             {
643                 return _endp.isInputShutdown() &&
644                 !(_unwrapBuf!=null&&_unwrapBuf.hasContent()) &&
645                 !(_inbound!=null&&_inbound.hasContent());
646             }
647         }
648 
649         public void close() throws IOException
650         {
651             _logger.debug("{} ssl endp.close",_session);
652             _endp.close();
653         }
654 
655         public int fill(Buffer buffer) throws IOException
656         {
657             int size=buffer.length();
658             process(buffer, null);
659 
660             int filled=buffer.length()-size;
661 
662             if (filled==0 && isInputShutdown())
663                 return -1;
664             return filled;
665         }
666 
667         public int flush(Buffer buffer) throws IOException
668         {
669             int size = buffer.length();
670             process(null, buffer);
671             return size-buffer.length();
672         }
673 
674         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
675         {
676             if (header!=null && header.hasContent())
677                 return flush(header);
678             if (buffer!=null && buffer.hasContent())
679                 return flush(buffer);
680             if (trailer!=null && trailer.hasContent())
681                 return flush(trailer);
682             return 0;
683         }
684 
685         public boolean blockReadable(long millisecs) throws IOException
686         {
687             long now = System.currentTimeMillis();
688             long end=millisecs>0?(now+millisecs):Long.MAX_VALUE;
689 
690             while (now<end)
691             {
692                 if (process(null,null))
693                     break;
694                 _endp.blockReadable(end-now);
695                 now = System.currentTimeMillis();
696             }
697 
698             return now<end;
699         }
700 
701         public boolean blockWritable(long millisecs) throws IOException
702         {
703             return _endp.blockWritable(millisecs);
704         }
705 
706         public boolean isOpen()
707         {
708             return _endp.isOpen();
709         }
710 
711         public Object getTransport()
712         {
713             return _endp;
714         }
715 
716         public void flush() throws IOException
717         {
718             process(null, null);
719         }
720 
721         public void asyncDispatch()
722         {
723             _aEndp.asyncDispatch();
724         }
725 
726         public void scheduleWrite()
727         {
728             _aEndp.scheduleWrite();
729         }
730 
731         public void onIdleExpired(long idleForMs)
732         {
733             _aEndp.onIdleExpired(idleForMs);
734         }
735 
736         public void setCheckForIdle(boolean check)
737         {
738             _aEndp.setCheckForIdle(check);
739         }
740 
741         public boolean isCheckForIdle()
742         {
743             return _aEndp.isCheckForIdle();
744         }
745 
746         public void scheduleTimeout(Task task, long timeoutMs)
747         {
748             _aEndp.scheduleTimeout(task,timeoutMs);
749         }
750 
751         public void cancelTimeout(Task task)
752         {
753             _aEndp.cancelTimeout(task);
754         }
755 
756         public boolean isWritable()
757         {
758             return _aEndp.isWritable();
759         }
760 
761         public boolean hasProgressed()
762         {
763             return _progressed.getAndSet(false);
764         }
765 
766         public String getLocalAddr()
767         {
768             return _aEndp.getLocalAddr();
769         }
770 
771         public String getLocalHost()
772         {
773             return _aEndp.getLocalHost();
774         }
775 
776         public int getLocalPort()
777         {
778             return _aEndp.getLocalPort();
779         }
780 
781         public String getRemoteAddr()
782         {
783             return _aEndp.getRemoteAddr();
784         }
785 
786         public String getRemoteHost()
787         {
788             return _aEndp.getRemoteHost();
789         }
790 
791         public int getRemotePort()
792         {
793             return _aEndp.getRemotePort();
794         }
795 
796         public boolean isBlocking()
797         {
798             return false;
799         }
800 
801         public int getMaxIdleTime()
802         {
803             return _aEndp.getMaxIdleTime();
804         }
805 
806         public void setMaxIdleTime(int timeMs) throws IOException
807         {
808             _aEndp.setMaxIdleTime(timeMs);
809         }
810 
811         public Connection getConnection()
812         {
813             return _connection;
814         }
815 
816         public void setConnection(Connection connection)
817         {
818             _connection=(AsyncConnection)connection;
819         }
820 
821         public String toString()
822         {
823             // Do NOT use synchronized (SslConnection.this)
824             // because it's very easy to deadlock when debugging is enabled.
825             // We do a best effort to print the right toString() and that's it.
826             Buffer inbound = _inbound;
827             Buffer outbound = _outbound;
828             Buffer unwrap = _unwrapBuf;
829             int i = inbound == null? -1 : inbound.length();
830             int o = outbound == null ? -1 : outbound.length();
831             int u = unwrap == null ? -1 : unwrap.length();
832             return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}",
833                     _engine.getHandshakeStatus(),
834                     i, o, u,
835                     _ishut, _oshut,
836                     _connection);
837         }
838 
839     }
840 }