1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.generator;
20
21 import java.nio.ByteBuffer;
22 import java.util.Queue;
23
24 import org.eclipse.jetty.io.EndPoint;
25 import org.eclipse.jetty.util.ConcurrentArrayQueue;
26 import org.eclipse.jetty.util.IteratingCallback;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29
30 public class Flusher
31 {
32 private static final Logger LOG = Log.getLogger(Flusher.class);
33
34 private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>();
35 private final IteratingCallback flushCallback = new FlushCallback();
36 private final EndPoint endPoint;
37
38 public Flusher(EndPoint endPoint)
39 {
40 this.endPoint = endPoint;
41 }
42
43 public void flush(Generator.Result... results)
44 {
45 for (Generator.Result result : results)
46 queue.offer(result);
47 flushCallback.iterate();
48 }
49
50 public void shutdown()
51 {
52 flush(new ShutdownResult());
53 }
54
55 private class FlushCallback extends IteratingCallback
56 {
57 private Generator.Result active;
58
59 @Override
60 protected Action process() throws Exception
61 {
62
63 Generator.Result result = queue.poll();
64 if (result == null)
65 {
66
67 return Action.IDLE;
68 }
69
70
71
72
73
74 Generator.Result other = queue.poll();
75 if (other != null)
76 result = result.join(other);
77
78 active = result;
79 ByteBuffer[] buffers = result.getByteBuffers();
80 endPoint.write(this, buffers);
81 return Action.SCHEDULED;
82 }
83
84 @Override
85 protected void completed()
86 {
87
88 throw new IllegalStateException();
89 }
90
91 @Override
92 public void succeeded()
93 {
94 if (active != null)
95 active.succeeded();
96 active = null;
97 super.succeeded();
98 }
99
100 @Override
101 public void failed(Throwable x)
102 {
103 if (active != null)
104 active.failed(x);
105 active = null;
106
107 while (true)
108 {
109 Generator.Result result = queue.poll();
110 if (result == null)
111 break;
112 result.failed(x);
113 }
114
115 super.failed(x);
116 }
117 }
118
119 private class ShutdownResult extends Generator.Result
120 {
121 private ShutdownResult()
122 {
123 super(null, null);
124 }
125
126 @Override
127 public void succeeded()
128 {
129 shutdown();
130 }
131
132 @Override
133 public void failed(Throwable x)
134 {
135 shutdown();
136 }
137
138 private void shutdown()
139 {
140 LOG.debug("Shutting down {}", endPoint);
141 endPoint.shutdownOutput();
142 }
143 }
144 }