1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.spdy.http;
16
17 import java.io.EOFException;
18 import java.io.IOException;
19 import java.io.InterruptedIOException;
20 import java.nio.ByteBuffer;
21 import java.util.LinkedList;
22 import java.util.Queue;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29
30 import org.eclipse.jetty.http.HttpException;
31 import org.eclipse.jetty.http.HttpFields;
32 import org.eclipse.jetty.http.HttpGenerator;
33 import org.eclipse.jetty.http.HttpParser;
34 import org.eclipse.jetty.http.HttpStatus;
35 import org.eclipse.jetty.io.AsyncEndPoint;
36 import org.eclipse.jetty.io.Buffer;
37 import org.eclipse.jetty.io.Buffers;
38 import org.eclipse.jetty.io.ByteArrayBuffer;
39 import org.eclipse.jetty.io.Connection;
40 import org.eclipse.jetty.io.EndPoint;
41 import org.eclipse.jetty.io.nio.AsyncConnection;
42 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
43 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
44 import org.eclipse.jetty.io.nio.NIOBuffer;
45 import org.eclipse.jetty.server.AbstractHttpConnection;
46 import org.eclipse.jetty.server.Connector;
47 import org.eclipse.jetty.server.Server;
48 import org.eclipse.jetty.spdy.SPDYAsyncConnection;
49 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
50 import org.eclipse.jetty.spdy.api.BytesDataInfo;
51 import org.eclipse.jetty.spdy.api.DataInfo;
52 import org.eclipse.jetty.spdy.api.Handler;
53 import org.eclipse.jetty.spdy.api.Headers;
54 import org.eclipse.jetty.spdy.api.ReplyInfo;
55 import org.eclipse.jetty.spdy.api.RstInfo;
56 import org.eclipse.jetty.spdy.api.SPDY;
57 import org.eclipse.jetty.spdy.api.Stream;
58 import org.eclipse.jetty.spdy.api.StreamStatus;
59 import org.eclipse.jetty.spdy.api.SynInfo;
60 import org.eclipse.jetty.util.log.Log;
61 import org.eclipse.jetty.util.log.Logger;
62
63 public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
64 {
65 private static final Logger logger = Log.getLogger(ServerHTTPSPDYAsyncConnection.class);
66 private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
67 private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(ZERO_BYTES, true);
68
69 private final Queue<Runnable> tasks = new LinkedList<>();
70 private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
71 private final short version;
72 private final SPDYAsyncConnection connection;
73 private final PushStrategy pushStrategy;
74 private final Stream stream;
75 private Headers headers;
76 private DataInfo dataInfo;
77 private NIOBuffer buffer;
78 private volatile State state = State.INITIAL;
79 private boolean dispatched;
80
/**
 * Creates a connection that adapts one SPDY stream to the HTTP connection machinery.
 *
 * @param connector the connector handed to the superclass
 * @param endPoint the endpoint handed to the superclass
 * @param server the server handed to the superclass
 * @param version the SPDY version, used to resolve version-specific header names
 * @param connection the underlying SPDY connection, passed on to connections created for pushed streams
 * @param pushStrategy the strategy consulted in {@code reply()} to find resources to push
 * @param stream the SPDY stream this connection reads the request from and replies on
 */
public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, short version, SPDYAsyncConnection connection, PushStrategy pushStrategy, Stream stream)
{
    super(connector, endPoint, server);
    this.stream = stream;
    this.pushStrategy = pushStrategy;
    this.connection = connection;
    this.version = version;
    // The parser is always flagged persistent for this connection
    getParser().setPersistent(true);
}
90
91 @Override
92 protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler)
93 {
94 return new HTTPSPDYParser(requestBuffers, endPoint);
95 }
96
97 @Override
98 protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint)
99 {
100 return new HTTPSPDYGenerator(responseBuffers, endPoint);
101 }
102
103 @Override
104 public AsyncEndPoint getEndPoint()
105 {
106 return (AsyncEndPoint)super.getEndPoint();
107 }
108
109 private void post(Runnable task)
110 {
111 synchronized (tasks)
112 {
113 logger.debug("Posting task {}", task);
114 tasks.offer(task);
115 dispatch();
116 }
117 }
118
119 private void dispatch()
120 {
121 synchronized (tasks)
122 {
123 if (dispatched)
124 return;
125
126 final Runnable task = tasks.poll();
127 if (task != null)
128 {
129 dispatched = true;
130 logger.debug("Dispatching task {}", task);
131 execute(new Runnable()
132 {
133 @Override
134 public void run()
135 {
136 logger.debug("Executing task {}", task);
137 task.run();
138 logger.debug("Completing task {}", task);
139 dispatched = false;
140 dispatch();
141 }
142 });
143 }
144 }
145 }
146
147 protected void execute(Runnable task)
148 {
149 getServer().getThreadPool().dispatch(task);
150 }
151
152 @Override
153 public Connection handle()
154 {
155 setCurrentConnection(this);
156 try
157 {
158 switch (state)
159 {
160 case INITIAL:
161 {
162 break;
163 }
164 case REQUEST:
165 {
166 Headers.Header method = headers.get(HTTPSPDYHeader.METHOD.name(version));
167 Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
168 Headers.Header version = headers.get(HTTPSPDYHeader.VERSION.name(this.version));
169
170 if (method == null || uri == null || version == null)
171 throw new HttpException(HttpStatus.BAD_REQUEST_400);
172
173 String m = method.value();
174 String u = uri.value();
175 String v = version.value();
176 logger.debug("HTTP > {} {} {}", m, u, v);
177 startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
178
179 Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(this.version));
180 if(schemeHeader != null)
181 _request.setScheme(schemeHeader.value());
182
183 updateState(State.HEADERS);
184 handle();
185 break;
186 }
187 case HEADERS:
188 {
189 for (Headers.Header header : headers)
190 {
191 String name = header.name();
192
193
194 HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(version, name);
195 if (specialHeader != null)
196 {
197 if (specialHeader == HTTPSPDYHeader.HOST)
198 name = "host";
199 else
200 continue;
201 }
202
203 switch (name)
204 {
205 case "connection":
206 case "keep-alive":
207 case "proxy-connection":
208 case "transfer-encoding":
209 {
210
211 continue;
212 }
213 default:
214 {
215
216 String value = header.value();
217 logger.debug("HTTP > {}: {}", name, value);
218 parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
219 break;
220 }
221 }
222 }
223 break;
224 }
225 case HEADERS_COMPLETE:
226 {
227 headerComplete();
228 break;
229 }
230 case CONTENT:
231 {
232 final Buffer buffer = this.buffer;
233 if (buffer != null && buffer.length() > 0)
234 content(buffer);
235 break;
236 }
237 case FINAL:
238 {
239 messageComplete(0);
240 break;
241 }
242 case ASYNC:
243 {
244 handleRequest();
245 break;
246 }
247 default:
248 {
249 throw new IllegalStateException();
250 }
251 }
252 return this;
253 }
254 catch (HttpException x)
255 {
256 respond(stream, x.getStatus());
257 return this;
258 }
259 catch (IOException x)
260 {
261 close(stream);
262 return this;
263 }
264 finally
265 {
266 setCurrentConnection(null);
267 }
268 }
269
270 private void respond(Stream stream, int status)
271 {
272 if (stream.isUnidirectional())
273 {
274 stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.INTERNAL_ERROR));
275 }
276 else
277 {
278 Headers headers = new Headers();
279 headers.put(HTTPSPDYHeader.STATUS.name(version), String.valueOf(status));
280 headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
281 stream.reply(new ReplyInfo(headers, true));
282 }
283 }
284
285 private void close(Stream stream)
286 {
287 stream.getSession().goAway();
288 }
289
290 @Override
291 public void onInputShutdown() throws IOException
292 {
293 }
294
295 private void updateState(State newState)
296 {
297 logger.debug("State update {} -> {}", state, newState);
298 state = newState;
299 }
300
301 public void beginRequest(final Headers headers, final boolean endRequest)
302 {
303 this.headers = headers.isEmpty() ? null : headers;
304 post(new Runnable()
305 {
306 @Override
307 public void run()
308 {
309 if (!headers.isEmpty())
310 updateState(State.REQUEST);
311 handle();
312 if (endRequest)
313 performEndRequest();
314 }
315 });
316 }
317
318 public void headers(Headers headers)
319 {
320 this.headers = headers;
321 post(new Runnable()
322 {
323 @Override
324 public void run()
325 {
326 updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
327 handle();
328 }
329 });
330 }
331
332 public void content(final DataInfo dataInfo, boolean endRequest)
333 {
334
335
336
337 ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
338 {
339 @Override
340 public void consume(int delta)
341 {
342 super.consume(delta);
343 dataInfo.consume(delta);
344 }
345 };
346 logger.debug("Queuing last={} content {}", endRequest, copyDataInfo);
347 dataInfos.offer(copyDataInfo);
348 if (endRequest)
349 dataInfos.offer(END_OF_CONTENT);
350 post(new Runnable()
351 {
352 @Override
353 public void run()
354 {
355 logger.debug("HTTP > {} bytes of content", dataInfo.length());
356 if (state == State.HEADERS)
357 {
358 updateState(State.HEADERS_COMPLETE);
359 handle();
360 }
361 updateState(State.CONTENT);
362 handle();
363 }
364 });
365 }
366
367 public void endRequest()
368 {
369 post(new Runnable()
370 {
371 public void run()
372 {
373 performEndRequest();
374 }
375 });
376 }
377
378 private void performEndRequest()
379 {
380 if (state == State.HEADERS)
381 {
382 updateState(State.HEADERS_COMPLETE);
383 handle();
384 }
385 updateState(State.FINAL);
386 handle();
387 }
388
389 public void async()
390 {
391 post(new Runnable()
392 {
393 @Override
394 public void run()
395 {
396 State oldState = state;
397 updateState(State.ASYNC);
398 handle();
399 updateState(oldState);
400 }
401 });
402 }
403
404 protected void reply(Stream stream, ReplyInfo replyInfo)
405 {
406 if (!stream.isUnidirectional())
407 stream.reply(replyInfo);
408 if (replyInfo.getHeaders().get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") &&
409 !stream.isClosed())
410 {
411
412
413 Headers.Header scheme = headers.get(HTTPSPDYHeader.SCHEME.name(version));
414 Headers.Header host = headers.get(HTTPSPDYHeader.HOST.name(version));
415 Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
416 Set<String> pushResources = pushStrategy.apply(stream, headers, replyInfo.getHeaders());
417
418 for (String pushResourcePath : pushResources)
419 {
420 final Headers requestHeaders = createRequestHeaders(scheme, host, uri, pushResourcePath);
421 final Headers pushHeaders = createPushHeaders(scheme, host, pushResourcePath);
422
423 stream.syn(new SynInfo(pushHeaders, false), getMaxIdleTime(), TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
424 {
425 @Override
426 public void completed(Stream pushStream)
427 {
428 ServerHTTPSPDYAsyncConnection pushConnection =
429 new ServerHTTPSPDYAsyncConnection(getConnector(), getEndPoint(), getServer(), version, connection, pushStrategy, pushStream);
430 pushConnection.beginRequest(requestHeaders, true);
431 }
432 });
433 }
434 }
435 }
436
437 private Headers createRequestHeaders(Headers.Header scheme, Headers.Header host, Headers.Header uri, String pushResourcePath)
438 {
439 final Headers requestHeaders = new Headers();
440 requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
441 requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
442 requestHeaders.put(scheme);
443 requestHeaders.put(host);
444 requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
445 String referrer = scheme.value() + "://" + host.value() + uri.value();
446 requestHeaders.put("referer", referrer);
447
448 requestHeaders.put(headers.get("accept-encoding"));
449 requestHeaders.put("x-spdy-push", "true");
450 return requestHeaders;
451 }
452
453 private Headers createPushHeaders(Headers.Header scheme, Headers.Header host, String pushResourcePath)
454 {
455 final Headers pushHeaders = new Headers();
456 if (version == SPDY.V2)
457 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
458 else
459 {
460 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
461 pushHeaders.put(scheme);
462 pushHeaders.put(host);
463 }
464 pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
465 pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
466 return pushHeaders;
467 }
468
469 private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
470 {
471 while (true)
472 {
473
474 State state = this.state;
475 if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL)
476 throw new IllegalStateException();
477
478 if (buffer != null)
479 {
480 if (buffer.length() > 0)
481 {
482 logger.debug("Consuming content bytes, {} available", buffer.length());
483 return buffer;
484 }
485 else
486 {
487
488 dataInfo.consume(dataInfo.length());
489 logger.debug("Consumed {} content bytes, queue size {}", dataInfo.consumed(), dataInfos.size());
490 dataInfo = null;
491 buffer = null;
492
493 }
494 }
495 else
496 {
497 logger.debug("Waiting at most {} ms for content bytes", maxIdleTime);
498 long begin = System.nanoTime();
499 dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS);
500 long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
501 logger.debug("Waited {} ms for content bytes", elapsed);
502 if (dataInfo != null)
503 {
504 if (dataInfo == END_OF_CONTENT)
505 {
506 logger.debug("End of content bytes, queue size {}", dataInfos.size());
507 return null;
508 }
509
510 ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
511 buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
512
513 }
514 else
515 {
516 stream.getSession().goAway();
517 throw new EOFException("read timeout");
518 }
519 }
520 }
521 }
522
523 private int availableContent()
524 {
525
526 State state = this.state;
527 if (state != State.HEADERS_COMPLETE && state != State.CONTENT)
528 throw new IllegalStateException();
529 return buffer == null ? 0 : buffer.length();
530 }
531
532 @Override
533 public void commitResponse(boolean last) throws IOException
534 {
535
536 super.commitResponse(last);
537 }
538
539 @Override
540 public void flushResponse() throws IOException
541 {
542
543 commitResponse(false);
544 }
545
546 @Override
547 public void completeResponse() throws IOException
548 {
549
550 super.completeResponse();
551 }
552
553 private enum State
554 {
555 INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
556 }
557
558
559
560
561 private class HTTPSPDYParser extends HttpParser
562 {
563 public HTTPSPDYParser(Buffers buffers, EndPoint endPoint)
564 {
565 super(buffers, endPoint, new HTTPSPDYParserHandler());
566 }
567
568 @Override
569 public Buffer blockForContent(long maxIdleTime) throws IOException
570 {
571 try
572 {
573 return consumeContent(maxIdleTime);
574 }
575 catch (InterruptedException x)
576 {
577 throw new InterruptedIOException();
578 }
579 }
580
581 @Override
582 public int available() throws IOException
583 {
584 return availableContent();
585 }
586 }
587
588
589
590
591 private static class HTTPSPDYParserHandler extends HttpParser.EventHandler
592 {
593 @Override
594 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
595 {
596 }
597
598 @Override
599 public void content(Buffer ref) throws IOException
600 {
601 }
602
603 @Override
604 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
605 {
606 }
607 }
608
609
610
611
612
613 private class HTTPSPDYGenerator extends HttpGenerator
614 {
615 private boolean closed;
616
617 private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint)
618 {
619 super(buffers, endPoint);
620 }
621
622 @Override
623 public void send1xx(int code) throws IOException
624 {
625
626 throw new UnsupportedOperationException();
627 }
628
629 @Override
630 public void sendResponse(Buffer response) throws IOException
631 {
632
633
634
635 throw new UnsupportedOperationException();
636 }
637
638 @Override
639 public void sendError(int code, String reason, String content, boolean close) throws IOException
640 {
641
642 super.sendError(code, reason, content, close);
643 }
644
645 @Override
646 public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
647 {
648 Headers headers = new Headers();
649 String version = "HTTP/1.1";
650 headers.put(HTTPSPDYHeader.VERSION.name(ServerHTTPSPDYAsyncConnection.this.version), version);
651 StringBuilder status = new StringBuilder().append(_status);
652 if (_reason != null)
653 status.append(" ").append(_reason.toString("UTF-8"));
654 headers.put(HTTPSPDYHeader.STATUS.name(ServerHTTPSPDYAsyncConnection.this.version), status.toString());
655 logger.debug("HTTP < {} {}", version, status);
656
657 if (fields != null)
658 {
659 for (int i = 0; i < fields.size(); ++i)
660 {
661 HttpFields.Field field = fields.getField(i);
662 String name = field.getName().toLowerCase();
663 String value = field.getValue();
664 headers.put(name, value);
665 logger.debug("HTTP < {}: {}", name, value);
666 }
667 }
668
669
670
671 Buffer content = getContentBuffer();
672 reply(stream, new ReplyInfo(headers, content == null));
673 if (content != null)
674 {
675 closed = false;
676
677 _state = HttpGenerator.STATE_CONTENT;
678 }
679 else
680 {
681 closed = true;
682
683 _state = HttpGenerator.STATE_END;
684 }
685 }
686
687 private Buffer getContentBuffer()
688 {
689 if (_buffer != null && _buffer.length() > 0)
690 return _buffer;
691 if (_content != null && _content.length() > 0)
692 return _content;
693 return null;
694 }
695
696 @Override
697 public boolean addContent(byte b) throws IOException
698 {
699
700
701 addContent(new ByteArrayBuffer(new byte[]{b}), false);
702 return false;
703 }
704
705 @Override
706 public void addContent(Buffer content, boolean last) throws IOException
707 {
708
709
710 super.addContent(content, last);
711 }
712
713 @Override
714 public void flush(long maxIdleTime) throws IOException
715 {
716 try
717 {
718 Buffer content = getContentBuffer();
719 while (content != null)
720 {
721 DataInfo dataInfo = toDataInfo(content, closed);
722 logger.debug("HTTP < {} bytes of content", dataInfo.length());
723 stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
724 content.clear();
725 _bypass = false;
726 content = getContentBuffer();
727 }
728 }
729 catch (TimeoutException x)
730 {
731 stream.getSession().goAway();
732 throw new EOFException("write timeout");
733 }
734 catch (InterruptedException x)
735 {
736 throw new InterruptedIOException();
737 }
738 catch (ExecutionException x)
739 {
740 throw new IOException(x.getCause());
741 }
742 }
743
744 private DataInfo toDataInfo(Buffer buffer, boolean close)
745 {
746 if (buffer instanceof ByteArrayBuffer)
747 return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
748
749 if (buffer instanceof NIOBuffer)
750 {
751 ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
752 byteBuffer.limit(buffer.putIndex());
753 byteBuffer.position(buffer.getIndex());
754 return new ByteBufferDataInfo(byteBuffer, close);
755 }
756
757 return new BytesDataInfo(buffer.asArray(), close);
758 }
759
760 @Override
761 public int flushBuffer() throws IOException
762 {
763
764
765
766 throw new UnsupportedOperationException();
767 }
768
769 @Override
770 public void blockForOutput(long maxIdleTime) throws IOException
771 {
772
773
774
775 flush(maxIdleTime);
776 }
777
778 @Override
779 public void complete() throws IOException
780 {
781 Buffer content = getContentBuffer();
782 if (content != null)
783 {
784 closed = true;
785 _state = STATE_END;
786 flush(getMaxIdleTime());
787 }
788 else if (!closed)
789 {
790 closed = true;
791 _state = STATE_END;
792
793 stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
794 }
795 }
796 }
797 }