View Javadoc

1   //========================================================================
2   //Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //All rights reserved. This program and the accompanying materials
5   //are made available under the terms of the Eclipse Public License v1.0
6   //and Apache License v2.0 which accompanies this distribution.
7   //The Eclipse Public License is available at
8   //http://www.eclipse.org/legal/epl-v10.html
9   //The Apache License v2.0 is available at
10  //http://www.opensource.org/licenses/apache2.0.php
11  //You may elect to redistribute this code under either of these licenses.
12  //========================================================================
13  
14  
15  package org.eclipse.jetty.spdy.http;
16  
17  import java.io.EOFException;
18  import java.io.IOException;
19  import java.io.InterruptedIOException;
20  import java.nio.ByteBuffer;
21  import java.util.LinkedList;
22  import java.util.Queue;
23  import java.util.Set;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.TimeoutException;
29  
30  import org.eclipse.jetty.http.HttpException;
31  import org.eclipse.jetty.http.HttpFields;
32  import org.eclipse.jetty.http.HttpGenerator;
33  import org.eclipse.jetty.http.HttpParser;
34  import org.eclipse.jetty.http.HttpStatus;
35  import org.eclipse.jetty.io.AsyncEndPoint;
36  import org.eclipse.jetty.io.Buffer;
37  import org.eclipse.jetty.io.Buffers;
38  import org.eclipse.jetty.io.ByteArrayBuffer;
39  import org.eclipse.jetty.io.Connection;
40  import org.eclipse.jetty.io.EndPoint;
41  import org.eclipse.jetty.io.nio.AsyncConnection;
42  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
43  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
44  import org.eclipse.jetty.io.nio.NIOBuffer;
45  import org.eclipse.jetty.server.AbstractHttpConnection;
46  import org.eclipse.jetty.server.Connector;
47  import org.eclipse.jetty.server.Server;
48  import org.eclipse.jetty.spdy.SPDYAsyncConnection;
49  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
50  import org.eclipse.jetty.spdy.api.BytesDataInfo;
51  import org.eclipse.jetty.spdy.api.DataInfo;
52  import org.eclipse.jetty.spdy.api.Handler;
53  import org.eclipse.jetty.spdy.api.Headers;
54  import org.eclipse.jetty.spdy.api.ReplyInfo;
55  import org.eclipse.jetty.spdy.api.RstInfo;
56  import org.eclipse.jetty.spdy.api.SPDY;
57  import org.eclipse.jetty.spdy.api.Stream;
58  import org.eclipse.jetty.spdy.api.StreamStatus;
59  import org.eclipse.jetty.spdy.api.SynInfo;
60  import org.eclipse.jetty.util.log.Log;
61  import org.eclipse.jetty.util.log.Logger;
62  
63  public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
64  {
65      private static final Logger logger = Log.getLogger(ServerHTTPSPDYAsyncConnection.class);
66      private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
67      private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(ZERO_BYTES, true);
68  
69      private final Queue<Runnable> tasks = new LinkedList<>();
70      private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
71      private final short version;
72      private final SPDYAsyncConnection connection;
73      private final PushStrategy pushStrategy;
74      private final Stream stream;
75      private Headers headers; // No need for volatile, guarded by state
76      private DataInfo dataInfo; // No need for volatile, guarded by state
77      private NIOBuffer buffer; // No need for volatile, guarded by state
78      private volatile State state = State.INITIAL;
79      private boolean dispatched; // Guarded by synchronization on tasks
80  
81      public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, short version, SPDYAsyncConnection connection, PushStrategy pushStrategy, Stream stream)
82      {
83          super(connector, endPoint, server);
84          this.version = version;
85          this.connection = connection;
86          this.pushStrategy = pushStrategy;
87          this.stream = stream;
88          getParser().setPersistent(true);
89      }
90  
91      @Override
92      protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler)
93      {
94          return new HTTPSPDYParser(requestBuffers, endPoint);
95      }
96  
97      @Override
98      protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint)
99      {
100         return new HTTPSPDYGenerator(responseBuffers, endPoint);
101     }
102 
103     @Override
104     public AsyncEndPoint getEndPoint()
105     {
106         return (AsyncEndPoint)super.getEndPoint();
107     }
108 
109     private void post(Runnable task)
110     {
111         synchronized (tasks)
112         {
113             logger.debug("Posting task {}", task);
114             tasks.offer(task);
115             dispatch();
116         }
117     }
118 
119     private void dispatch()
120     {
121         synchronized (tasks)
122         {
123             if (dispatched)
124                 return;
125 
126             final Runnable task = tasks.poll();
127             if (task != null)
128             {
129                 dispatched = true;
130                 logger.debug("Dispatching task {}", task);
131                 execute(new Runnable()
132                 {
133                     @Override
134                     public void run()
135                     {
136                         logger.debug("Executing task {}", task);
137                         task.run();
138                         logger.debug("Completing task {}", task);
139                         dispatched = false;
140                         dispatch();
141                     }
142                 });
143             }
144         }
145     }
146 
147     protected void execute(Runnable task)
148     {
149         getServer().getThreadPool().dispatch(task);
150     }
151 
152     @Override
153     public Connection handle()
154     {
155         setCurrentConnection(this);
156         try
157         {
158             switch (state)
159             {
160                 case INITIAL:
161                 {
162                     break;
163                 }
164                 case REQUEST:
165                 {
166                     Headers.Header method = headers.get(HTTPSPDYHeader.METHOD.name(version));
167                     Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
168                     Headers.Header version = headers.get(HTTPSPDYHeader.VERSION.name(this.version));
169 
170                     if (method == null || uri == null || version == null)
171                         throw new HttpException(HttpStatus.BAD_REQUEST_400);
172 
173                     String m = method.value();
174                     String u = uri.value();
175                     String v = version.value();
176                     logger.debug("HTTP > {} {} {}", m, u, v);
177                     startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
178 
179                     Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(this.version));
180                     if(schemeHeader != null)
181                         _request.setScheme(schemeHeader.value());
182 
183                     updateState(State.HEADERS);
184                     handle();
185                     break;
186                 }
187                 case HEADERS:
188                 {
189                     for (Headers.Header header : headers)
190                     {
191                         String name = header.name();
192 
193                         // Skip special SPDY headers, unless it's the "host" header
194                         HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(version, name);
195                         if (specialHeader != null)
196                         {
197                             if (specialHeader == HTTPSPDYHeader.HOST)
198                                 name = "host";
199                             else
200                                 continue;
201                         }
202 
203                         switch (name)
204                         {
205                             case "connection":
206                             case "keep-alive":
207                             case "proxy-connection":
208                             case "transfer-encoding":
209                             {
210                                 // Spec says to ignore these headers
211                                 continue;
212                             }
213                             default:
214                             {
215                                 // Spec says headers must be single valued
216                                 String value = header.value();
217                                 logger.debug("HTTP > {}: {}", name, value);
218                                 parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
219                                 break;
220                             }
221                         }
222                     }
223                     break;
224                 }
225                 case HEADERS_COMPLETE:
226                 {
227                     headerComplete();
228                     break;
229                 }
230                 case CONTENT:
231                 {
232                     final Buffer buffer = this.buffer;
233                     if (buffer != null && buffer.length() > 0)
234                         content(buffer);
235                     break;
236                 }
237                 case FINAL:
238                 {
239                     messageComplete(0);
240                     break;
241                 }
242                 case ASYNC:
243                 {
244                     handleRequest();
245                     break;
246                 }
247                 default:
248                 {
249                     throw new IllegalStateException();
250                 }
251             }
252             return this;
253         }
254         catch (HttpException x)
255         {
256             respond(stream, x.getStatus());
257             return this;
258         }
259         catch (IOException x)
260         {
261             close(stream);
262             return this;
263         }
264         finally
265         {
266             setCurrentConnection(null);
267         }
268     }
269 
270     private void respond(Stream stream, int status)
271     {
272         if (stream.isUnidirectional())
273         {
274             stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.INTERNAL_ERROR));
275         }
276         else
277         {
278             Headers headers = new Headers();
279             headers.put(HTTPSPDYHeader.STATUS.name(version), String.valueOf(status));
280             headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
281             stream.reply(new ReplyInfo(headers, true));
282         }
283     }
284 
285     private void close(Stream stream)
286     {
287         stream.getSession().goAway();
288     }
289 
290     @Override
291     public void onInputShutdown() throws IOException
292     {
293     }
294 
295     private void updateState(State newState)
296     {
297         logger.debug("State update {} -> {}", state, newState);
298         state = newState;
299     }
300 
301     public void beginRequest(final Headers headers, final boolean endRequest)
302     {
303         this.headers = headers.isEmpty() ? null : headers;
304         post(new Runnable()
305         {
306             @Override
307             public void run()
308             {
309                 if (!headers.isEmpty())
310                     updateState(State.REQUEST);
311                 handle();
312                 if (endRequest)
313                     performEndRequest();
314             }
315         });
316     }
317 
318     public void headers(Headers headers)
319     {
320         this.headers = headers;
321         post(new Runnable()
322         {
323             @Override
324             public void run()
325             {
326                 updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
327                 handle();
328             }
329         });
330     }
331 
332     public void content(final DataInfo dataInfo, boolean endRequest)
333     {
334         // We need to copy the dataInfo since we do not know when its bytes
335         // will be consumed. When the copy is consumed, we consume also the
336         // original, so the implementation can send a window update.
337         ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
338         {
339             @Override
340             public void consume(int delta)
341             {
342                 super.consume(delta);
343                 dataInfo.consume(delta);
344             }
345         };
346         logger.debug("Queuing last={} content {}", endRequest, copyDataInfo);
347         dataInfos.offer(copyDataInfo);
348         if (endRequest)
349             dataInfos.offer(END_OF_CONTENT);
350         post(new Runnable()
351         {
352             @Override
353             public void run()
354             {
355                 logger.debug("HTTP > {} bytes of content", dataInfo.length());
356                 if (state == State.HEADERS)
357                 {
358                     updateState(State.HEADERS_COMPLETE);
359                     handle();
360                 }
361                 updateState(State.CONTENT);
362                 handle();
363             }
364         });
365     }
366 
367     public void endRequest()
368     {
369         post(new Runnable()
370         {
371             public void run()
372             {
373                 performEndRequest();
374             }
375         });
376     }
377 
378     private void performEndRequest()
379     {
380         if (state == State.HEADERS)
381         {
382             updateState(State.HEADERS_COMPLETE);
383             handle();
384         }
385         updateState(State.FINAL);
386         handle();
387     }
388 
389     public void async()
390     {
391         post(new Runnable()
392         {
393             @Override
394             public void run()
395             {
396                 State oldState = state;
397                 updateState(State.ASYNC);
398                 handle();
399                 updateState(oldState);
400             }
401         });
402     }
403 
404     protected void reply(Stream stream, ReplyInfo replyInfo)
405     {
406         if (!stream.isUnidirectional())
407             stream.reply(replyInfo);
408         if (replyInfo.getHeaders().get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") &&
409                 !stream.isClosed())
410         {
411             // We have a 200 OK with some content to send
412 
413             Headers.Header scheme = headers.get(HTTPSPDYHeader.SCHEME.name(version));
414             Headers.Header host = headers.get(HTTPSPDYHeader.HOST.name(version));
415             Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
416             Set<String> pushResources = pushStrategy.apply(stream, headers, replyInfo.getHeaders());
417 
418             for (String pushResourcePath : pushResources)
419             {
420                 final Headers requestHeaders = createRequestHeaders(scheme, host, uri, pushResourcePath);
421                 final Headers pushHeaders = createPushHeaders(scheme, host, pushResourcePath);
422 
423                 stream.syn(new SynInfo(pushHeaders, false), getMaxIdleTime(), TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
424                 {
425                     @Override
426                     public void completed(Stream pushStream)
427                     {
428                         ServerHTTPSPDYAsyncConnection pushConnection =
429                                 new ServerHTTPSPDYAsyncConnection(getConnector(), getEndPoint(), getServer(), version, connection, pushStrategy, pushStream);
430                         pushConnection.beginRequest(requestHeaders, true);
431                     }
432                 });
433             }
434         }
435     }
436 
437     private Headers createRequestHeaders(Headers.Header scheme, Headers.Header host, Headers.Header uri, String pushResourcePath)
438     {
439         final Headers requestHeaders = new Headers();
440         requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
441         requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
442         requestHeaders.put(scheme);
443         requestHeaders.put(host);
444         requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
445         String referrer = scheme.value() + "://" + host.value() + uri.value();
446         requestHeaders.put("referer", referrer);
447         // Remember support for gzip encoding
448         requestHeaders.put(headers.get("accept-encoding"));
449         requestHeaders.put("x-spdy-push", "true");
450         return requestHeaders;
451     }
452 
453     private Headers createPushHeaders(Headers.Header scheme, Headers.Header host, String pushResourcePath)
454     {
455         final Headers pushHeaders = new Headers();
456         if (version == SPDY.V2)
457             pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
458         else
459         {
460             pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
461             pushHeaders.put(scheme);
462             pushHeaders.put(host);
463         }
464         pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
465         pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
466         return pushHeaders;
467     }
468 
469     private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
470     {
471         while (true)
472         {
473             // Volatile read to ensure visibility
474             State state = this.state;
475             if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL)
476                 throw new IllegalStateException();
477 
478             if (buffer != null)
479             {
480                 if (buffer.length() > 0)
481                 {
482                     logger.debug("Consuming content bytes, {} available", buffer.length());
483                     return buffer;
484                 }
485                 else
486                 {
487                     // The application has consumed the buffer, so consume also the DataInfo
488                     dataInfo.consume(dataInfo.length());
489                     logger.debug("Consumed {} content bytes, queue size {}", dataInfo.consumed(), dataInfos.size());
490                     dataInfo = null;
491                     buffer = null;
492                     // Loop to get content bytes from DataInfos
493                 }
494             }
495             else
496             {
497                 logger.debug("Waiting at most {} ms for content bytes", maxIdleTime);
498                 long begin = System.nanoTime();
499                 dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS);
500                 long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
501                 logger.debug("Waited {} ms for content bytes", elapsed);
502                 if (dataInfo != null)
503                 {
504                     if (dataInfo == END_OF_CONTENT)
505                     {
506                         logger.debug("End of content bytes, queue size {}", dataInfos.size());
507                         return null;
508                     }
509 
510                     ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
511                     buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
512                     // Loop to return the buffer
513                 }
514                 else
515                 {
516                     stream.getSession().goAway();
517                     throw new EOFException("read timeout");
518                 }
519             }
520         }
521     }
522 
523     private int availableContent()
524     {
525         // Volatile read to ensure visibility
526         State state = this.state;
527         if (state != State.HEADERS_COMPLETE && state != State.CONTENT)
528             throw new IllegalStateException();
529         return buffer == null ? 0 : buffer.length();
530     }
531 
532     @Override
533     public void commitResponse(boolean last) throws IOException
534     {
535         // Keep the original behavior since it just delegates to the generator
536         super.commitResponse(last);
537     }
538 
539     @Override
540     public void flushResponse() throws IOException
541     {
542         // Just commit the response, if necessary: flushing buffers will be taken care of in complete()
543         commitResponse(false);
544     }
545 
546     @Override
547     public void completeResponse() throws IOException
548     {
549         // Keep the original behavior since it just delegates to the generator
550         super.completeResponse();
551     }
552 
553     private enum State
554     {
555         INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
556     }
557 
558     /**
559      * Needed in order to override parser methods that read content.
560      */
561     private class HTTPSPDYParser extends HttpParser
562     {
563         public HTTPSPDYParser(Buffers buffers, EndPoint endPoint)
564         {
565             super(buffers, endPoint, new HTTPSPDYParserHandler());
566         }
567 
568         @Override
569         public Buffer blockForContent(long maxIdleTime) throws IOException
570         {
571             try
572             {
573                 return consumeContent(maxIdleTime);
574             }
575             catch (InterruptedException x)
576             {
577                 throw new InterruptedIOException();
578             }
579         }
580 
581         @Override
582         public int available() throws IOException
583         {
584             return availableContent();
585         }
586     }
587 
588     /**
589      * Empty implementation, since it won't parse anything
590      */
591     private static class HTTPSPDYParserHandler extends HttpParser.EventHandler
592     {
593         @Override
594         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
595         {
596         }
597 
598         @Override
599         public void content(Buffer ref) throws IOException
600         {
601         }
602 
603         @Override
604         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
605         {
606         }
607     }
608 
609     /**
610      * Needed in order to override generator methods that would generate HTTP,
611      * since we must generate SPDY instead.
612      */
613     private class HTTPSPDYGenerator extends HttpGenerator
614     {
615         private boolean closed;
616 
617         private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint)
618         {
619             super(buffers, endPoint);
620         }
621 
622         @Override
623         public void send1xx(int code) throws IOException
624         {
625             // TODO: not supported yet, but unlikely to be called
626             throw new UnsupportedOperationException();
627         }
628 
629         @Override
630         public void sendResponse(Buffer response) throws IOException
631         {
632             // Do not think this method is ever used.
633             // Jetty calls it from Request.setAttribute() only if the attribute
634             // "org.eclipse.jetty.server.ResponseBuffer", seems like a hack.
635             throw new UnsupportedOperationException();
636         }
637 
638         @Override
639         public void sendError(int code, String reason, String content, boolean close) throws IOException
640         {
641             // Keep original behavior because it's delegating to other methods that we override.
642             super.sendError(code, reason, content, close);
643         }
644 
645         @Override
646         public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
647         {
648             Headers headers = new Headers();
649             String version = "HTTP/1.1";
650             headers.put(HTTPSPDYHeader.VERSION.name(ServerHTTPSPDYAsyncConnection.this.version), version);
651             StringBuilder status = new StringBuilder().append(_status);
652             if (_reason != null)
653                 status.append(" ").append(_reason.toString("UTF-8"));
654             headers.put(HTTPSPDYHeader.STATUS.name(ServerHTTPSPDYAsyncConnection.this.version), status.toString());
655             logger.debug("HTTP < {} {}", version, status);
656 
657             if (fields != null)
658             {
659                 for (int i = 0; i < fields.size(); ++i)
660                 {
661                     HttpFields.Field field = fields.getField(i);
662                     String name = field.getName().toLowerCase();
663                     String value = field.getValue();
664                     headers.put(name, value);
665                     logger.debug("HTTP < {}: {}", name, value);
666                 }
667             }
668 
669             // We have to query the HttpGenerator and its buffers to know
670             // whether there is content buffered and update the generator state
671             Buffer content = getContentBuffer();
672             reply(stream, new ReplyInfo(headers, content == null));
673             if (content != null)
674             {
675                 closed = false;
676                 // Update HttpGenerator fields so that they remain consistent
677                 _state = HttpGenerator.STATE_CONTENT;
678             }
679             else
680             {
681                 closed = true;
682                 // Update HttpGenerator fields so that they remain consistent
683                 _state = HttpGenerator.STATE_END;
684             }
685         }
686 
687         private Buffer getContentBuffer()
688         {
689             if (_buffer != null && _buffer.length() > 0)
690                 return _buffer;
691             if (_content != null && _content.length() > 0)
692                 return _content;
693             return null;
694         }
695 
696         @Override
697         public boolean addContent(byte b) throws IOException
698         {
699             // In HttpGenerator, writing one byte only has a different path than
700             // writing a buffer. Here we normalize these path to keep it simpler.
701             addContent(new ByteArrayBuffer(new byte[]{b}), false);
702             return false;
703         }
704 
705         @Override
706         public void addContent(Buffer content, boolean last) throws IOException
707         {
708             // Keep the original behavior since adding content will
709             // just accumulate bytes until the response is committed.
710             super.addContent(content, last);
711         }
712 
713         @Override
714         public void flush(long maxIdleTime) throws IOException
715         {
716             try
717             {
718                 Buffer content = getContentBuffer();
719                 while (content != null)
720                 {
721                     DataInfo dataInfo = toDataInfo(content, closed);
722                     logger.debug("HTTP < {} bytes of content", dataInfo.length());
723                     stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
724                     content.clear();
725                     _bypass = false;
726                     content = getContentBuffer();
727                 }
728             }
729             catch (TimeoutException x)
730             {
731                 stream.getSession().goAway();
732                 throw new EOFException("write timeout");
733             }
734             catch (InterruptedException x)
735             {
736                 throw new InterruptedIOException();
737             }
738             catch (ExecutionException x)
739             {
740                 throw new IOException(x.getCause());
741             }
742         }
743 
744         private DataInfo toDataInfo(Buffer buffer, boolean close)
745         {
746             if (buffer instanceof ByteArrayBuffer)
747                 return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
748 
749             if (buffer instanceof NIOBuffer)
750             {
751                 ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
752                 byteBuffer.limit(buffer.putIndex());
753                 byteBuffer.position(buffer.getIndex());
754                 return new ByteBufferDataInfo(byteBuffer, close);
755             }
756 
757             return new BytesDataInfo(buffer.asArray(), close);
758         }
759 
760         @Override
761         public int flushBuffer() throws IOException
762         {
763             // Must never be called because it's where the HttpGenerator writes
764             // the HTTP content to the EndPoint (we should write SPDY instead).
765             // If it's called it's our bug.
766             throw new UnsupportedOperationException();
767         }
768 
769         @Override
770         public void blockForOutput(long maxIdleTime) throws IOException
771         {
772             // The semantic of this method is weird: not only it has to block
773             // but also need to flush. Since we have a blocking flush method
774             // we delegate to that, because it has the same semantic.
775             flush(maxIdleTime);
776         }
777 
778         @Override
779         public void complete() throws IOException
780         {
781             Buffer content = getContentBuffer();
782             if (content != null)
783             {
784                 closed = true;
785                 _state = STATE_END;
786                 flush(getMaxIdleTime());
787             }
788             else if (!closed)
789             {
790                 closed = true;
791                 _state = STATE_END;
792                 // Send the last, empty, data frame
793                 stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
794             }
795         }
796     }
797 }