View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.parser;
20  
21  import java.nio.ByteBuffer;
22  import java.util.EventListener;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  
26  import org.eclipse.jetty.spdy.CompressionFactory;
27  import org.eclipse.jetty.spdy.SessionException;
28  import org.eclipse.jetty.spdy.StreamException;
29  import org.eclipse.jetty.spdy.api.SessionStatus;
30  import org.eclipse.jetty.spdy.frames.ControlFrame;
31  import org.eclipse.jetty.spdy.frames.DataFrame;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  public class Parser
36  {
37      private static final Logger logger = Log.getLogger(Parser.class);
38      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
39      private final ControlFrameParser controlFrameParser;
40      private final DataFrameParser dataFrameParser;
41      private State state = State.CONTROL_BIT;
42  
43      public Parser(CompressionFactory.Decompressor decompressor)
44      {
45          // It is important to allocate one decompression context per
46          // SPDY session for the control frames (to decompress the headers)
47          controlFrameParser = new ControlFrameParser(decompressor)
48          {
49              @Override
50              protected void onControlFrame(ControlFrame frame)
51              {
52                  logger.debug("Parsed {}", frame);
53                  notifyControlFrame(frame);
54              }
55          };
56          dataFrameParser = new DataFrameParser()
57          {
58              @Override
59              protected void onDataFrame(DataFrame frame, ByteBuffer data)
60              {
61                  logger.debug("Parsed {}, {} data bytes", frame, data.remaining());
62                  notifyDataFrame(frame, data);
63              }
64          };
65      }
66  
67      public void addListener(Listener listener)
68      {
69          listeners.add(listener);
70      }
71  
72      public void removeListener(Listener listener)
73      {
74          listeners.remove(listener);
75      }
76  
77      protected void notifyControlFrame(ControlFrame frame)
78      {
79          for (Listener listener : listeners)
80          {
81              try
82              {
83                  listener.onControlFrame(frame);
84              }
85              catch (Exception x)
86              {
87                  logger.info("Exception while notifying listener " + listener, x);
88              }
89          }
90      }
91  
92      protected void notifyDataFrame(DataFrame frame, ByteBuffer data)
93      {
94          for (Listener listener : listeners)
95          {
96              try
97              {
98                  listener.onDataFrame(frame, data);
99              }
100             catch (Exception x)
101             {
102                 logger.info("Exception while notifying listener " + listener, x);
103             }
104         }
105     }
106 
107     protected void notifyStreamException(StreamException x)
108     {
109         for (Listener listener : listeners)
110         {
111             listener.onStreamException(x);
112         }
113     }
114 
115     protected void notifySessionException(SessionException x)
116     {
117         logger.debug("SPDY session exception", x);
118         for (Listener listener : listeners)
119         {
120             try
121             {
122                 listener.onSessionException(x);
123             }
124             catch (Exception xx)
125             {
126                 logger.debug("Could not notify listener " + listener, xx);
127             }
128         }
129     }
130 
131     public void parse(ByteBuffer buffer)
132     {
133         try
134         {
135             logger.debug("Parsing {} bytes", buffer.remaining());
136             while (buffer.hasRemaining())
137             {
138                 switch (state)
139                 {
140                     case CONTROL_BIT:
141                     {
142                         // We must only peek the first byte and not advance the buffer
143                         // because the 7 least significant bits may be relevant in data frames
144                         int currByte = buffer.get(buffer.position());
145                         boolean isControlFrame = (currByte & 0x80) == 0x80;
146                         state = isControlFrame ? State.CONTROL_FRAME : State.DATA_FRAME;
147                         break;
148                     }
149                     case CONTROL_FRAME:
150                     {
151                         if (controlFrameParser.parse(buffer))
152                             reset();
153                         break;
154                     }
155                     case DATA_FRAME:
156                     {
157                         if (dataFrameParser.parse(buffer))
158                             reset();
159                         break;
160                     }
161                     default:
162                     {
163                         throw new IllegalStateException();
164                     }
165                 }
166             }
167         }
168         catch (SessionException x)
169         {
170             notifySessionException(x);
171         }
172         catch (StreamException x)
173         {
174             notifyStreamException(x);
175         }
176         catch (Throwable x)
177         {
178             notifySessionException(new SessionException(SessionStatus.PROTOCOL_ERROR, x));
179         }
180         finally
181         {
182             // Be sure to consume after exceptions
183             buffer.position(buffer.limit());
184         }
185     }
186 
187     private void reset()
188     {
189         state = State.CONTROL_BIT;
190     }
191 
192     public interface Listener extends EventListener
193     {
194         public void onControlFrame(ControlFrame frame);
195 
196         public void onDataFrame(DataFrame frame, ByteBuffer data);
197 
198         public void onStreamException(StreamException x);
199 
200         public void onSessionException(SessionException x);
201 
202         public static class Adapter implements Listener
203         {
204             @Override
205             public void onControlFrame(ControlFrame frame)
206             {
207             }
208 
209             @Override
210             public void onDataFrame(DataFrame frame, ByteBuffer data)
211             {
212             }
213 
214             @Override
215             public void onStreamException(StreamException x)
216             {
217             }
218 
219             @Override
220             public void onSessionException(SessionException x)
221             {
222             }
223         }
224     }
225 
226     private enum State
227     {
228         CONTROL_BIT, CONTROL_FRAME, DATA_FRAME
229     }
230 
231 }