View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.charset.StandardCharsets;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.eclipse.jetty.io.ByteArrayEndPoint;
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.io.Connection;
33  import org.eclipse.jetty.util.BufferUtil;
34  import org.eclipse.jetty.util.ssl.SslContextFactory;
35  import org.eclipse.jetty.util.thread.Scheduler;
36  
37  public class LocalConnector extends AbstractConnector
38  {
39      private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40  
41  
42      public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43      {
44          super(server,executor,scheduler,pool,acceptors,factories);
45          setIdleTimeout(30000);
46      }
47  
48      public LocalConnector(Server server)
49      {
50          this(server, null, null, null, -1, new HttpConnectionFactory());
51      }
52  
53      public LocalConnector(Server server, SslContextFactory sslContextFactory)
54      {
55          this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56      }
57  
58      public LocalConnector(Server server, ConnectionFactory connectionFactory)
59      {
60          this(server, null, null, null, -1, connectionFactory);
61      }
62  
63      public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64      {
65          this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66      }
67  
68      @Override
69      public Object getTransport()
70      {
71          return this;
72      }
73  
74      /** Sends requests and get responses based on thread activity.
75       * Returns all the responses received once the thread activity has
76       * returned to the level it was before the requests.
77       * <p>
78       * This methods waits until the connection is closed or
79       * is idle for 1s before returning the responses.
80       * @param requests the requests
81       * @return the responses
82       * @throws Exception if the requests fail
83       */
84      public String getResponses(String requests) throws Exception
85      {
86          return getResponses(requests, 5, TimeUnit.SECONDS);
87      }
88  
89      /** Sends requests and get responses based on thread activity.
90       * Returns all the responses received once the thread activity has
91       * returned to the level it was before the requests.
92       * <p>
93       * This methods waits until the connection is closed or
94       * an idle period before returning the responses.
95       * @param requests the requests
96       * @param idleFor The time the response stream must be idle for before returning
97       * @param units The units of idleFor
98       * @return the responses
99       * @throws Exception if the requests fail
100      */
101     public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102     {
103         ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
104         return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
105     }
106 
107     /** Sends requests and get's responses based on thread activity.
108      * Returns all the responses received once the thread activity has
109      * returned to the level it was before the requests.
110      * <p>
111      * This methods waits until the connection is closed or
112      * is idle for 1s before returning the responses.
113      * @param requestsBuffer the requests
114      * @return the responses
115      * @throws Exception if the requests fail
116      */
117     public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118     {
119         return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120     }
121 
122     /** Sends requests and get's responses based on thread activity.
123      * Returns all the responses received once the thread activity has
124      * returned to the level it was before the requests.
125      * <p>
126      * This methods waits until the connection is closed or
127      * an idle period before returning the responses.
128      * @param requestsBuffer the requests
129      * @param idleFor The time the response stream must be idle for before returning
130      * @param units The units of idleFor
131      * @return the responses
132      * @throws Exception if the requests fail
133      */
134     public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135     {
136         LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
137         LocalEndPoint endp = executeRequest(requestsBuffer);
138         endp.waitUntilClosedOrIdleFor(idleFor,units);
139         ByteBuffer responses = endp.takeOutput();
140         endp.getConnection().close();
141         LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
142         return responses;
143     }
144 
145     /**
146      * Execute a request and return the EndPoint through which
147      * responses can be received.
148      * @param rawRequest the request
149      * @return the local endpoint
150      */
151     public LocalEndPoint executeRequest(String rawRequest)
152     {
153         return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
154     }
155 
156     private LocalEndPoint executeRequest(ByteBuffer rawRequest)
157     {
158         LocalEndPoint endp = new LocalEndPoint();
159         endp.setInput(rawRequest);
160         _connects.add(endp);
161         return endp;
162     }
163 
164     @Override
165     protected void accept(int acceptorID) throws IOException, InterruptedException
166     {
167         LOG.debug("accepting {}", acceptorID);
168         LocalEndPoint endPoint = _connects.take();
169         endPoint.onOpen();
170         onEndPointOpened(endPoint);
171 
172         Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
173         endPoint.setConnection(connection);
174 
175         connection.onOpen();
176     }
177 
178     public class LocalEndPoint extends ByteArrayEndPoint
179     {
180         private final CountDownLatch _closed = new CountDownLatch(1);
181 
182         public LocalEndPoint()
183         {
184             super(getScheduler(), LocalConnector.this.getIdleTimeout());
185             setGrowOutput(true);
186         }
187 
188         public void addInput(String s)
189         {
190             // TODO this is a busy wait
191             while(getIn()==null || BufferUtil.hasContent(getIn()))
192                 Thread.yield();
193             setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
194         }
195 
196         @Override
197         public void close()
198         {
199             boolean wasOpen=isOpen();
200             super.close();
201             if (wasOpen)
202             {
203 //                connectionClosed(getConnection());
204                 getConnection().onClose();
205                 onClose();
206             }
207         }
208 
209         @Override
210         public void onClose()
211         {
212             LocalConnector.this.onEndPointClosed(this);
213             super.onClose();
214             _closed.countDown();
215         }
216 
217         @Override
218         public void shutdownOutput()
219         {
220             super.shutdownOutput();
221             close();
222         }
223 
224         public void waitUntilClosed()
225         {
226             while (isOpen())
227             {
228                 try
229                 {
230                     if (!_closed.await(10,TimeUnit.SECONDS))
231                         break;
232                 }
233                 catch(Exception e)
234                 {
235                     LOG.warn(e);
236                 }
237             }
238         }
239 
240         public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
241         {
242             Thread.yield();
243             int size=getOutput().remaining();
244             while (isOpen())
245             {
246                 try
247                 {
248                     if (!_closed.await(idleFor,units))
249                     {
250                         if (size==getOutput().remaining())
251                         {
252                             LOG.debug("idle for {} {}",idleFor,units);
253                             return;
254                         }
255                         size=getOutput().remaining();
256                     }
257                 }
258                 catch(Exception e)
259                 {
260                     LOG.warn(e);
261                 }
262             }
263         }
264     }
265 }