View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.Executor;
25  
26  import org.eclipse.jetty.io.AbstractConnection;
27  import org.eclipse.jetty.io.ByteBufferPool;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.IteratingCallback;
32  import org.eclipse.jetty.util.log.Logger;
33  
34  public abstract class ProxyConnection extends AbstractConnection
35  {
36      protected static final Logger LOG = ConnectHandler.LOG;
37      private final IteratingCallback pipe = new ProxyIteratingCallback();
38      private final ByteBufferPool bufferPool;
39      private final ConcurrentMap<String, Object> context;
40      private Connection connection;
41  
42      protected ProxyConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
43      {
44          super(endp, executor);
45          this.bufferPool = bufferPool;
46          this.context = context;
47      }
48  
49      public ByteBufferPool getByteBufferPool()
50      {
51          return bufferPool;
52      }
53  
54      public ConcurrentMap<String, Object> getContext()
55      {
56          return context;
57      }
58  
59      public Connection getConnection()
60      {
61          return connection;
62      }
63  
64      public void setConnection(Connection connection)
65      {
66          this.connection = connection;
67      }
68  
69      @Override
70      public void onFillable()
71      {
72          pipe.iterate();
73      }
74  
75      protected abstract int read(EndPoint endPoint, ByteBuffer buffer) throws IOException;
76  
77      protected abstract void write(EndPoint endPoint, ByteBuffer buffer, Callback callback);
78  
79      @Override
80      public String toString()
81      {
82          return String.format("%s[l:%d<=>r:%d]",
83                  super.toString(),
84                  getEndPoint().getLocalAddress().getPort(),
85                  getEndPoint().getRemoteAddress().getPort());
86      }
87  
88      private class ProxyIteratingCallback extends IteratingCallback
89      {
90          private ByteBuffer buffer;
91          private int filled;
92  
93          @Override
94          protected Action process() throws Exception
95          {
96              buffer = bufferPool.acquire(getInputBufferSize(), true);
97              try
98              {
99                  int filled = this.filled = read(getEndPoint(), buffer);
100                 if (LOG.isDebugEnabled())
101                     LOG.debug("{} filled {} bytes", ProxyConnection.this, filled);
102                 if (filled > 0)
103                 {
104                     write(connection.getEndPoint(), buffer, this);
105                     return Action.SCHEDULED;
106                 }
107                 else if (filled == 0)
108                 {
109                     bufferPool.release(buffer);
110                     fillInterested();
111                     return Action.IDLE;
112                 }
113                 else
114                 {
115                     bufferPool.release(buffer);
116                     connection.getEndPoint().shutdownOutput();
117                     return Action.SUCCEEDED;
118                 }
119             }
120             catch (IOException x)
121             {
122                 if (LOG.isDebugEnabled())
123                     LOG.debug(ProxyConnection.this + " could not fill", x);
124                 bufferPool.release(buffer);
125                 disconnect();
126                 return Action.SUCCEEDED;
127             }
128         }
129 
130         @Override
131         public void succeeded()
132         {
133             if (LOG.isDebugEnabled())
134                 LOG.debug("{} wrote {} bytes", ProxyConnection.this, filled);
135             bufferPool.release(buffer);
136             super.succeeded();
137         }
138 
139         @Override
140         protected void onCompleteSuccess()
141         {
142         }
143 
144         @Override
145         protected void onCompleteFailure(Throwable x)
146         {
147             if (LOG.isDebugEnabled())
148                 LOG.debug(ProxyConnection.this + " failed to write " + filled + " bytes", x);
149             disconnect();
150         }
151 
152         private void disconnect()
153         {
154             bufferPool.release(buffer);
155             ProxyConnection.this.close();
156             connection.close();
157         }
158     }
159 }