1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.eclipse.jetty.io.ByteArrayEndPoint;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.util.BufferUtil;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.ssl.SslContextFactory;
36 import org.eclipse.jetty.util.thread.Scheduler;
37
38 public class LocalConnector extends AbstractConnector
39 {
40 private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
41
42
43 public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
44 {
45 super(server,executor,scheduler,pool,acceptors,factories);
46 setIdleTimeout(30000);
47 }
48
49 public LocalConnector(Server server)
50 {
51 this(server, null, null, null, 0, new HttpConnectionFactory());
52 }
53
54 public LocalConnector(Server server, SslContextFactory sslContextFactory)
55 {
56 this(server, null, null, null, 0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
57 }
58
59 public LocalConnector(Server server, ConnectionFactory connectionFactory)
60 {
61 this(server, null, null, null, 0, connectionFactory);
62 }
63
64 public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
65 {
66 this(server, null, null, null, 0, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
67 }
68
69 @Override
70 public Object getTransport()
71 {
72 return this;
73 }
74
75
76
77
78
79
80
81
82
83
84
85 public String getResponses(String requests) throws Exception
86 {
87 return getResponses(requests, 5, TimeUnit.SECONDS);
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101
102 public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
103 {
104 ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
105 return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
106 }
107
108
109
110
111
112
113
114
115
116
117
118 public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
119 {
120 return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135 public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
136 {
137 LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
138 LocalEndPoint endp = executeRequest(requestsBuffer);
139 endp.waitUntilClosedOrIdleFor(idleFor,units);
140 ByteBuffer responses = endp.takeOutput();
141 endp.getConnection().close();
142 LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
143 return responses;
144 }
145
146
147
148
149
150
151
152 public LocalEndPoint executeRequest(String rawRequest)
153 {
154 return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
155 }
156
157 private LocalEndPoint executeRequest(ByteBuffer rawRequest)
158 {
159 LocalEndPoint endp = new LocalEndPoint();
160 endp.setInput(rawRequest);
161 _connects.add(endp);
162 return endp;
163 }
164
165 @Override
166 protected void accept(int acceptorID) throws IOException, InterruptedException
167 {
168 LOG.debug("accepting {}", acceptorID);
169 LocalEndPoint endPoint = _connects.take();
170 endPoint.onOpen();
171 onEndPointOpened(endPoint);
172
173 Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
174 endPoint.setConnection(connection);
175
176
177 connection.onOpen();
178 }
179
180 public class LocalEndPoint extends ByteArrayEndPoint
181 {
182 private final CountDownLatch _closed = new CountDownLatch(1);
183
184 public LocalEndPoint()
185 {
186 super(getScheduler(), LocalConnector.this.getIdleTimeout());
187 setGrowOutput(true);
188 }
189
190 public void addInput(String s)
191 {
192
193 while(getIn()==null || BufferUtil.hasContent(getIn()))
194 Thread.yield();
195 setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
196 }
197
198 @Override
199 public void close()
200 {
201 boolean wasOpen=isOpen();
202 super.close();
203 if (wasOpen)
204 {
205
206 getConnection().onClose();
207 onClose();
208 }
209 }
210
211 @Override
212 public void onClose()
213 {
214 LocalConnector.this.onEndPointClosed(this);
215 super.onClose();
216 _closed.countDown();
217 }
218
219 @Override
220 public void shutdownOutput()
221 {
222 super.shutdownOutput();
223 close();
224 }
225
226 public void waitUntilClosed()
227 {
228 while (isOpen())
229 {
230 try
231 {
232 if (!_closed.await(10,TimeUnit.SECONDS))
233 break;
234 }
235 catch(Exception e)
236 {
237 LOG.warn(e);
238 }
239 }
240 }
241
242 public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
243 {
244 Thread.yield();
245 int size=getOutput().remaining();
246 while (isOpen())
247 {
248 try
249 {
250 if (!_closed.await(idleFor,units))
251 {
252 if (size==getOutput().remaining())
253 {
254 LOG.debug("idle for {} {}",idleFor,units);
255 return;
256 }
257 size=getOutput().remaining();
258 }
259 }
260 catch(Exception e)
261 {
262 LOG.warn(e);
263 }
264 }
265 }
266 }
267 }