View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.nio.channels.SelectionKey;
18  import java.nio.channels.SocketChannel;
19  
20  import javax.net.ssl.SSLContext;
21  import javax.net.ssl.SSLEngine;
22  import javax.net.ssl.SSLSession;
23  
24  import org.eclipse.jetty.http.HttpBuffers;
25  import org.eclipse.jetty.http.HttpMethods;
26  import org.eclipse.jetty.http.HttpVersions;
27  import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
28  import org.eclipse.jetty.io.Buffer;
29  import org.eclipse.jetty.io.Buffers;
30  import org.eclipse.jetty.io.Connection;
31  import org.eclipse.jetty.io.ThreadLocalBuffers;
32  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
33  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
34  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
35  import org.eclipse.jetty.io.nio.SelectorManager;
36  import org.eclipse.jetty.util.component.AbstractLifeCycle;
37  import org.eclipse.jetty.util.log.Log;
38  
39  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
40  {
41      private final HttpClient _httpClient;
42      private SSLContext _sslContext;
43      private Buffers _sslBuffers;
44      private boolean _blockingConnect;
45  
46      Manager _selectorManager=new Manager();
47  
48      /**
49       * @param httpClient
50       */
51      SelectConnector(HttpClient httpClient)
52      {
53          _httpClient = httpClient;
54      }
55  
56      /* ------------------------------------------------------------ */
57      /** Get the blockingConnect.
58       * @return the blockingConnect
59       */
60      public boolean isBlockingConnect()
61      {
62          return _blockingConnect;
63      }
64  
65      /* ------------------------------------------------------------ */
66      /** Set the blockingConnect.
67       * @param blockingConnect If true, connections are made in blocking mode.
68       */
69      public void setBlockingConnect(boolean blockingConnect)
70      {
71          _blockingConnect = blockingConnect;
72      }
73  
74      /* ------------------------------------------------------------ */
75      protected void doStart() throws Exception
76      {
77          super.doStart();
78  
79          _selectorManager.start();
80          
81          SSLEngine sslEngine=_selectorManager.newSslEngine();
82          SSLSession ssl_session=sslEngine.getSession();
83          
84          ThreadLocalBuffers buffers = new ThreadLocalBuffers()
85          {
86              @Override
87              protected Buffer newBuffer(int size)
88              {
89                  // TODO indirect?
90                  return new DirectNIOBuffer(size);
91              }
92              @Override
93              protected Buffer newHeader(int size)
94              {
95                  // TODO indirect?
96                  return new DirectNIOBuffer(size);
97              }
98              @Override
99              protected boolean isHeader(Buffer buffer)
100             {
101                 return true;
102             }
103         };
104         buffers.setBufferSize(ssl_session.getApplicationBufferSize());
105         buffers.setHeaderSize(ssl_session.getPacketBufferSize());
106         _sslBuffers=buffers;
107         
108         _httpClient._threadPool.dispatch(this);
109     }
110 
111     /* ------------------------------------------------------------ */
112     protected void doStop() throws Exception
113     {
114         _selectorManager.stop();
115     }
116 
117     /* ------------------------------------------------------------ */
118     public void startConnection( HttpDestination destination )
119         throws IOException
120     {
121         SocketChannel channel = SocketChannel.open();
122         Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
123         channel.configureBlocking( false );
124         channel.connect(address.toSocketAddress());
125         channel.socket().setSoTimeout( _httpClient._soTimeout );
126         _selectorManager.register( channel, destination );
127     }
128 
129     /* ------------------------------------------------------------ */
130     public void run()
131     {
132         while (_httpClient.isRunning())
133         {
134             try
135             {
136                 _selectorManager.doSelect(0);
137             }
138             catch (Exception e)
139             {
140                 Log.warn(e.toString());
141                 Log.debug(e);
142                 Thread.yield();
143             }
144         }
145     }
146 
147     /* ------------------------------------------------------------ */
148     class Manager extends SelectorManager
149     {
150         protected SocketChannel acceptChannel(SelectionKey key) throws IOException
151         {
152             throw new IllegalStateException();
153         }
154 
155         public boolean dispatch(Runnable task)
156         {
157             return SelectConnector.this._httpClient._threadPool.dispatch(task);
158         }
159 
160         protected void endPointOpened(SelectChannelEndPoint endpoint)
161         {
162         }
163 
164         protected void endPointClosed(SelectChannelEndPoint endpoint)
165         {
166         }
167 
168         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
169         {
170             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
171         }
172 
173         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
174         {
175             // key should have destination at this point (will be replaced by endpoint after this call)
176             HttpDestination dest=(HttpDestination)key.attachment();
177 
178 
179             SelectChannelEndPoint ep=null;
180 
181             if (dest.isSecure())
182             {
183                 if (dest.isProxied())
184                 {
185                     String connect = HttpMethods.CONNECT+" "+dest.getAddress()+HttpVersions.HTTP_1_0+"\r\n\r\n";
186                     // TODO need to send this over channel unencrypted and setup endpoint to ignore the 200 OK response.
187 
188                     throw new IllegalStateException("Not Implemented");
189                 }
190 
191                 SSLEngine engine=newSslEngine();
192                 ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
193             }
194             else
195             {
196                 ep=new SelectChannelEndPoint(channel,selectSet,key);
197             }
198 
199             HttpConnection connection=(HttpConnection)ep.getConnection();
200             connection.setDestination(dest);
201             dest.onNewConnection(connection);
202             return ep;
203         }
204 
205         private synchronized SSLEngine newSslEngine() throws IOException
206         {
207             if (_sslContext==null)
208             {
209                 _sslContext = SelectConnector.this._httpClient.getSSLContext();
210             }
211 
212             SSLEngine sslEngine = _sslContext.createSSLEngine();
213             sslEngine.setUseClientMode(true);
214             sslEngine.beginHandshake();
215 
216             return sslEngine;
217         }
218 
219         /* ------------------------------------------------------------ */
220         /* (non-Javadoc)
221          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
222          */
223         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
224         {
225             if (attachment instanceof HttpDestination)
226                 ((HttpDestination)attachment).onConnectionFailed(ex);
227             else
228                 super.connectionFailed(channel,ex,attachment);
229         }
230     }
231 }