1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.http;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.nio.ByteBuffer;
26 import java.util.LinkedList;
27 import java.util.Locale;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35
36 import org.eclipse.jetty.http.HttpException;
37 import org.eclipse.jetty.http.HttpFields;
38 import org.eclipse.jetty.http.HttpGenerator;
39 import org.eclipse.jetty.http.HttpParser;
40 import org.eclipse.jetty.http.HttpStatus;
41 import org.eclipse.jetty.io.AsyncEndPoint;
42 import org.eclipse.jetty.io.Buffer;
43 import org.eclipse.jetty.io.Buffers;
44 import org.eclipse.jetty.io.ByteArrayBuffer;
45 import org.eclipse.jetty.io.Connection;
46 import org.eclipse.jetty.io.EndPoint;
47 import org.eclipse.jetty.io.nio.AsyncConnection;
48 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
49 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
50 import org.eclipse.jetty.io.nio.NIOBuffer;
51 import org.eclipse.jetty.server.AbstractHttpConnection;
52 import org.eclipse.jetty.server.Connector;
53 import org.eclipse.jetty.server.Server;
54 import org.eclipse.jetty.spdy.SPDYAsyncConnection;
55 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
56 import org.eclipse.jetty.spdy.api.BytesDataInfo;
57 import org.eclipse.jetty.spdy.api.DataInfo;
58 import org.eclipse.jetty.spdy.api.Handler;
59 import org.eclipse.jetty.spdy.api.Headers;
60 import org.eclipse.jetty.spdy.api.ReplyInfo;
61 import org.eclipse.jetty.spdy.api.RstInfo;
62 import org.eclipse.jetty.spdy.api.SPDY;
63 import org.eclipse.jetty.spdy.api.Stream;
64 import org.eclipse.jetty.spdy.api.StreamStatus;
65 import org.eclipse.jetty.spdy.api.SynInfo;
66 import org.eclipse.jetty.util.log.Log;
67 import org.eclipse.jetty.util.log.Logger;
68
69 public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
70 {
// Class logger; used for debug tracing of task dispatch, state changes and HTTP traffic.
71 private static final Logger logger = Log.getLogger(ServerHTTPSPDYAsyncConnection.class);
// Shared empty buffer backing END_OF_CONTENT and the empty closing frame sent by the generator.
72 private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
// Sentinel queued after the last chunk of request content; consumeContent() detects it by identity.
73 private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(ZERO_BYTES, true);
74
// Pending tasks; also used as the monitor that serializes post() and dispatch().
75 private final Queue<Runnable> tasks = new LinkedList<>();
// Request content chunks queued by content() and polled by consumeContent().
76 private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
// SPDY version, used to resolve the version-specific names of the special headers.
77 private final short version;
// Not used directly by this class; handed over to the connections created for pushed streams.
78 private final SPDYAsyncConnection connection;
// Applied in reply() to obtain the resources to push.
79 private final PushStrategy pushStrategy;
// The SPDY stream this connection replies on.
80 private final Stream stream;
// Most recent headers received through beginRequest() or headers(); null if beginRequest() got empty headers.
81 private Headers headers;
// Content chunk currently being read, and the Jetty buffer wrapping it (both managed by consumeContent()).
82 private DataInfo dataInfo;
83 private NIOBuffer buffer;
// Current processing step; volatile because it is read and written by different threads.
84 private volatile State state = State.INITIAL;
// True while a task is running or scheduled; checked by dispatch() under the tasks lock.
85 private boolean dispatched;
86
/**
 * Creates the HTTP connection that serves the requests arriving on one SPDY stream.
 *
 * @param connector    the connector, passed to the superclass
 * @param endPoint     the endpoint, passed to the superclass
 * @param server       the server, passed to the superclass
 * @param version      the SPDY version used to resolve special header names
 * @param connection   the SPDY connection, handed over to connections created for pushed streams
 * @param pushStrategy the strategy that selects the resources to push when replying
 * @param stream       the SPDY stream that replies are sent on
 */
public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, short version, SPDYAsyncConnection connection, PushStrategy pushStrategy, Stream stream)
{
    super(connector, endPoint, server);
    this.stream = stream;
    this.pushStrategy = pushStrategy;
    this.connection = connection;
    this.version = version;
    // The parser is always marked persistent for this connection.
    getParser().setPersistent(true);
}
96
97 @Override
98 protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler)
99 {
100 return new HTTPSPDYParser(requestBuffers, endPoint);
101 }
102
103 @Override
104 protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint)
105 {
106 return new HTTPSPDYGenerator(responseBuffers, endPoint);
107 }
108
109 @Override
110 public AsyncEndPoint getEndPoint()
111 {
112 return (AsyncEndPoint)super.getEndPoint();
113 }
114
115 private void post(Runnable task)
116 {
117 synchronized (tasks)
118 {
119 logger.debug("Posting task {}", task);
120 tasks.offer(task);
121 dispatch();
122 }
123 }
124
125 private void dispatch()
126 {
127 synchronized (tasks)
128 {
129 if (dispatched)
130 return;
131
132 final Runnable task = tasks.poll();
133 if (task != null)
134 {
135 dispatched = true;
136 logger.debug("Dispatching task {}", task);
137 execute(new Runnable()
138 {
139 @Override
140 public void run()
141 {
142 logger.debug("Executing task {}", task);
143 task.run();
144 logger.debug("Completing task {}", task);
145 dispatched = false;
146 dispatch();
147 }
148 });
149 }
150 }
151 }
152
153 protected void execute(Runnable task)
154 {
155 getServer().getThreadPool().dispatch(task);
156 }
157
158 @Override
159 public Connection handle()
160 {
161 setCurrentConnection(this);
162 try
163 {
164 switch (state)
165 {
166 case INITIAL:
167 {
168 break;
169 }
170 case REQUEST:
171 {
172 Headers.Header method = headers.get(HTTPSPDYHeader.METHOD.name(version));
173 Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
174 Headers.Header version = headers.get(HTTPSPDYHeader.VERSION.name(this.version));
175
176 if (method == null || uri == null || version == null)
177 throw new HttpException(HttpStatus.BAD_REQUEST_400);
178
179 String m = method.value();
180 String u = uri.value();
181 String v = version.value();
182 logger.debug("HTTP > {} {} {}", m, u, v);
183 startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
184
185 Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(this.version));
186 if(schemeHeader != null)
187 _request.setScheme(schemeHeader.value());
188
189 updateState(State.HEADERS);
190 handle();
191 break;
192 }
193 case HEADERS:
194 {
195 for (Headers.Header header : headers)
196 {
197 String name = header.name();
198
199
200 HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(version, name);
201 if (specialHeader != null)
202 {
203 if (specialHeader == HTTPSPDYHeader.HOST)
204 name = "host";
205 else
206 continue;
207 }
208
209 switch (name)
210 {
211 case "connection":
212 case "keep-alive":
213 case "proxy-connection":
214 case "transfer-encoding":
215 {
216
217 continue;
218 }
219 default:
220 {
221
222 String value = header.value();
223 logger.debug("HTTP > {}: {}", name, value);
224 parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
225 break;
226 }
227 }
228 }
229 break;
230 }
231 case HEADERS_COMPLETE:
232 {
233 headerComplete();
234 break;
235 }
236 case CONTENT:
237 {
238 final Buffer buffer = this.buffer;
239 if (buffer != null && buffer.length() > 0)
240 content(buffer);
241 break;
242 }
243 case FINAL:
244 {
245 messageComplete(0);
246 break;
247 }
248 case ASYNC:
249 {
250 handleRequest();
251 break;
252 }
253 default:
254 {
255 throw new IllegalStateException();
256 }
257 }
258 return this;
259 }
260 catch (HttpException x)
261 {
262 respond(stream, x.getStatus());
263 return this;
264 }
265 catch (IOException x)
266 {
267 close(stream);
268 return this;
269 }
270 finally
271 {
272 setCurrentConnection(null);
273 }
274 }
275
276 private void respond(Stream stream, int status)
277 {
278 if (stream.isUnidirectional())
279 {
280 stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.INTERNAL_ERROR));
281 }
282 else
283 {
284 Headers headers = new Headers();
285 headers.put(HTTPSPDYHeader.STATUS.name(version), String.valueOf(status));
286 headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
287 stream.reply(new ReplyInfo(headers, true));
288 }
289 }
290
291 private void close(Stream stream)
292 {
293 stream.getSession().goAway();
294 }
295
296 @Override
297 public void onInputShutdown() throws IOException
298 {
299 }
300
301 private void updateState(State newState)
302 {
303 logger.debug("State update {} -> {}", state, newState);
304 state = newState;
305 }
306
307 public void beginRequest(final Headers headers, final boolean endRequest)
308 {
309 this.headers = headers.isEmpty() ? null : headers;
310 post(new Runnable()
311 {
312 @Override
313 public void run()
314 {
315 if (!headers.isEmpty())
316 updateState(State.REQUEST);
317 handle();
318 if (endRequest)
319 performEndRequest();
320 }
321 });
322 }
323
324 public void headers(Headers headers)
325 {
326 this.headers = headers;
327 post(new Runnable()
328 {
329 @Override
330 public void run()
331 {
332 updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
333 handle();
334 }
335 });
336 }
337
338 public void content(final DataInfo dataInfo, boolean endRequest)
339 {
340
341
342
343 ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
344 {
345 @Override
346 public void consume(int delta)
347 {
348 super.consume(delta);
349 dataInfo.consume(delta);
350 }
351 };
352 logger.debug("Queuing last={} content {}", endRequest, copyDataInfo);
353 dataInfos.offer(copyDataInfo);
354 if (endRequest)
355 dataInfos.offer(END_OF_CONTENT);
356 post(new Runnable()
357 {
358 @Override
359 public void run()
360 {
361 logger.debug("HTTP > {} bytes of content", dataInfo.length());
362 if (state == State.HEADERS)
363 {
364 updateState(State.HEADERS_COMPLETE);
365 handle();
366 }
367 updateState(State.CONTENT);
368 handle();
369 }
370 });
371 }
372
373 public void endRequest()
374 {
375 post(new Runnable()
376 {
377 public void run()
378 {
379 performEndRequest();
380 }
381 });
382 }
383
384 private void performEndRequest()
385 {
386 if (state == State.HEADERS)
387 {
388 updateState(State.HEADERS_COMPLETE);
389 handle();
390 }
391 updateState(State.FINAL);
392 handle();
393 }
394
395 public void async()
396 {
397 post(new Runnable()
398 {
399 @Override
400 public void run()
401 {
402 State oldState = state;
403 updateState(State.ASYNC);
404 handle();
405 updateState(oldState);
406 }
407 });
408 }
409
410 protected void reply(Stream stream, ReplyInfo replyInfo)
411 {
412 if (!stream.isUnidirectional())
413 stream.reply(replyInfo);
414 if (replyInfo.getHeaders().get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") &&
415 !stream.isClosed())
416 {
417
418
419 Headers.Header scheme = headers.get(HTTPSPDYHeader.SCHEME.name(version));
420 Headers.Header host = headers.get(HTTPSPDYHeader.HOST.name(version));
421 Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
422 Set<String> pushResources = pushStrategy.apply(stream, headers, replyInfo.getHeaders());
423
424 for (String pushResourcePath : pushResources)
425 {
426 final Headers requestHeaders = createRequestHeaders(scheme, host, uri, pushResourcePath);
427 final Headers pushHeaders = createPushHeaders(scheme, host, pushResourcePath);
428
429 stream.syn(new SynInfo(pushHeaders, false), getMaxIdleTime(), TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
430 {
431 @Override
432 public void completed(Stream pushStream)
433 {
434 ServerHTTPSPDYAsyncConnection pushConnection =
435 new ServerHTTPSPDYAsyncConnection(getConnector(), getEndPoint(), getServer(), version, connection, pushStrategy, pushStream);
436 pushConnection.beginRequest(requestHeaders, true);
437 }
438 });
439 }
440 }
441 }
442
443 private Headers createRequestHeaders(Headers.Header scheme, Headers.Header host, Headers.Header uri, String pushResourcePath)
444 {
445 final Headers requestHeaders = new Headers();
446 requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
447 requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
448 requestHeaders.put(scheme);
449 requestHeaders.put(host);
450 requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
451 String referrer = scheme.value() + "://" + host.value() + uri.value();
452 requestHeaders.put("referer", referrer);
453
454 requestHeaders.put(headers.get("accept-encoding"));
455 requestHeaders.put("x-spdy-push", "true");
456 return requestHeaders;
457 }
458
459 private Headers createPushHeaders(Headers.Header scheme, Headers.Header host, String pushResourcePath)
460 {
461 final Headers pushHeaders = new Headers();
462 if (version == SPDY.V2)
463 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
464 else
465 {
466 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
467 pushHeaders.put(scheme);
468 pushHeaders.put(host);
469 }
470 pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
471 pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
472 return pushHeaders;
473 }
474
475 private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
476 {
477 while (true)
478 {
479
480 State state = this.state;
481 if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL)
482 throw new IllegalStateException();
483
484 if (buffer != null)
485 {
486 if (buffer.length() > 0)
487 {
488 logger.debug("Consuming content bytes, {} available", buffer.length());
489 return buffer;
490 }
491 else
492 {
493
494 dataInfo.consume(dataInfo.length());
495 logger.debug("Consumed {} content bytes, queue size {}", dataInfo.consumed(), dataInfos.size());
496 dataInfo = null;
497 buffer = null;
498
499 }
500 }
501 else
502 {
503 logger.debug("Waiting at most {} ms for content bytes", maxIdleTime);
504 long begin = System.nanoTime();
505 dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS);
506 long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
507 logger.debug("Waited {} ms for content bytes", elapsed);
508 if (dataInfo != null)
509 {
510 if (dataInfo == END_OF_CONTENT)
511 {
512 logger.debug("End of content bytes, queue size {}", dataInfos.size());
513 return null;
514 }
515
516 ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
517 buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
518
519 }
520 else
521 {
522 stream.getSession().goAway();
523 throw new EOFException("read timeout");
524 }
525 }
526 }
527 }
528
529 private int availableContent()
530 {
531
532 State state = this.state;
533 if (state != State.HEADERS_COMPLETE && state != State.CONTENT)
534 throw new IllegalStateException();
535 return buffer == null ? 0 : buffer.length();
536 }
537
538 @Override
539 public void commitResponse(boolean last) throws IOException
540 {
541
542 super.commitResponse(last);
543 }
544
545 @Override
546 public void flushResponse() throws IOException
547 {
548
549 commitResponse(false);
550 }
551
552 @Override
553 public void completeResponse() throws IOException
554 {
555
556 super.completeResponse();
557 }
558
559 private enum State
560 {
561 INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
562 }
563
564
565
566
567 private class HTTPSPDYParser extends HttpParser
568 {
569 public HTTPSPDYParser(Buffers buffers, EndPoint endPoint)
570 {
571 super(buffers, endPoint, new HTTPSPDYParserHandler());
572 }
573
574 @Override
575 public Buffer blockForContent(long maxIdleTime) throws IOException
576 {
577 try
578 {
579 return consumeContent(maxIdleTime);
580 }
581 catch (InterruptedException x)
582 {
583 throw new InterruptedIOException();
584 }
585 }
586
587 @Override
588 public int available() throws IOException
589 {
590 return availableContent();
591 }
592 }
593
594
595
596
597 private static class HTTPSPDYParserHandler extends HttpParser.EventHandler
598 {
599 @Override
600 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
601 {
602 }
603
604 @Override
605 public void content(Buffer ref) throws IOException
606 {
607 }
608
609 @Override
610 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
611 {
612 }
613 }
614
615
616
617
618
619 private class HTTPSPDYGenerator extends HttpGenerator
620 {
621 private boolean closed;
622
623 private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint)
624 {
625 super(buffers, endPoint);
626 }
627
628 @Override
629 public void send1xx(int code) throws IOException
630 {
631
632 throw new UnsupportedOperationException();
633 }
634
635 @Override
636 public void sendResponse(Buffer response) throws IOException
637 {
638
639
640
641 throw new UnsupportedOperationException();
642 }
643
644 @Override
645 public void sendError(int code, String reason, String content, boolean close) throws IOException
646 {
647
648 super.sendError(code, reason, content, close);
649 }
650
651 @Override
652 public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
653 {
654 Headers headers = new Headers();
655 String version = "HTTP/1.1";
656 headers.put(HTTPSPDYHeader.VERSION.name(ServerHTTPSPDYAsyncConnection.this.version), version);
657 StringBuilder status = new StringBuilder().append(_status);
658 if (_reason != null)
659 status.append(" ").append(_reason.toString("UTF-8"));
660 headers.put(HTTPSPDYHeader.STATUS.name(ServerHTTPSPDYAsyncConnection.this.version), status.toString());
661 logger.debug("HTTP < {} {}", version, status);
662
663 if (fields != null)
664 {
665 for (int i = 0; i < fields.size(); ++i)
666 {
667 HttpFields.Field field = fields.getField(i);
668 String name = field.getName().toLowerCase(Locale.ENGLISH);
669 String value = field.getValue();
670 headers.put(name, value);
671 logger.debug("HTTP < {}: {}", name, value);
672 }
673 }
674
675
676
677 Buffer content = getContentBuffer();
678 reply(stream, new ReplyInfo(headers, content == null));
679 if (content != null)
680 {
681 closed = false;
682
683 _state = HttpGenerator.STATE_CONTENT;
684 }
685 else
686 {
687 closed = true;
688
689 _state = HttpGenerator.STATE_END;
690 }
691 }
692
693 private Buffer getContentBuffer()
694 {
695 if (_buffer != null && _buffer.length() > 0)
696 return _buffer;
697 if (_content != null && _content.length() > 0)
698 return _content;
699 return null;
700 }
701
702 @Override
703 public void addContent(Buffer content, boolean last) throws IOException
704 {
705
706
707 super.addContent(content, last);
708 }
709
710 @Override
711 public void flush(long maxIdleTime) throws IOException
712 {
713 try
714 {
715 Buffer content = getContentBuffer();
716 while (content != null)
717 {
718 DataInfo dataInfo = toDataInfo(content, closed);
719 logger.debug("HTTP < {} bytes of content", dataInfo.length());
720 stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
721 content.clear();
722 _bypass = false;
723 content = getContentBuffer();
724 }
725 }
726 catch (TimeoutException x)
727 {
728 stream.getSession().goAway();
729 throw new EOFException("write timeout");
730 }
731 catch (InterruptedException x)
732 {
733 throw new InterruptedIOException();
734 }
735 catch (ExecutionException x)
736 {
737 throw new IOException(x.getCause());
738 }
739 }
740
741 private DataInfo toDataInfo(Buffer buffer, boolean close)
742 {
743 if (buffer instanceof ByteArrayBuffer)
744 return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
745
746 if (buffer instanceof NIOBuffer)
747 {
748 ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
749 byteBuffer.limit(buffer.putIndex());
750 byteBuffer.position(buffer.getIndex());
751 return new ByteBufferDataInfo(byteBuffer, close);
752 }
753
754 return new BytesDataInfo(buffer.asArray(), close);
755 }
756
757 @Override
758 public int flushBuffer() throws IOException
759 {
760
761
762
763 throw new UnsupportedOperationException();
764 }
765
766 @Override
767 public void blockForOutput(long maxIdleTime) throws IOException
768 {
769
770
771
772 flush(maxIdleTime);
773 }
774
775 @Override
776 public void complete() throws IOException
777 {
778 Buffer content = getContentBuffer();
779 if (content != null)
780 {
781 closed = true;
782 _state = STATE_END;
783 flush(getMaxIdleTime());
784 }
785 else if (!closed)
786 {
787 closed = true;
788 _state = STATE_END;
789
790 stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
791 }
792 }
793 }
794 }