1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.util.List;
21
22 import org.eclipse.jetty.io.Buffer;
23 import org.eclipse.jetty.io.NetworkTrafficListener;
24 import org.eclipse.jetty.util.log.Log;
25 import org.eclipse.jetty.util.log.Logger;
26
27 public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
28 {
29 private static final Logger LOG = Log.getLogger(NetworkTrafficSelectChannelEndPoint.class);
30
31 private final List<NetworkTrafficListener> listeners;
32
33 public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, int maxIdleTime, List<NetworkTrafficListener> listeners) throws IOException
34 {
35 super(channel, selectSet, key, maxIdleTime);
36 this.listeners = listeners;
37 }
38
39 @Override
40 public int fill(Buffer buffer) throws IOException
41 {
42 int read = super.fill(buffer);
43 notifyIncoming(buffer, read);
44 return read;
45 }
46
47 @Override
48 public int flush(Buffer buffer) throws IOException
49 {
50 int position = buffer.getIndex();
51 int written = super.flush(buffer);
52 notifyOutgoing(buffer, position, written);
53 return written;
54 }
55
56 @Override
57 protected int gatheringFlush(Buffer header, ByteBuffer bbuf0, Buffer buffer, ByteBuffer bbuf1) throws IOException
58 {
59 int headerPosition = header.getIndex();
60 int headerLength = header.length();
61 int bufferPosition = buffer.getIndex();
62 int written = super.gatheringFlush(header, bbuf0, buffer,bbuf1);
63 notifyOutgoing(header, headerPosition, written > headerLength ? headerLength : written);
64 notifyOutgoing(buffer, bufferPosition, written > headerLength ? written - headerLength : 0);
65 return written;
66 }
67
68 public void notifyOpened()
69 {
70 if (listeners != null && !listeners.isEmpty())
71 {
72 for (NetworkTrafficListener listener : listeners)
73 {
74 try
75 {
76 listener.opened(_socket);
77 }
78 catch (Exception x)
79 {
80 LOG.warn(x);
81 }
82 }
83 }
84 }
85
86 public void notifyIncoming(Buffer buffer, int read)
87 {
88 if (listeners != null && !listeners.isEmpty() && read > 0)
89 {
90 for (NetworkTrafficListener listener : listeners)
91 {
92 try
93 {
94 Buffer view = buffer.asReadOnlyBuffer();
95 listener.incoming(_socket, view);
96 }
97 catch (Exception x)
98 {
99 LOG.warn(x);
100 }
101 }
102 }
103 }
104
105 public void notifyOutgoing(Buffer buffer, int position, int written)
106 {
107 if (listeners != null && !listeners.isEmpty() && written > 0)
108 {
109 for (NetworkTrafficListener listener : listeners)
110 {
111 try
112 {
113 Buffer view = buffer.asReadOnlyBuffer();
114 view.setGetIndex(position);
115 view.setPutIndex(position + written);
116 listener.outgoing(_socket, view);
117 }
118 catch (Exception x)
119 {
120 LOG.warn(x);
121 }
122 }
123 }
124 }
125
126 public void notifyClosed()
127 {
128 if (listeners != null && !listeners.isEmpty())
129 {
130 for (NetworkTrafficListener listener : listeners)
131 {
132 try
133 {
134 listener.closed(_socket);
135 }
136 catch (Exception x)
137 {
138 LOG.warn(x);
139 }
140 }
141 }
142 }
143 }