View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.eclipse.jetty.util.BufferUtil;
29  
30  public class MappedByteBufferPool implements ByteBufferPool
31  {
32      private final ConcurrentMap<Integer, Queue<ByteBuffer>> directBuffers = new ConcurrentHashMap<>();
33      private final ConcurrentMap<Integer, Queue<ByteBuffer>> heapBuffers = new ConcurrentHashMap<>();
34      private final int factor;
35  
36      public MappedByteBufferPool()
37      {
38          this(1024);
39      }
40  
41      public MappedByteBufferPool(int factor)
42      {
43          this.factor = factor;
44      }
45  
46      @Override
47      public ByteBuffer acquire(int size, boolean direct)
48      {
49          int bucket = bucketFor(size);
50          ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(direct);
51  
52          ByteBuffer result = null;
53          Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
54          if (byteBuffers != null)
55              result = byteBuffers.poll();
56  
57          if (result == null)
58          {
59              int capacity = bucket * factor;
60              result = newByteBuffer(capacity, direct);
61          }
62  
63          BufferUtil.clear(result);
64          return result;
65      }
66  
67      protected ByteBuffer newByteBuffer(int capacity, boolean direct)
68      {
69          return direct ? BufferUtil.allocateDirect(capacity)
70                        : BufferUtil.allocate(capacity);
71      }
72  
73      @Override
74      public void release(ByteBuffer buffer)
75      {
76          if (buffer == null)
77              return; // nothing to do
78          
79          // validate that this buffer is from this pool
80          assert((buffer.capacity() % factor) == 0);
81          
82          int bucket = bucketFor(buffer.capacity());
83          ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect());
84  
85          // Avoid to create a new queue every time, just to be discarded immediately
86          Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
87          if (byteBuffers == null)
88          {
89              byteBuffers = new ConcurrentLinkedQueue<>();
90              Queue<ByteBuffer> existing = buffers.putIfAbsent(bucket, byteBuffers);
91              if (existing != null)
92                  byteBuffers = existing;
93          }
94  
95          BufferUtil.clear(buffer);
96          byteBuffers.offer(buffer);
97      }
98  
99      public void clear()
100     {
101         directBuffers.clear();
102         heapBuffers.clear();
103     }
104 
105     private int bucketFor(int size)
106     {
107         int bucket = size / factor;
108         if (size % factor > 0)
109             ++bucket;
110         return bucket;
111     }
112 
113     // Package local for testing
114     ConcurrentMap<Integer, Queue<ByteBuffer>> buffersFor(boolean direct)
115     {
116         return direct ? directBuffers : heapBuffers;
117     }
118 
119     public static class Tagged extends MappedByteBufferPool
120     {
121         private final AtomicInteger tag = new AtomicInteger();
122 
123         @Override
124         protected ByteBuffer newByteBuffer(int capacity, boolean direct)
125         {
126             ByteBuffer buffer = super.newByteBuffer(capacity + 4, direct);
127             buffer.limit(buffer.capacity());
128             buffer.putInt(tag.incrementAndGet());
129             ByteBuffer slice = buffer.slice();
130             BufferUtil.clear(slice);
131             return slice;
132         }
133     }
134 }