1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.eclipse.jetty.io.ByteArrayEndPoint;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.util.BufferUtil;
34 import org.eclipse.jetty.util.ssl.SslContextFactory;
35 import org.eclipse.jetty.util.thread.Scheduler;
36
37 public class LocalConnector extends AbstractConnector
38 {
39 private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40
41
42 public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43 {
44 super(server,executor,scheduler,pool,acceptors,factories);
45 setIdleTimeout(30000);
46 }
47
48 public LocalConnector(Server server)
49 {
50 this(server, null, null, null, -1, new HttpConnectionFactory());
51 }
52
53 public LocalConnector(Server server, SslContextFactory sslContextFactory)
54 {
55 this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56 }
57
58 public LocalConnector(Server server, ConnectionFactory connectionFactory)
59 {
60 this(server, null, null, null, -1, connectionFactory);
61 }
62
63 public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64 {
65 this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66 }
67
68 @Override
69 public Object getTransport()
70 {
71 return this;
72 }
73
74
75
76
77
78
79
80
81
82
83
84 public String getResponses(String requests) throws Exception
85 {
86 return getResponses(requests, 5, TimeUnit.SECONDS);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102 {
103 ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
104 return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
105 }
106
107
108
109
110
111
112
113
114
115
116
117 public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118 {
119 return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135 {
136 LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
137 LocalEndPoint endp = executeRequest(requestsBuffer);
138 endp.waitUntilClosedOrIdleFor(idleFor,units);
139 ByteBuffer responses = endp.takeOutput();
140 endp.getConnection().close();
141 LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
142 return responses;
143 }
144
145
146
147
148
149
150
151 public LocalEndPoint executeRequest(String rawRequest)
152 {
153 return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
154 }
155
156 private LocalEndPoint executeRequest(ByteBuffer rawRequest)
157 {
158 LocalEndPoint endp = new LocalEndPoint();
159 endp.setInput(rawRequest);
160 _connects.add(endp);
161 return endp;
162 }
163
164 @Override
165 protected void accept(int acceptorID) throws IOException, InterruptedException
166 {
167 LOG.debug("accepting {}", acceptorID);
168 LocalEndPoint endPoint = _connects.take();
169 endPoint.onOpen();
170 onEndPointOpened(endPoint);
171
172 Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
173 endPoint.setConnection(connection);
174
175 connection.onOpen();
176 }
177
178 public class LocalEndPoint extends ByteArrayEndPoint
179 {
180 private final CountDownLatch _closed = new CountDownLatch(1);
181
182 public LocalEndPoint()
183 {
184 super(getScheduler(), LocalConnector.this.getIdleTimeout());
185 setGrowOutput(true);
186 }
187
188 public void addInput(String s)
189 {
190
191 while(getIn()==null || BufferUtil.hasContent(getIn()))
192 Thread.yield();
193 setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
194 }
195
196 @Override
197 public void close()
198 {
199 boolean wasOpen=isOpen();
200 super.close();
201 if (wasOpen)
202 {
203
204 getConnection().onClose();
205 onClose();
206 }
207 }
208
209 @Override
210 public void onClose()
211 {
212 LocalConnector.this.onEndPointClosed(this);
213 super.onClose();
214 _closed.countDown();
215 }
216
217 @Override
218 public void shutdownOutput()
219 {
220 super.shutdownOutput();
221 close();
222 }
223
224 public void waitUntilClosed()
225 {
226 while (isOpen())
227 {
228 try
229 {
230 if (!_closed.await(10,TimeUnit.SECONDS))
231 break;
232 }
233 catch(Exception e)
234 {
235 LOG.warn(e);
236 }
237 }
238 }
239
240 public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
241 {
242 Thread.yield();
243 int size=getOutput().remaining();
244 while (isOpen())
245 {
246 try
247 {
248 if (!_closed.await(idleFor,units))
249 {
250 if (size==getOutput().remaining())
251 {
252 LOG.debug("idle for {} {}",idleFor,units);
253 return;
254 }
255 size=getOutput().remaining();
256 }
257 }
258 catch(Exception e)
259 {
260 LOG.warn(e);
261 }
262 }
263 }
264 }
265 }