View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.http;
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.nio.ByteBuffer;
26  import java.util.LinkedList;
27  import java.util.Locale;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.LinkedBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  
36  import org.eclipse.jetty.http.HttpException;
37  import org.eclipse.jetty.http.HttpFields;
38  import org.eclipse.jetty.http.HttpGenerator;
39  import org.eclipse.jetty.http.HttpParser;
40  import org.eclipse.jetty.http.HttpStatus;
41  import org.eclipse.jetty.io.AsyncEndPoint;
42  import org.eclipse.jetty.io.Buffer;
43  import org.eclipse.jetty.io.Buffers;
44  import org.eclipse.jetty.io.ByteArrayBuffer;
45  import org.eclipse.jetty.io.Connection;
46  import org.eclipse.jetty.io.EndPoint;
47  import org.eclipse.jetty.io.nio.AsyncConnection;
48  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
49  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
50  import org.eclipse.jetty.io.nio.NIOBuffer;
51  import org.eclipse.jetty.server.AbstractHttpConnection;
52  import org.eclipse.jetty.server.Connector;
53  import org.eclipse.jetty.server.Server;
54  import org.eclipse.jetty.spdy.SPDYAsyncConnection;
55  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
56  import org.eclipse.jetty.spdy.api.BytesDataInfo;
57  import org.eclipse.jetty.spdy.api.DataInfo;
58  import org.eclipse.jetty.spdy.api.Handler;
59  import org.eclipse.jetty.spdy.api.Headers;
60  import org.eclipse.jetty.spdy.api.ReplyInfo;
61  import org.eclipse.jetty.spdy.api.RstInfo;
62  import org.eclipse.jetty.spdy.api.SPDY;
63  import org.eclipse.jetty.spdy.api.Stream;
64  import org.eclipse.jetty.spdy.api.StreamStatus;
65  import org.eclipse.jetty.spdy.api.SynInfo;
66  import org.eclipse.jetty.util.log.Log;
67  import org.eclipse.jetty.util.log.Logger;
68  
69  public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
70  {
71      private static final Logger logger = Log.getLogger(ServerHTTPSPDYAsyncConnection.class);
72      private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
73      private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(ZERO_BYTES, true);
74  
75      private final Queue<Runnable> tasks = new LinkedList<>();
76      private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
77      private final short version;
78      private final SPDYAsyncConnection connection;
79      private final PushStrategy pushStrategy;
80      private final Stream stream;
81      private Headers headers; // No need for volatile, guarded by state
82      private DataInfo dataInfo; // No need for volatile, guarded by state
83      private NIOBuffer buffer; // No need for volatile, guarded by state
84      private volatile State state = State.INITIAL;
85      private boolean dispatched; // Guarded by synchronization on tasks
86  
87      public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, short version, SPDYAsyncConnection connection, PushStrategy pushStrategy, Stream stream)
88      {
89          super(connector, endPoint, server);
90          this.version = version;
91          this.connection = connection;
92          this.pushStrategy = pushStrategy;
93          this.stream = stream;
94          getParser().setPersistent(true);
95      }
96  
97      @Override
98      protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler)
99      {
100         return new HTTPSPDYParser(requestBuffers, endPoint);
101     }
102 
103     @Override
104     protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint)
105     {
106         return new HTTPSPDYGenerator(responseBuffers, endPoint);
107     }
108 
109     @Override
110     public AsyncEndPoint getEndPoint()
111     {
112         return (AsyncEndPoint)super.getEndPoint();
113     }
114 
115     private void post(Runnable task)
116     {
117         synchronized (tasks)
118         {
119             logger.debug("Posting task {}", task);
120             tasks.offer(task);
121             dispatch();
122         }
123     }
124 
125     private void dispatch()
126     {
127         synchronized (tasks)
128         {
129             if (dispatched)
130                 return;
131 
132             final Runnable task = tasks.poll();
133             if (task != null)
134             {
135                 dispatched = true;
136                 logger.debug("Dispatching task {}", task);
137                 execute(new Runnable()
138                 {
139                     @Override
140                     public void run()
141                     {
142                         logger.debug("Executing task {}", task);
143                         task.run();
144                         logger.debug("Completing task {}", task);
145                         dispatched = false;
146                         dispatch();
147                     }
148                 });
149             }
150         }
151     }
152 
153     protected void execute(Runnable task)
154     {
155         getServer().getThreadPool().dispatch(task);
156     }
157 
158     @Override
159     public Connection handle()
160     {
161         setCurrentConnection(this);
162         try
163         {
164             switch (state)
165             {
166                 case INITIAL:
167                 {
168                     break;
169                 }
170                 case REQUEST:
171                 {
172                     Headers.Header method = headers.get(HTTPSPDYHeader.METHOD.name(version));
173                     Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
174                     Headers.Header version = headers.get(HTTPSPDYHeader.VERSION.name(this.version));
175 
176                     if (method == null || uri == null || version == null)
177                         throw new HttpException(HttpStatus.BAD_REQUEST_400);
178 
179                     String m = method.value();
180                     String u = uri.value();
181                     String v = version.value();
182                     logger.debug("HTTP > {} {} {}", m, u, v);
183                     startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
184 
185                     Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(this.version));
186                     if(schemeHeader != null)
187                         _request.setScheme(schemeHeader.value());
188 
189                     updateState(State.HEADERS);
190                     handle();
191                     break;
192                 }
193                 case HEADERS:
194                 {
195                     for (Headers.Header header : headers)
196                     {
197                         String name = header.name();
198 
199                         // Skip special SPDY headers, unless it's the "host" header
200                         HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(version, name);
201                         if (specialHeader != null)
202                         {
203                             if (specialHeader == HTTPSPDYHeader.HOST)
204                                 name = "host";
205                             else
206                                 continue;
207                         }
208 
209                         switch (name)
210                         {
211                             case "connection":
212                             case "keep-alive":
213                             case "proxy-connection":
214                             case "transfer-encoding":
215                             {
216                                 // Spec says to ignore these headers
217                                 continue;
218                             }
219                             default:
220                             {
221                                 // Spec says headers must be single valued
222                                 String value = header.value();
223                                 logger.debug("HTTP > {}: {}", name, value);
224                                 parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
225                                 break;
226                             }
227                         }
228                     }
229                     break;
230                 }
231                 case HEADERS_COMPLETE:
232                 {
233                     headerComplete();
234                     break;
235                 }
236                 case CONTENT:
237                 {
238                     final Buffer buffer = this.buffer;
239                     if (buffer != null && buffer.length() > 0)
240                         content(buffer);
241                     break;
242                 }
243                 case FINAL:
244                 {
245                     messageComplete(0);
246                     break;
247                 }
248                 case ASYNC:
249                 {
250                     handleRequest();
251                     break;
252                 }
253                 default:
254                 {
255                     throw new IllegalStateException();
256                 }
257             }
258             return this;
259         }
260         catch (HttpException x)
261         {
262             respond(stream, x.getStatus());
263             return this;
264         }
265         catch (IOException x)
266         {
267             close(stream);
268             return this;
269         }
270         finally
271         {
272             setCurrentConnection(null);
273         }
274     }
275 
276     private void respond(Stream stream, int status)
277     {
278         if (stream.isUnidirectional())
279         {
280             stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.INTERNAL_ERROR));
281         }
282         else
283         {
284             Headers headers = new Headers();
285             headers.put(HTTPSPDYHeader.STATUS.name(version), String.valueOf(status));
286             headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
287             stream.reply(new ReplyInfo(headers, true));
288         }
289     }
290 
291     private void close(Stream stream)
292     {
293         stream.getSession().goAway();
294     }
295 
296     @Override
297     public void onInputShutdown() throws IOException
298     {
299     }
300 
301     private void updateState(State newState)
302     {
303         logger.debug("State update {} -> {}", state, newState);
304         state = newState;
305     }
306 
307     public void beginRequest(final Headers headers, final boolean endRequest)
308     {
309         this.headers = headers.isEmpty() ? null : headers;
310         post(new Runnable()
311         {
312             @Override
313             public void run()
314             {
315                 if (!headers.isEmpty())
316                     updateState(State.REQUEST);
317                 handle();
318                 if (endRequest)
319                     performEndRequest();
320             }
321         });
322     }
323 
324     public void headers(Headers headers)
325     {
326         this.headers = headers;
327         post(new Runnable()
328         {
329             @Override
330             public void run()
331             {
332                 updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
333                 handle();
334             }
335         });
336     }
337 
338     public void content(final DataInfo dataInfo, boolean endRequest)
339     {
340         // We need to copy the dataInfo since we do not know when its bytes
341         // will be consumed. When the copy is consumed, we consume also the
342         // original, so the implementation can send a window update.
343         ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
344         {
345             @Override
346             public void consume(int delta)
347             {
348                 super.consume(delta);
349                 dataInfo.consume(delta);
350             }
351         };
352         logger.debug("Queuing last={} content {}", endRequest, copyDataInfo);
353         dataInfos.offer(copyDataInfo);
354         if (endRequest)
355             dataInfos.offer(END_OF_CONTENT);
356         post(new Runnable()
357         {
358             @Override
359             public void run()
360             {
361                 logger.debug("HTTP > {} bytes of content", dataInfo.length());
362                 if (state == State.HEADERS)
363                 {
364                     updateState(State.HEADERS_COMPLETE);
365                     handle();
366                 }
367                 updateState(State.CONTENT);
368                 handle();
369             }
370         });
371     }
372 
373     public void endRequest()
374     {
375         post(new Runnable()
376         {
377             public void run()
378             {
379                 performEndRequest();
380             }
381         });
382     }
383 
384     private void performEndRequest()
385     {
386         if (state == State.HEADERS)
387         {
388             updateState(State.HEADERS_COMPLETE);
389             handle();
390         }
391         updateState(State.FINAL);
392         handle();
393     }
394 
395     public void async()
396     {
397         post(new Runnable()
398         {
399             @Override
400             public void run()
401             {
402                 State oldState = state;
403                 updateState(State.ASYNC);
404                 handle();
405                 updateState(oldState);
406             }
407         });
408     }
409 
410     protected void reply(Stream stream, ReplyInfo replyInfo)
411     {
412         if (!stream.isUnidirectional())
413             stream.reply(replyInfo);
414         if (replyInfo.getHeaders().get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") &&
415                 !stream.isClosed())
416         {
417             // We have a 200 OK with some content to send
418 
419             Headers.Header scheme = headers.get(HTTPSPDYHeader.SCHEME.name(version));
420             Headers.Header host = headers.get(HTTPSPDYHeader.HOST.name(version));
421             Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
422             Set<String> pushResources = pushStrategy.apply(stream, headers, replyInfo.getHeaders());
423 
424             for (String pushResourcePath : pushResources)
425             {
426                 final Headers requestHeaders = createRequestHeaders(scheme, host, uri, pushResourcePath);
427                 final Headers pushHeaders = createPushHeaders(scheme, host, pushResourcePath);
428 
429                 stream.syn(new SynInfo(pushHeaders, false), getMaxIdleTime(), TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
430                 {
431                     @Override
432                     public void completed(Stream pushStream)
433                     {
434                         ServerHTTPSPDYAsyncConnection pushConnection =
435                                 new ServerHTTPSPDYAsyncConnection(getConnector(), getEndPoint(), getServer(), version, connection, pushStrategy, pushStream);
436                         pushConnection.beginRequest(requestHeaders, true);
437                     }
438                 });
439             }
440         }
441     }
442 
443     private Headers createRequestHeaders(Headers.Header scheme, Headers.Header host, Headers.Header uri, String pushResourcePath)
444     {
445         final Headers requestHeaders = new Headers();
446         requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
447         requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
448         requestHeaders.put(scheme);
449         requestHeaders.put(host);
450         requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
451         String referrer = scheme.value() + "://" + host.value() + uri.value();
452         requestHeaders.put("referer", referrer);
453         // Remember support for gzip encoding
454         requestHeaders.put(headers.get("accept-encoding"));
455         requestHeaders.put("x-spdy-push", "true");
456         return requestHeaders;
457     }
458 
459     private Headers createPushHeaders(Headers.Header scheme, Headers.Header host, String pushResourcePath)
460     {
461         final Headers pushHeaders = new Headers();
462         if (version == SPDY.V2)
463             pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
464         else
465         {
466             pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
467             pushHeaders.put(scheme);
468             pushHeaders.put(host);
469         }
470         pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
471         pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
472         return pushHeaders;
473     }
474 
475     private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
476     {
477         while (true)
478         {
479             // Volatile read to ensure visibility
480             State state = this.state;
481             if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL)
482                 throw new IllegalStateException();
483 
484             if (buffer != null)
485             {
486                 if (buffer.length() > 0)
487                 {
488                     logger.debug("Consuming content bytes, {} available", buffer.length());
489                     return buffer;
490                 }
491                 else
492                 {
493                     // The application has consumed the buffer, so consume also the DataInfo
494                     dataInfo.consume(dataInfo.length());
495                     logger.debug("Consumed {} content bytes, queue size {}", dataInfo.consumed(), dataInfos.size());
496                     dataInfo = null;
497                     buffer = null;
498                     // Loop to get content bytes from DataInfos
499                 }
500             }
501             else
502             {
503                 logger.debug("Waiting at most {} ms for content bytes", maxIdleTime);
504                 long begin = System.nanoTime();
505                 dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS);
506                 long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
507                 logger.debug("Waited {} ms for content bytes", elapsed);
508                 if (dataInfo != null)
509                 {
510                     if (dataInfo == END_OF_CONTENT)
511                     {
512                         logger.debug("End of content bytes, queue size {}", dataInfos.size());
513                         return null;
514                     }
515 
516                     ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
517                     buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
518                     // Loop to return the buffer
519                 }
520                 else
521                 {
522                     stream.getSession().goAway();
523                     throw new EOFException("read timeout");
524                 }
525             }
526         }
527     }
528 
529     private int availableContent()
530     {
531         // Volatile read to ensure visibility
532         State state = this.state;
533         if (state != State.HEADERS_COMPLETE && state != State.CONTENT)
534             throw new IllegalStateException();
535         return buffer == null ? 0 : buffer.length();
536     }
537 
538     @Override
539     public void commitResponse(boolean last) throws IOException
540     {
541         // Keep the original behavior since it just delegates to the generator
542         super.commitResponse(last);
543     }
544 
545     @Override
546     public void flushResponse() throws IOException
547     {
548         // Just commit the response, if necessary: flushing buffers will be taken care of in complete()
549         commitResponse(false);
550     }
551 
552     @Override
553     public void completeResponse() throws IOException
554     {
555         // Keep the original behavior since it just delegates to the generator
556         super.completeResponse();
557     }
558 
559     private enum State
560     {
561         INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
562     }
563 
564     /**
565      * Needed in order to override parser methods that read content.
566      */
567     private class HTTPSPDYParser extends HttpParser
568     {
569         public HTTPSPDYParser(Buffers buffers, EndPoint endPoint)
570         {
571             super(buffers, endPoint, new HTTPSPDYParserHandler());
572         }
573 
574         @Override
575         public Buffer blockForContent(long maxIdleTime) throws IOException
576         {
577             try
578             {
579                 return consumeContent(maxIdleTime);
580             }
581             catch (InterruptedException x)
582             {
583                 throw new InterruptedIOException();
584             }
585         }
586 
587         @Override
588         public int available() throws IOException
589         {
590             return availableContent();
591         }
592     }
593 
594     /**
595      * Empty implementation, since it won't parse anything
596      */
597     private static class HTTPSPDYParserHandler extends HttpParser.EventHandler
598     {
599         @Override
600         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
601         {
602         }
603 
604         @Override
605         public void content(Buffer ref) throws IOException
606         {
607         }
608 
609         @Override
610         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
611         {
612         }
613     }
614 
615     /**
616      * Needed in order to override generator methods that would generate HTTP,
617      * since we must generate SPDY instead.
618      */
619     private class HTTPSPDYGenerator extends HttpGenerator
620     {
621         private boolean closed;
622 
623         private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint)
624         {
625             super(buffers, endPoint);
626         }
627 
628         @Override
629         public void send1xx(int code) throws IOException
630         {
631             // TODO: not supported yet, but unlikely to be called
632             throw new UnsupportedOperationException();
633         }
634 
635         @Override
636         public void sendResponse(Buffer response) throws IOException
637         {
638             // Do not think this method is ever used.
639             // Jetty calls it from Request.setAttribute() only if the attribute
640             // "org.eclipse.jetty.server.ResponseBuffer", seems like a hack.
641             throw new UnsupportedOperationException();
642         }
643 
644         @Override
645         public void sendError(int code, String reason, String content, boolean close) throws IOException
646         {
647             // Keep original behavior because it's delegating to other methods that we override.
648             super.sendError(code, reason, content, close);
649         }
650 
651         @Override
652         public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
653         {
654             Headers headers = new Headers();
655             String version = "HTTP/1.1";
656             headers.put(HTTPSPDYHeader.VERSION.name(ServerHTTPSPDYAsyncConnection.this.version), version);
657             StringBuilder status = new StringBuilder().append(_status);
658             if (_reason != null)
659                 status.append(" ").append(_reason.toString("UTF-8"));
660             headers.put(HTTPSPDYHeader.STATUS.name(ServerHTTPSPDYAsyncConnection.this.version), status.toString());
661             logger.debug("HTTP < {} {}", version, status);
662 
663             if (fields != null)
664             {
665                 for (int i = 0; i < fields.size(); ++i)
666                 {
667                     HttpFields.Field field = fields.getField(i);
668                     String name = field.getName().toLowerCase(Locale.ENGLISH);
669                     String value = field.getValue();
670                     headers.put(name, value);
671                     logger.debug("HTTP < {}: {}", name, value);
672                 }
673             }
674 
675             // We have to query the HttpGenerator and its buffers to know
676             // whether there is content buffered and update the generator state
677             Buffer content = getContentBuffer();
678             reply(stream, new ReplyInfo(headers, content == null));
679             if (content != null)
680             {
681                 closed = false;
682                 // Update HttpGenerator fields so that they remain consistent
683                 _state = HttpGenerator.STATE_CONTENT;
684             }
685             else
686             {
687                 closed = true;
688                 // Update HttpGenerator fields so that they remain consistent
689                 _state = HttpGenerator.STATE_END;
690             }
691         }
692 
693         private Buffer getContentBuffer()
694         {
695             if (_buffer != null && _buffer.length() > 0)
696                 return _buffer;
697             if (_content != null && _content.length() > 0)
698                 return _content;
699             return null;
700         }
701 
702         @Override
703         public void addContent(Buffer content, boolean last) throws IOException
704         {
705             // Keep the original behavior since adding content will
706             // just accumulate bytes until the response is committed.
707             super.addContent(content, last);
708         }
709 
710         @Override
711         public void flush(long maxIdleTime) throws IOException
712         {
713             try
714             {
715                 Buffer content = getContentBuffer();
716                 while (content != null)
717                 {
718                     DataInfo dataInfo = toDataInfo(content, closed);
719                     logger.debug("HTTP < {} bytes of content", dataInfo.length());
720                     stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
721                     content.clear();
722                     _bypass = false;
723                     content = getContentBuffer();
724                 }
725             }
726             catch (TimeoutException x)
727             {
728                 stream.getSession().goAway();
729                 throw new EOFException("write timeout");
730             }
731             catch (InterruptedException x)
732             {
733                 throw new InterruptedIOException();
734             }
735             catch (ExecutionException x)
736             {
737                 throw new IOException(x.getCause());
738             }
739         }
740 
741         private DataInfo toDataInfo(Buffer buffer, boolean close)
742         {
743             if (buffer instanceof ByteArrayBuffer)
744                 return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
745 
746             if (buffer instanceof NIOBuffer)
747             {
748                 ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
749                 byteBuffer.limit(buffer.putIndex());
750                 byteBuffer.position(buffer.getIndex());
751                 return new ByteBufferDataInfo(byteBuffer, close);
752             }
753 
754             return new BytesDataInfo(buffer.asArray(), close);
755         }
756 
757         @Override
758         public int flushBuffer() throws IOException
759         {
760             // Must never be called because it's where the HttpGenerator writes
761             // the HTTP content to the EndPoint (we should write SPDY instead).
762             // If it's called it's our bug.
763             throw new UnsupportedOperationException();
764         }
765 
766         @Override
767         public void blockForOutput(long maxIdleTime) throws IOException
768         {
769             // The semantic of this method is weird: not only it has to block
770             // but also need to flush. Since we have a blocking flush method
771             // we delegate to that, because it has the same semantic.
772             flush(maxIdleTime);
773         }
774 
775         @Override
776         public void complete() throws IOException
777         {
778             Buffer content = getContentBuffer();
779             if (content != null)
780             {
781                 closed = true;
782                 _state = STATE_END;
783                 flush(getMaxIdleTime());
784             }
785             else if (!closed)
786             {
787                 closed = true;
788                 _state = STATE_END;
789                 // Send the last, empty, data frame
790                 stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
791             }
792         }
793     }
794 }