View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.charset.StandardCharsets;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.eclipse.jetty.io.ByteArrayEndPoint;
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.io.Connection;
33  import org.eclipse.jetty.util.BufferUtil;
34  import org.eclipse.jetty.util.ssl.SslContextFactory;
35  import org.eclipse.jetty.util.thread.Scheduler;
36  
37  public class LocalConnector extends AbstractConnector
38  {
39      private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40  
41  
42      public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43      {
44          super(server,executor,scheduler,pool,acceptors,factories);
45          setIdleTimeout(30000);
46      }
47  
48      public LocalConnector(Server server)
49      {
50          this(server, null, null, null, -1, new HttpConnectionFactory());
51      }
52  
53      public LocalConnector(Server server, SslContextFactory sslContextFactory)
54      {
55          this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56      }
57  
58      public LocalConnector(Server server, ConnectionFactory connectionFactory)
59      {
60          this(server, null, null, null, -1, connectionFactory);
61      }
62  
63      public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64      {
65          this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66      }
67  
68      @Override
69      public Object getTransport()
70      {
71          return this;
72      }
73  
74      /** Sends requests and get responses based on thread activity.
75       * Returns all the responses received once the thread activity has
76       * returned to the level it was before the requests.
77       * <p>
78       * This methods waits until the connection is closed or
79       * is idle for 1s before returning the responses.
80       * @param requests the requests
81       * @return the responses
82       * @throws Exception if the requests fail
83       */
84      public String getResponses(String requests) throws Exception
85      {
86          return getResponses(requests, 5, TimeUnit.SECONDS);
87      }
88  
89      /** Sends requests and get responses based on thread activity.
90       * Returns all the responses received once the thread activity has
91       * returned to the level it was before the requests.
92       * <p>
93       * This methods waits until the connection is closed or
94       * an idle period before returning the responses.
95       * @param requests the requests
96       * @param idleFor The time the response stream must be idle for before returning
97       * @param units The units of idleFor
98       * @return the responses
99       * @throws Exception if the requests fail
100      */
101     public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102     {
103         ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
104         return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
105     }
106 
107     /** Sends requests and get's responses based on thread activity.
108      * Returns all the responses received once the thread activity has
109      * returned to the level it was before the requests.
110      * <p>
111      * This methods waits until the connection is closed or
112      * is idle for 1s before returning the responses.
113      * @param requestsBuffer the requests
114      * @return the responses
115      * @throws Exception if the requests fail
116      */
117     public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118     {
119         return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120     }
121 
122     /** Sends requests and get's responses based on thread activity.
123      * Returns all the responses received once the thread activity has
124      * returned to the level it was before the requests.
125      * <p>
126      * This methods waits until the connection is closed or
127      * an idle period before returning the responses.
128      * @param requestsBuffer the requests
129      * @param idleFor The time the response stream must be idle for before returning
130      * @param units The units of idleFor
131      * @return the responses
132      * @throws Exception if the requests fail
133      */
134     public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135     {
136         if (LOG.isDebugEnabled())
137             LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
138         LocalEndPoint endp = executeRequest(requestsBuffer);
139         endp.waitUntilClosedOrIdleFor(idleFor,units);
140         ByteBuffer responses = endp.takeOutput();
141         if (endp.isOutputShutdown())
142             endp.close();
143         if (LOG.isDebugEnabled())
144             LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
145         return responses;
146     }
147 
148     /**
149      * Execute a request and return the EndPoint through which
150      * responses can be received.
151      * @param rawRequest the request
152      * @return the local endpoint
153      */
154     public LocalEndPoint executeRequest(String rawRequest)
155     {
156         return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
157     }
158 
159     private LocalEndPoint executeRequest(ByteBuffer rawRequest)
160     {
161         if (!isStarted())
162             throw new IllegalStateException("!STARTED");
163         LocalEndPoint endp = new LocalEndPoint();
164         endp.addInput(rawRequest);
165         _connects.add(endp);
166         return endp;
167     }
168 
169     @Override
170     protected void accept(int acceptorID) throws IOException, InterruptedException
171     {
172         if (LOG.isDebugEnabled())
173             LOG.debug("accepting {}", acceptorID);
174         LocalEndPoint endPoint = _connects.take();
175         endPoint.onOpen();
176         onEndPointOpened(endPoint);
177 
178         Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
179         endPoint.setConnection(connection);
180 
181         connection.onOpen();
182     }
183 
184     public class LocalEndPoint extends ByteArrayEndPoint
185     {
186         private final CountDownLatch _closed = new CountDownLatch(1);
187 
188         public LocalEndPoint()
189         {
190             super(LocalConnector.this.getScheduler(), LocalConnector.this.getIdleTimeout());
191             setGrowOutput(true);
192         }
193         
194         protected void execute(Runnable task)
195         {
196             getExecutor().execute(task);
197         }
198 
199         @Override
200         public void close()
201         {
202             boolean wasOpen=isOpen();
203             super.close();
204             if (wasOpen)
205             {
206                 getConnection().onClose();
207                 onClose();
208             }
209         }
210 
211         @Override
212         public void onClose()
213         {
214             LocalConnector.this.onEndPointClosed(this);
215             super.onClose();
216             _closed.countDown();
217         }
218 
219         @Override
220         public void shutdownOutput()
221         {
222             super.shutdownOutput();
223             close();
224         }
225 
226         public void waitUntilClosed()
227         {
228             while (isOpen())
229             {
230                 try
231                 {
232                     if (!_closed.await(10,TimeUnit.SECONDS))
233                         break;
234                 }
235                 catch(Exception e)
236                 {
237                     LOG.warn(e);
238                 }
239             }
240         }
241 
242         public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
243         {
244             Thread.yield();
245             int size=getOutput().remaining();
246             while (isOpen())
247             {
248                 try
249                 {
250                     if (!_closed.await(idleFor,units))
251                     {
252                         if (size==getOutput().remaining())
253                         {
254                             if (LOG.isDebugEnabled())
255                                 LOG.debug("idle for {} {}",idleFor,units);
256                             return;
257                         }
258                         size=getOutput().remaining();
259                     }
260                 }
261                 catch(Exception e)
262                 {
263                     LOG.warn(e);
264                 }
265             }
266         }
267     }
268 }