1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jetty.io.ByteArrayEndPoint;
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.StringUtil;
34 import org.eclipse.jetty.util.ssl.SslContextFactory;
35 import org.eclipse.jetty.util.thread.Scheduler;
36
37 public class LocalConnector extends AbstractConnector
38 {
39 private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40
41
42 public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43 {
44 super(server,executor,scheduler,pool,acceptors,factories);
45 setIdleTimeout(30000);
46 }
47
48 public LocalConnector(Server server)
49 {
50 this(server, null, null, null, 0, new HttpConnectionFactory());
51 }
52
53 public LocalConnector(Server server, SslContextFactory sslContextFactory)
54 {
55 this(server, null, null, null, 0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56 }
57
58 public LocalConnector(Server server, ConnectionFactory connectionFactory)
59 {
60 this(server, null, null, null, 0, connectionFactory);
61 }
62
63 public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64 {
65 this(server, null, null, null, 0, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66 }
67
68 @Override
69 public Object getTransport()
70 {
71 return this;
72 }
73
74
75
76
77
78
79
80
81
82
83
84 public String getResponses(String requests) throws Exception
85 {
86 return getResponses(requests, 5, TimeUnit.SECONDS);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102 {
103 ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StringUtil.__UTF8_CHARSET),idleFor,units);
104 return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET);
105 }
106
107
108
109
110
111
112
113
114
115
116
117 public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118 {
119 return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135 {
136 LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
137 LocalEndPoint endp = executeRequest(requestsBuffer);
138 endp.waitUntilClosedOrIdleFor(idleFor,units);
139 ByteBuffer responses = endp.takeOutput();
140 endp.getConnection().close();
141 LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
142 return responses;
143 }
144
145
146
147
148
149
150
151 public LocalEndPoint executeRequest(String rawRequest)
152 {
153 return executeRequest(BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET));
154 }
155
156 private LocalEndPoint executeRequest(ByteBuffer rawRequest)
157 {
158 LocalEndPoint endp = new LocalEndPoint();
159 endp.setInput(rawRequest);
160 _connects.add(endp);
161 return endp;
162 }
163
164 @Override
165 protected void accept(int acceptorID) throws IOException, InterruptedException
166 {
167 LOG.debug("accepting {}", acceptorID);
168 LocalEndPoint endPoint = _connects.take();
169 endPoint.onOpen();
170 onEndPointOpened(endPoint);
171
172 Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
173 endPoint.setConnection(connection);
174
175
176 connection.onOpen();
177 }
178
179 public class LocalEndPoint extends ByteArrayEndPoint
180 {
181 private final CountDownLatch _closed = new CountDownLatch(1);
182
183 public LocalEndPoint()
184 {
185 super(getScheduler(), LocalConnector.this.getIdleTimeout());
186 setGrowOutput(true);
187 }
188
189 public void addInput(String s)
190 {
191
192 while(getIn()==null || BufferUtil.hasContent(getIn()))
193 Thread.yield();
194 setInput(BufferUtil.toBuffer(s, StringUtil.__UTF8_CHARSET));
195 }
196
197 @Override
198 public void close()
199 {
200 boolean wasOpen=isOpen();
201 super.close();
202 if (wasOpen)
203 {
204
205 getConnection().onClose();
206 onClose();
207 }
208 }
209
210 @Override
211 public void onClose()
212 {
213 LocalConnector.this.onEndPointClosed(this);
214 super.onClose();
215 _closed.countDown();
216 }
217
218 @Override
219 public void shutdownOutput()
220 {
221 super.shutdownOutput();
222 close();
223 }
224
225 public void waitUntilClosed()
226 {
227 while (isOpen())
228 {
229 try
230 {
231 if (!_closed.await(10,TimeUnit.SECONDS))
232 break;
233 }
234 catch(Exception e)
235 {
236 LOG.warn(e);
237 }
238 }
239 }
240
241 public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
242 {
243 Thread.yield();
244 int size=getOutput().remaining();
245 while (isOpen())
246 {
247 try
248 {
249 if (!_closed.await(idleFor,units))
250 {
251 if (size==getOutput().remaining())
252 {
253 LOG.debug("idle for {} {}",idleFor,units);
254 return;
255 }
256 size=getOutput().remaining();
257 }
258 }
259 catch(Exception e)
260 {
261 LOG.warn(e);
262 }
263 }
264 }
265 }
266 }