1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.server;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24
25 import org.eclipse.jetty.fcgi.FCGI;
26 import org.eclipse.jetty.fcgi.generator.Flusher;
27 import org.eclipse.jetty.fcgi.parser.ServerParser;
28 import org.eclipse.jetty.http.HttpField;
29 import org.eclipse.jetty.io.AbstractConnection;
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.server.ByteBufferQueuedHttpInput;
33 import org.eclipse.jetty.server.Connector;
34 import org.eclipse.jetty.server.HttpConfiguration;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38 public class ServerFCGIConnection extends AbstractConnection
39 {
40 private static final Logger LOG = Log.getLogger(ServerFCGIConnection.class);
41
42 private final ConcurrentMap<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
43 private final Connector connector;
44 private final boolean sendStatus200;
45 private final Flusher flusher;
46 private final HttpConfiguration configuration;
47 private final ServerParser parser;
48
49 public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
50 {
51 super(endPoint, connector.getExecutor());
52 this.connector = connector;
53 this.flusher = new Flusher(endPoint);
54 this.configuration = configuration;
55 this.sendStatus200 = sendStatus200;
56 this.parser = new ServerParser(new ServerListener());
57 }
58
59 @Override
60 public void onOpen()
61 {
62 super.onOpen();
63 fillInterested();
64 }
65
66 @Override
67 public void onFillable()
68 {
69 EndPoint endPoint = getEndPoint();
70 ByteBufferPool bufferPool = connector.getByteBufferPool();
71 ByteBuffer buffer = bufferPool.acquire(configuration.getResponseHeaderSize(), true);
72 try
73 {
74 while (true)
75 {
76 int read = endPoint.fill(buffer);
77 if (LOG.isDebugEnabled())
78 LOG.debug("Read {} bytes from {}", read, endPoint);
79 if (read > 0)
80 {
81 parse(buffer);
82 }
83 else if (read == 0)
84 {
85 fillInterested();
86 break;
87 }
88 else
89 {
90 shutdown();
91 break;
92 }
93 }
94 }
95 catch (Exception x)
96 {
97 if (LOG.isDebugEnabled())
98 LOG.debug(x);
99
100 }
101 finally
102 {
103 bufferPool.release(buffer);
104 }
105 }
106
107 private void parse(ByteBuffer buffer)
108 {
109 while (buffer.hasRemaining())
110 parser.parse(buffer);
111 }
112
113 private void shutdown()
114 {
115 flusher.shutdown();
116 }
117
118 private class ServerListener implements ServerParser.Listener
119 {
120 @Override
121 public void onStart(int request, FCGI.Role role, int flags)
122 {
123
124 HttpChannelOverFCGI channel = new HttpChannelOverFCGI(connector, configuration, getEndPoint(),
125 new HttpTransportOverFCGI(connector.getByteBufferPool(), flusher, request, sendStatus200),
126 new ByteBufferQueuedHttpInput());
127 HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel);
128 if (existing != null)
129 throw new IllegalStateException();
130 if (LOG.isDebugEnabled())
131 LOG.debug("Request {} start on {}", request, channel);
132 }
133
134 @Override
135 public void onHeader(int request, HttpField field)
136 {
137 HttpChannelOverFCGI channel = channels.get(request);
138 if (LOG.isDebugEnabled())
139 LOG.debug("Request {} header {} on {}", request, field, channel);
140 if (channel != null)
141 channel.header(field);
142 }
143
144 @Override
145 public void onHeaders(int request)
146 {
147 HttpChannelOverFCGI channel = channels.get(request);
148 if (LOG.isDebugEnabled())
149 LOG.debug("Request {} headers on {}", request, channel);
150 if (channel != null)
151 {
152 if (channel.headerComplete())
153 channel.dispatch();
154 }
155 }
156
157 @Override
158 public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
159 {
160 HttpChannelOverFCGI channel = channels.get(request);
161 if (LOG.isDebugEnabled())
162 LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel);
163 if (channel != null)
164 {
165 if (channel.content(buffer))
166 channel.dispatch();
167 }
168 return false;
169 }
170
171 @Override
172 public void onEnd(int request)
173 {
174 HttpChannelOverFCGI channel = channels.remove(request);
175 if (LOG.isDebugEnabled())
176 LOG.debug("Request {} end on {}", request, channel);
177 if (channel != null)
178 {
179 if (channel.messageComplete())
180 channel.dispatch();
181 }
182 }
183
184 @Override
185 public void onFailure(int request, Throwable failure)
186 {
187 HttpChannelOverFCGI channel = channels.remove(request);
188 if (LOG.isDebugEnabled())
189 LOG.debug("Request {} failure on {}: {}", request, channel, failure);
190 if (channel != null)
191 {
192 channel.badMessage(400, failure.toString());
193 }
194 }
195 }
196 }