View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.fcgi.generator;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Queue;
23  
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.util.ConcurrentArrayQueue;
26  import org.eclipse.jetty.util.IteratingCallback;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  
30  public class Flusher
31  {
32      private static final Logger LOG = Log.getLogger(Flusher.class);
33  
34      private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>();
35      private final IteratingCallback flushCallback = new FlushCallback();
36      private final EndPoint endPoint;
37  
38      public Flusher(EndPoint endPoint)
39      {
40          this.endPoint = endPoint;
41      }
42  
43      public void flush(Generator.Result... results)
44      {
45          for (Generator.Result result : results)
46              queue.offer(result);
47          flushCallback.iterate();
48      }
49  
50      public void shutdown()
51      {
52          flush(new ShutdownResult());
53      }
54  
55      private class FlushCallback extends IteratingCallback
56      {
57          private Generator.Result active;
58  
59          @Override
60          protected Action process() throws Exception
61          {
62              // Look if other writes are needed.
63              Generator.Result result = queue.poll();
64              if (result == null)
65              {
66                  // No more writes to do, return.
67                  return Action.IDLE;
68              }
69  
70              // Attempt to gather another result.
71              // Most often there is another result in the
72              // queue so this is a real optimization because
73              // it sends both results in just one TCP packet.
74              Generator.Result other = queue.poll();
75              if (other != null)
76                  result = result.join(other);
77  
78              active = result;
79              ByteBuffer[] buffers = result.getByteBuffers();
80              endPoint.write(this, buffers);
81              return Action.SCHEDULED;
82          }
83  
84          @Override
85          protected void completed()
86          {
87              // We never return Action.SUCCEEDED, so this method is never called.
88              throw new IllegalStateException();
89          }
90  
91          @Override
92          public void succeeded()
93          {
94              if (active != null)
95                  active.succeeded();
96              active = null;
97              super.succeeded();
98          }
99  
100         @Override
101         public void failed(Throwable x)
102         {
103             if (active != null)
104                 active.failed(x);
105             active = null;
106 
107             while (true)
108             {
109                 Generator.Result result = queue.poll();
110                 if (result == null)
111                     break;
112                 result.failed(x);
113             }
114 
115             super.failed(x);
116         }
117     }
118 
119     private class ShutdownResult extends Generator.Result
120     {
121         private ShutdownResult()
122         {
123             super(null, null);
124         }
125 
126         @Override
127         public void succeeded()
128         {
129             shutdown();
130         }
131 
132         @Override
133         public void failed(Throwable x)
134         {
135             shutdown();
136         }
137 
138         private void shutdown()
139         {
140             LOG.debug("Shutting down {}", endPoint);
141             endPoint.shutdownOutput();
142         }
143     }
144 }