View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.Executor;
25  
26  import org.eclipse.jetty.io.AbstractConnection;
27  import org.eclipse.jetty.io.ByteBufferPool;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.ForkInvoker;
32  import org.eclipse.jetty.util.log.Logger;
33  
34  public abstract class ProxyConnection extends AbstractConnection
35  {
36      protected static final Logger LOG = ConnectHandler.LOG;
37      private final ForkInvoker<Void> invoker = new ProxyForkInvoker();
38      private final ByteBufferPool bufferPool;
39      private final ConcurrentMap<String, Object> context;
40      private Connection connection;
41  
42      protected ProxyConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
43      {
44          super(endp, executor);
45          this.bufferPool = bufferPool;
46          this.context = context;
47      }
48  
49      public ByteBufferPool getByteBufferPool()
50      {
51          return bufferPool;
52      }
53  
54      public ConcurrentMap<String, Object> getContext()
55      {
56          return context;
57      }
58  
59      public Connection getConnection()
60      {
61          return connection;
62      }
63  
64      public void setConnection(Connection connection)
65      {
66          this.connection = connection;
67      }
68  
69      @Override
70      public void onFillable()
71      {
72          final ByteBuffer buffer = getByteBufferPool().acquire(getInputBufferSize(), true);
73          try
74          {
75              final int filled = read(getEndPoint(), buffer);
76              LOG.debug("{} filled {} bytes", this, filled);
77              if (filled > 0)
78              {
79                  write(getConnection().getEndPoint(), buffer, new Callback()
80                  {
81                      @Override
82                      public void succeeded()
83                      {
84                          LOG.debug("{} wrote {} bytes", this, filled);
85                          bufferPool.release(buffer);
86                          invoker.invoke(null);
87                      }
88  
89                      @Override
90                      public void failed(Throwable x)
91                      {
92                          LOG.debug(this + " failed to write " + filled + " bytes", x);
93                          bufferPool.release(buffer);
94                          connection.close();
95                      }
96                  });
97              }
98              else if (filled == 0)
99              {
100                 bufferPool.release(buffer);
101                 fillInterested();
102             }
103             else
104             {
105                 bufferPool.release(buffer);
106                 connection.getEndPoint().shutdownOutput();
107             }
108         }
109         catch (IOException x)
110         {
111             LOG.debug(this + " could not fill", x);
112             bufferPool.release(buffer);
113             close();
114             connection.close();
115         }
116     }
117 
118     protected abstract int read(EndPoint endPoint, ByteBuffer buffer) throws IOException;
119 
120     protected abstract void write(EndPoint endPoint, ByteBuffer buffer, Callback callback);
121 
122     @Override
123     public String toString()
124     {
125         return String.format("%s[l:%d<=>r:%d]",
126                 super.toString(),
127                 getEndPoint().getLocalAddress().getPort(),
128                 getEndPoint().getRemoteAddress().getPort());
129     }
130 
131     private class ProxyForkInvoker extends ForkInvoker<Void> implements Runnable
132     {
133         private ProxyForkInvoker()
134         {
135             super(4);
136         }
137 
138         @Override
139         public void fork(Void arg)
140         {
141             getExecutor().execute(this);
142         }
143         
144         @Override
145         public void run()
146         {
147             onFillable();
148         }
149 
150         @Override
151         public void call(Void arg)
152         {
153             onFillable();
154         }
155     }
156 }