View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.eclipse.jetty.io.ByteArrayEndPoint;
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.StringUtil;
34  import org.eclipse.jetty.util.ssl.SslContextFactory;
35  import org.eclipse.jetty.util.thread.Scheduler;
36  
37  public class LocalConnector extends AbstractConnector
38  {
39      private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40  
41  
42      public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43      {
44          super(server,executor,scheduler,pool,acceptors,factories);
45          setIdleTimeout(30000);
46      }
47  
48      public LocalConnector(Server server)
49      {
50          this(server, null, null, null, 0, new HttpConnectionFactory());
51      }
52  
53      public LocalConnector(Server server, SslContextFactory sslContextFactory)
54      {
55          this(server, null, null, null, 0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56      }
57  
58      public LocalConnector(Server server, ConnectionFactory connectionFactory)
59      {
60          this(server, null, null, null, 0, connectionFactory);
61      }
62  
63      public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64      {
65          this(server, null, null, null, 0, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66      }
67  
68      @Override
69      public Object getTransport()
70      {
71          return this;
72      }
73  
74      /** Sends requests and get responses based on thread activity.
75       * Returns all the responses received once the thread activity has
76       * returned to the level it was before the requests.
77       * <p>
78       * This methods waits until the connection is closed or
79       * is idle for 1s before returning the responses.
80       * @param requests the requests
81       * @return the responses
82       * @throws Exception if the requests fail
83       */
84      public String getResponses(String requests) throws Exception
85      {
86          return getResponses(requests, 5, TimeUnit.SECONDS);
87      }
88  
89      /** Sends requests and get responses based on thread activity.
90       * Returns all the responses received once the thread activity has
91       * returned to the level it was before the requests.
92       * <p>
93       * This methods waits until the connection is closed or
94       * an idle period before returning the responses.
95       * @param requests the requests
96       * @param idleFor The time the response stream must be idle for before returning
97       * @param units The units of idleFor
98       * @return the responses
99       * @throws Exception if the requests fail
100      */
101     public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102     {
103         ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StringUtil.__UTF8_CHARSET),idleFor,units);
104         return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET);
105     }
106 
107     /** Sends requests and get's responses based on thread activity.
108      * Returns all the responses received once the thread activity has
109      * returned to the level it was before the requests.
110      * <p>
111      * This methods waits until the connection is closed or
112      * is idle for 1s before returning the responses.
113      * @param requestsBuffer the requests
114      * @return the responses
115      * @throws Exception if the requests fail
116      */
117     public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118     {
119         return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120     }
121 
122     /** Sends requests and get's responses based on thread activity.
123      * Returns all the responses received once the thread activity has
124      * returned to the level it was before the requests.
125      * <p>
126      * This methods waits until the connection is closed or
127      * an idle period before returning the responses.
128      * @param requestsBuffer the requests
129      * @param idleFor The time the response stream must be idle for before returning
130      * @param units The units of idleFor
131      * @return the responses
132      * @throws Exception if the requests fail
133      */
134     public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135     {
136         LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
137         LocalEndPoint endp = executeRequest(requestsBuffer);
138         endp.waitUntilClosedOrIdleFor(idleFor,units);
139         ByteBuffer responses = endp.takeOutput();
140         endp.getConnection().close();
141         LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
142         return responses;
143     }
144 
145     /**
146      * Execute a request and return the EndPoint through which
147      * responses can be received.
148      * @param rawRequest the request
149      * @return the local endpoint
150      */
151     public LocalEndPoint executeRequest(String rawRequest)
152     {
153         return executeRequest(BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET));
154     }
155 
156     private LocalEndPoint executeRequest(ByteBuffer rawRequest)
157     {
158         LocalEndPoint endp = new LocalEndPoint();
159         endp.setInput(rawRequest);
160         _connects.add(endp);
161         return endp;
162     }
163 
164     @Override
165     protected void accept(int acceptorID) throws IOException, InterruptedException
166     {
167         LOG.debug("accepting {}", acceptorID);
168         LocalEndPoint endPoint = _connects.take();
169         endPoint.onOpen();
170         onEndPointOpened(endPoint);
171 
172         Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
173         endPoint.setConnection(connection);
174 
175 //        connectionOpened(connection);
176         connection.onOpen();
177     }
178 
179     public class LocalEndPoint extends ByteArrayEndPoint
180     {
181         private final CountDownLatch _closed = new CountDownLatch(1);
182 
183         public LocalEndPoint()
184         {
185             super(getScheduler(), LocalConnector.this.getIdleTimeout());
186             setGrowOutput(true);
187         }
188 
189         public void addInput(String s)
190         {
191             // TODO this is a busy wait
192             while(getIn()==null || BufferUtil.hasContent(getIn()))
193                 Thread.yield();
194             setInput(BufferUtil.toBuffer(s, StringUtil.__UTF8_CHARSET));
195         }
196 
197         @Override
198         public void close()
199         {
200             boolean wasOpen=isOpen();
201             super.close();
202             if (wasOpen)
203             {
204 //                connectionClosed(getConnection());
205                 getConnection().onClose();
206                 onClose();
207             }
208         }
209 
210         @Override
211         public void onClose()
212         {
213             LocalConnector.this.onEndPointClosed(this);
214             super.onClose();
215             _closed.countDown();
216         }
217 
218         @Override
219         public void shutdownOutput()
220         {
221             super.shutdownOutput();
222             close();
223         }
224 
225         public void waitUntilClosed()
226         {
227             while (isOpen())
228             {
229                 try
230                 {
231                     if (!_closed.await(10,TimeUnit.SECONDS))
232                         break;
233                 }
234                 catch(Exception e)
235                 {
236                     LOG.warn(e);
237                 }
238             }
239         }
240 
241         public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
242         {
243             Thread.yield();
244             int size=getOutput().remaining();
245             while (isOpen())
246             {
247                 try
248                 {
249                     if (!_closed.await(idleFor,units))
250                     {
251                         if (size==getOutput().remaining())
252                         {
253                             LOG.debug("idle for {} {}",idleFor,units);
254                             return;
255                         }
256                         size=getOutput().remaining();
257                     }
258                 }
259                 catch(Exception e)
260                 {
261                     LOG.warn(e);
262                 }
263             }
264         }
265     }
266 }