1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22
23 import org.eclipse.jetty.http.HttpException;
24 import org.eclipse.jetty.http.HttpStatus;
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.AsyncConnection;
29 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32
33
34
35
36
37
38 public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
39 {
40 private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100);
41 private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200);
42
43 private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
44 private int _total_no_progress;
45 private final AsyncEndPoint _asyncEndp;
46 private boolean _readInterested = true;
47
48 public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server)
49 {
50 super(connector,endpoint,server);
51 _asyncEndp=(AsyncEndPoint)endpoint;
52 }
53
54 @Override
55 public Connection handle() throws IOException
56 {
57 Connection connection = this;
58 boolean some_progress=false;
59 boolean progress=true;
60
61 try
62 {
63 setCurrentConnection(this);
64
65
66 _asyncEndp.setCheckForIdle(false);
67
68
69
70 while (progress && connection==this)
71 {
72 progress=false;
73 try
74 {
75
76 if (_request._async.isAsync())
77 {
78 if (_request._async.isDispatchable())
79 handleRequest();
80 }
81
82 else if (!_parser.isComplete() && _parser.parseAvailable())
83 progress=true;
84
85
86 if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown() && !_request.getAsyncContinuation().isAsyncStarted())
87 if (_generator.flushBuffer()>0)
88 progress=true;
89
90
91 _endp.flush();
92
93
94 if (_asyncEndp.hasProgressed())
95 progress=true;
96 }
97 catch (HttpException e)
98 {
99 if (LOG.isDebugEnabled())
100 {
101 LOG.debug("uri="+_uri);
102 LOG.debug("fields="+_requestFields);
103 LOG.debug(e);
104 }
105 progress=true;
106 _generator.sendError(e.getStatus(), e.getReason(), null, true);
107 }
108 finally
109 {
110 some_progress|=progress;
111
112 boolean parserComplete = _parser.isComplete();
113 boolean generatorComplete = _generator.isComplete();
114 boolean complete = parserComplete && generatorComplete;
115 if (parserComplete)
116 {
117 if (generatorComplete)
118 {
119
120 progress=true;
121
122
123 if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
124 {
125 Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
126 if (switched!=null)
127 connection=switched;
128 }
129
130 reset();
131
132
133 if (!_generator.isPersistent() && !_endp.isOutputShutdown())
134 {
135 LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
136 _endp.shutdownOutput();
137 }
138 }
139 else
140 {
141
142
143
144 _readInterested = false;
145 LOG.debug("Disabled read interest while writing response {}", _endp);
146 }
147 }
148
149 if (!complete && _request.getAsyncContinuation().isAsyncStarted())
150 {
151
152
153 LOG.debug("suspended {}",this);
154 progress=false;
155 }
156 }
157 }
158 }
159 finally
160 {
161 setCurrentConnection(null);
162
163
164 if (!_request.getAsyncContinuation().isAsyncStarted())
165 {
166
167 _parser.returnBuffers();
168 _generator.returnBuffers();
169
170
171 _asyncEndp.setCheckForIdle(true);
172 }
173
174
175 if (some_progress)
176 _total_no_progress=0;
177 else
178 {
179 _total_no_progress++;
180 if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE))
181 LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
182 if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE)
183 {
184 LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
185 if (_endp instanceof SelectChannelEndPoint)
186 ((SelectChannelEndPoint)_endp).getChannel().close();
187 }
188 }
189 }
190 return connection;
191 }
192
193 public void onInputShutdown() throws IOException
194 {
195
196 if (_generator.isIdle() && !_request.getAsyncContinuation().isSuspended())
197 {
198
199 _endp.close();
200 }
201
202
203 if (_parser.isIdle())
204 _parser.setPersistent(false);
205 }
206
207 @Override
208 public void reset()
209 {
210 _readInterested = true;
211 LOG.debug("Enabled read interest {}", _endp);
212 super.reset();
213 }
214
215 @Override
216 public boolean isSuspended()
217 {
218 return !_readInterested || super.isSuspended();
219 }
220 }