1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.server;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24
25 import org.eclipse.jetty.fcgi.FCGI;
26 import org.eclipse.jetty.fcgi.generator.Flusher;
27 import org.eclipse.jetty.fcgi.parser.ServerParser;
28 import org.eclipse.jetty.http.HttpField;
29 import org.eclipse.jetty.io.AbstractConnection;
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.server.Connector;
33 import org.eclipse.jetty.server.HttpConfiguration;
34 import org.eclipse.jetty.server.HttpInput;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38 public class ServerFCGIConnection extends AbstractConnection
39 {
40 private static final Logger LOG = Log.getLogger(ServerFCGIConnection.class);
41
42 private final ConcurrentMap<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
43 private final Connector connector;
44 private final boolean sendStatus200;
45 private final Flusher flusher;
46 private final HttpConfiguration configuration;
47 private final ServerParser parser;
48
49 public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
50 {
51 super(endPoint, connector.getExecutor());
52 this.connector = connector;
53 this.flusher = new Flusher(endPoint);
54 this.configuration = configuration;
55 this.sendStatus200 = sendStatus200;
56 this.parser = new ServerParser(new ServerListener());
57 }
58
59 @Override
60 public void onOpen()
61 {
62 super.onOpen();
63 fillInterested();
64 }
65
66 @Override
67 public void onFillable()
68 {
69 EndPoint endPoint = getEndPoint();
70 ByteBufferPool bufferPool = connector.getByteBufferPool();
71 ByteBuffer buffer = bufferPool.acquire(configuration.getResponseHeaderSize(), true);
72 try
73 {
74 while (true)
75 {
76 int read = endPoint.fill(buffer);
77 if (LOG.isDebugEnabled())
78 LOG.debug("Read {} bytes from {}", read, endPoint);
79 if (read > 0)
80 {
81 parse(buffer);
82 }
83 else if (read == 0)
84 {
85 bufferPool.release(buffer);
86 fillInterested();
87 break;
88 }
89 else
90 {
91 bufferPool.release(buffer);
92 shutdown();
93 break;
94 }
95 }
96 }
97 catch (Exception x)
98 {
99 if (LOG.isDebugEnabled())
100 LOG.debug(x);
101 bufferPool.release(buffer);
102
103 }
104 }
105
106 private void parse(ByteBuffer buffer)
107 {
108 while (buffer.hasRemaining())
109 parser.parse(buffer);
110 }
111
112 private void shutdown()
113 {
114 flusher.shutdown();
115 }
116
117 private class ServerListener implements ServerParser.Listener
118 {
119 @Override
120 public void onStart(int request, FCGI.Role role, int flags)
121 {
122
123 HttpChannelOverFCGI channel = new HttpChannelOverFCGI(connector, configuration, getEndPoint(),
124 new HttpTransportOverFCGI(connector.getByteBufferPool(), flusher, request, sendStatus200));
125 HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel);
126 if (existing != null)
127 throw new IllegalStateException();
128 if (LOG.isDebugEnabled())
129 LOG.debug("Request {} start on {}", request, channel);
130 }
131
132 @Override
133 public void onHeader(int request, HttpField field)
134 {
135 HttpChannelOverFCGI channel = channels.get(request);
136 if (LOG.isDebugEnabled())
137 LOG.debug("Request {} header {} on {}", request, field, channel);
138 if (channel != null)
139 channel.header(field);
140 }
141
142 @Override
143 public void onHeaders(int request)
144 {
145 HttpChannelOverFCGI channel = channels.get(request);
146 if (LOG.isDebugEnabled())
147 LOG.debug("Request {} headers on {}", request, channel);
148 if (channel != null)
149 {
150 channel.onRequest();
151 channel.dispatch();
152 }
153 }
154
155 @Override
156 public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
157 {
158 HttpChannelOverFCGI channel = channels.get(request);
159 if (LOG.isDebugEnabled())
160 LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel);
161 if (channel != null)
162 {
163 ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
164 copy.put(buffer).flip();
165 channel.onContent(new HttpInput.Content(copy));
166 }
167 return false;
168 }
169
170 @Override
171 public void onEnd(int request)
172 {
173 HttpChannelOverFCGI channel = channels.remove(request);
174 if (LOG.isDebugEnabled())
175 LOG.debug("Request {} end on {}", request, channel);
176 if (channel != null)
177 {
178 channel.onRequestComplete();
179 }
180 }
181
182 @Override
183 public void onFailure(int request, Throwable failure)
184 {
185 HttpChannelOverFCGI channel = channels.remove(request);
186 if (LOG.isDebugEnabled())
187 LOG.debug("Request {} failure on {}: {}", request, channel, failure);
188 if (channel != null)
189 {
190 channel.onBadMessage(400, failure.toString());
191 }
192 }
193 }
194 }