View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.fcgi.generator;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Queue;
23  
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.util.ConcurrentArrayQueue;
26  import org.eclipse.jetty.util.IteratingCallback;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  
30  public class Flusher
31  {
32      private static final Logger LOG = Log.getLogger(Flusher.class);
33  
34      private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>();
35      private final IteratingCallback flushCallback = new FlushCallback();
36      private final EndPoint endPoint;
37  
38      public Flusher(EndPoint endPoint)
39      {
40          this.endPoint = endPoint;
41      }
42  
43      public void flush(Generator.Result... results)
44      {
45          for (Generator.Result result : results)
46              queue.offer(result);
47          flushCallback.iterate();
48      }
49  
50      public void shutdown()
51      {
52          flush(new ShutdownResult());
53      }
54  
55      private class FlushCallback extends IteratingCallback
56      {
57          private Generator.Result active;
58  
59          @Override
60          protected Action process() throws Exception
61          {
62              // Look if other writes are needed.
63              Generator.Result result = queue.poll();
64              if (result == null)
65              {
66                  // No more writes to do, return.
67                  return Action.IDLE;
68              }
69  
70              // Attempt to gather another result.
71              // Most often there is another result in the
72              // queue so this is a real optimization because
73              // it sends both results in just one TCP packet.
74              Generator.Result other = queue.poll();
75              if (other != null)
76                  result = result.join(other);
77  
78              active = result;
79              ByteBuffer[] buffers = result.getByteBuffers();
80              endPoint.write(this, buffers);
81              return Action.SCHEDULED;
82          }
83  
84          @Override
85          protected void onCompleteSuccess()
86          {
87              // We never return Action.SUCCEEDED, so this method is never called.
88              throw new IllegalStateException();
89          }
90  
91          @Override
92          public void succeeded()
93          {
94              if (active != null)
95                  active.succeeded();
96              active = null;
97              super.succeeded();
98          }
99  
100         @Override
101         public void onCompleteFailure(Throwable x)
102         {
103             if (active != null)
104                 active.failed(x);
105             active = null;
106 
107             while (true)
108             {
109                 Generator.Result result = queue.poll();
110                 if (result == null)
111                     break;
112                 result.failed(x);
113             }
114         }
115     }
116 
117     private class ShutdownResult extends Generator.Result
118     {
119         private ShutdownResult()
120         {
121             super(null, null);
122         }
123 
124         @Override
125         public void succeeded()
126         {
127             shutdown();
128         }
129 
130         @Override
131         public void failed(Throwable x)
132         {
133             shutdown();
134         }
135 
136         private void shutdown()
137         {
138             if (LOG.isDebugEnabled())
139                 LOG.debug("Shutting down {}", endPoint);
140             endPoint.shutdownOutput();
141         }
142     }
143 }