View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.charset.StandardCharsets;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.eclipse.jetty.io.ByteArrayEndPoint;
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.io.Connection;
33  import org.eclipse.jetty.util.BufferUtil;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.ssl.SslContextFactory;
36  import org.eclipse.jetty.util.thread.Scheduler;
37  
38  public class LocalConnector extends AbstractConnector
39  {
40      private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
41  
42  
43      public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
44      {
45          super(server,executor,scheduler,pool,acceptors,factories);
46          setIdleTimeout(30000);
47      }
48  
49      public LocalConnector(Server server)
50      {
51          this(server, null, null, null, 0, new HttpConnectionFactory());
52      }
53  
54      public LocalConnector(Server server, SslContextFactory sslContextFactory)
55      {
56          this(server, null, null, null, 0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
57      }
58  
59      public LocalConnector(Server server, ConnectionFactory connectionFactory)
60      {
61          this(server, null, null, null, 0, connectionFactory);
62      }
63  
64      public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
65      {
66          this(server, null, null, null, 0, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
67      }
68  
69      @Override
70      public Object getTransport()
71      {
72          return this;
73      }
74  
75      /** Sends requests and get responses based on thread activity.
76       * Returns all the responses received once the thread activity has
77       * returned to the level it was before the requests.
78       * <p>
79       * This methods waits until the connection is closed or
80       * is idle for 1s before returning the responses.
81       * @param requests the requests
82       * @return the responses
83       * @throws Exception if the requests fail
84       */
85      public String getResponses(String requests) throws Exception
86      {
87          return getResponses(requests, 5, TimeUnit.SECONDS);
88      }
89  
90      /** Sends requests and get responses based on thread activity.
91       * Returns all the responses received once the thread activity has
92       * returned to the level it was before the requests.
93       * <p>
94       * This methods waits until the connection is closed or
95       * an idle period before returning the responses.
96       * @param requests the requests
97       * @param idleFor The time the response stream must be idle for before returning
98       * @param units The units of idleFor
99       * @return the responses
100      * @throws Exception if the requests fail
101      */
102     public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
103     {
104         ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
105         return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
106     }
107 
108     /** Sends requests and get's responses based on thread activity.
109      * Returns all the responses received once the thread activity has
110      * returned to the level it was before the requests.
111      * <p>
112      * This methods waits until the connection is closed or
113      * is idle for 1s before returning the responses.
114      * @param requestsBuffer the requests
115      * @return the responses
116      * @throws Exception if the requests fail
117      */
118     public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
119     {
120         return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
121     }
122 
123     /** Sends requests and get's responses based on thread activity.
124      * Returns all the responses received once the thread activity has
125      * returned to the level it was before the requests.
126      * <p>
127      * This methods waits until the connection is closed or
128      * an idle period before returning the responses.
129      * @param requestsBuffer the requests
130      * @param idleFor The time the response stream must be idle for before returning
131      * @param units The units of idleFor
132      * @return the responses
133      * @throws Exception if the requests fail
134      */
135     public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
136     {
137         LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
138         LocalEndPoint endp = executeRequest(requestsBuffer);
139         endp.waitUntilClosedOrIdleFor(idleFor,units);
140         ByteBuffer responses = endp.takeOutput();
141         endp.getConnection().close();
142         LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
143         return responses;
144     }
145 
146     /**
147      * Execute a request and return the EndPoint through which
148      * responses can be received.
149      * @param rawRequest the request
150      * @return the local endpoint
151      */
152     public LocalEndPoint executeRequest(String rawRequest)
153     {
154         return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
155     }
156 
157     private LocalEndPoint executeRequest(ByteBuffer rawRequest)
158     {
159         LocalEndPoint endp = new LocalEndPoint();
160         endp.setInput(rawRequest);
161         _connects.add(endp);
162         return endp;
163     }
164 
165     @Override
166     protected void accept(int acceptorID) throws IOException, InterruptedException
167     {
168         LOG.debug("accepting {}", acceptorID);
169         LocalEndPoint endPoint = _connects.take();
170         endPoint.onOpen();
171         onEndPointOpened(endPoint);
172 
173         Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
174         endPoint.setConnection(connection);
175 
176 //        connectionOpened(connection);
177         connection.onOpen();
178     }
179 
180     public class LocalEndPoint extends ByteArrayEndPoint
181     {
182         private final CountDownLatch _closed = new CountDownLatch(1);
183 
184         public LocalEndPoint()
185         {
186             super(getScheduler(), LocalConnector.this.getIdleTimeout());
187             setGrowOutput(true);
188         }
189 
190         public void addInput(String s)
191         {
192             // TODO this is a busy wait
193             while(getIn()==null || BufferUtil.hasContent(getIn()))
194                 Thread.yield();
195             setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
196         }
197 
198         @Override
199         public void close()
200         {
201             boolean wasOpen=isOpen();
202             super.close();
203             if (wasOpen)
204             {
205 //                connectionClosed(getConnection());
206                 getConnection().onClose();
207                 onClose();
208             }
209         }
210 
211         @Override
212         public void onClose()
213         {
214             LocalConnector.this.onEndPointClosed(this);
215             super.onClose();
216             _closed.countDown();
217         }
218 
219         @Override
220         public void shutdownOutput()
221         {
222             super.shutdownOutput();
223             close();
224         }
225 
226         public void waitUntilClosed()
227         {
228             while (isOpen())
229             {
230                 try
231                 {
232                     if (!_closed.await(10,TimeUnit.SECONDS))
233                         break;
234                 }
235                 catch(Exception e)
236                 {
237                     LOG.warn(e);
238                 }
239             }
240         }
241 
242         public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
243         {
244             Thread.yield();
245             int size=getOutput().remaining();
246             while (isOpen())
247             {
248                 try
249                 {
250                     if (!_closed.await(idleFor,units))
251                     {
252                         if (size==getOutput().remaining())
253                         {
254                             LOG.debug("idle for {} {}",idleFor,units);
255                             return;
256                         }
257                         size=getOutput().remaining();
258                     }
259                 }
260                 catch(Exception e)
261                 {
262                     LOG.warn(e);
263                 }
264             }
265         }
266     }
267 }