1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.proxy;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.Executor;
25
26 import org.eclipse.jetty.io.AbstractConnection;
27 import org.eclipse.jetty.io.ByteBufferPool;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.Callback;
31 import org.eclipse.jetty.util.IteratingCallback;
32 import org.eclipse.jetty.util.log.Logger;
33
34 public abstract class ProxyConnection extends AbstractConnection
35 {
36 protected static final Logger LOG = ConnectHandler.LOG;
37 private final IteratingCallback pipe = new ProxyIteratingCallback();
38 private final ByteBufferPool bufferPool;
39 private final ConcurrentMap<String, Object> context;
40 private Connection connection;
41
42 protected ProxyConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
43 {
44 super(endp, executor);
45 this.bufferPool = bufferPool;
46 this.context = context;
47 }
48
49 public ByteBufferPool getByteBufferPool()
50 {
51 return bufferPool;
52 }
53
54 public ConcurrentMap<String, Object> getContext()
55 {
56 return context;
57 }
58
59 public Connection getConnection()
60 {
61 return connection;
62 }
63
64 public void setConnection(Connection connection)
65 {
66 this.connection = connection;
67 }
68
69 @Override
70 public void onFillable()
71 {
72 pipe.iterate();
73 }
74
75 protected abstract int read(EndPoint endPoint, ByteBuffer buffer) throws IOException;
76
77 protected abstract void write(EndPoint endPoint, ByteBuffer buffer, Callback callback);
78
79 @Override
80 public String toString()
81 {
82 return String.format("%s[l:%d<=>r:%d]",
83 super.toString(),
84 getEndPoint().getLocalAddress().getPort(),
85 getEndPoint().getRemoteAddress().getPort());
86 }
87
88 private class ProxyIteratingCallback extends IteratingCallback
89 {
90 private ByteBuffer buffer;
91 private int filled;
92
93 @Override
94 protected Action process() throws Exception
95 {
96 buffer = bufferPool.acquire(getInputBufferSize(), true);
97 try
98 {
99 int filled = this.filled = read(getEndPoint(), buffer);
100 if (LOG.isDebugEnabled())
101 LOG.debug("{} filled {} bytes", ProxyConnection.this, filled);
102 if (filled > 0)
103 {
104 write(connection.getEndPoint(), buffer, this);
105 return Action.SCHEDULED;
106 }
107 else if (filled == 0)
108 {
109 bufferPool.release(buffer);
110 fillInterested();
111 return Action.IDLE;
112 }
113 else
114 {
115 bufferPool.release(buffer);
116 connection.getEndPoint().shutdownOutput();
117 return Action.SUCCEEDED;
118 }
119 }
120 catch (IOException x)
121 {
122 if (LOG.isDebugEnabled())
123 LOG.debug(ProxyConnection.this + " could not fill", x);
124 bufferPool.release(buffer);
125 disconnect();
126 return Action.SUCCEEDED;
127 }
128 }
129
130 @Override
131 public void succeeded()
132 {
133 if (LOG.isDebugEnabled())
134 LOG.debug("{} wrote {} bytes", ProxyConnection.this, filled);
135 bufferPool.release(buffer);
136 super.succeeded();
137 }
138
139 @Override
140 protected void onCompleteSuccess()
141 {
142 }
143
144 @Override
145 protected void onCompleteFailure(Throwable x)
146 {
147 if (LOG.isDebugEnabled())
148 LOG.debug(ProxyConnection.this + " failed to write " + filled + " bytes", x);
149 disconnect();
150 }
151
152 private void disconnect()
153 {
154 bufferPool.release(buffer);
155 ProxyConnection.this.close();
156 connection.close();
157 }
158 }
159 }