1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.generator;
20
21 import java.nio.ByteBuffer;
22 import java.util.Queue;
23
24 import org.eclipse.jetty.io.EndPoint;
25 import org.eclipse.jetty.util.ConcurrentArrayQueue;
26 import org.eclipse.jetty.util.IteratingCallback;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29
30 public class Flusher
31 {
32 private static final Logger LOG = Log.getLogger(Flusher.class);
33
34 private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>();
35 private final IteratingCallback flushCallback = new FlushCallback();
36 private final EndPoint endPoint;
37
38 public Flusher(EndPoint endPoint)
39 {
40 this.endPoint = endPoint;
41 }
42
43 public void flush(Generator.Result... results)
44 {
45 for (Generator.Result result : results)
46 queue.offer(result);
47 flushCallback.iterate();
48 }
49
50 public void shutdown()
51 {
52 flush(new ShutdownResult());
53 }
54
55 private class FlushCallback extends IteratingCallback
56 {
57 private Generator.Result active;
58
59 @Override
60 protected Action process() throws Exception
61 {
62
63 Generator.Result result = queue.poll();
64 if (result == null)
65 {
66
67 return Action.IDLE;
68 }
69
70
71
72
73
74 Generator.Result other = queue.poll();
75 if (other != null)
76 result = result.join(other);
77
78 active = result;
79 ByteBuffer[] buffers = result.getByteBuffers();
80 endPoint.write(this, buffers);
81 return Action.SCHEDULED;
82 }
83
84 @Override
85 protected void onCompleteSuccess()
86 {
87
88 throw new IllegalStateException();
89 }
90
91 @Override
92 public void succeeded()
93 {
94 if (active != null)
95 active.succeeded();
96 active = null;
97 super.succeeded();
98 }
99
100 @Override
101 public void onCompleteFailure(Throwable x)
102 {
103 if (active != null)
104 active.failed(x);
105 active = null;
106
107 while (true)
108 {
109 Generator.Result result = queue.poll();
110 if (result == null)
111 break;
112 result.failed(x);
113 }
114 }
115 }
116
117 private class ShutdownResult extends Generator.Result
118 {
119 private ShutdownResult()
120 {
121 super(null, null);
122 }
123
124 @Override
125 public void succeeded()
126 {
127 shutdown();
128 }
129
130 @Override
131 public void failed(Throwable x)
132 {
133 shutdown();
134 }
135
136 private void shutdown()
137 {
138 if (LOG.isDebugEnabled())
139 LOG.debug("Shutting down {}", endPoint);
140 endPoint.shutdownOutput();
141 }
142 }
143 }