View Javadoc

1   // ========================================================================
2   // Copyright (c) 2008-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.session;
15  
16  import java.io.ByteArrayInputStream;
17  import java.io.InputStream;
18  import java.sql.Blob;
19  import java.sql.Connection;
20  import java.sql.DatabaseMetaData;
21  import java.sql.Driver;
22  import java.sql.DriverManager;
23  import java.sql.PreparedStatement;
24  import java.sql.ResultSet;
25  import java.sql.SQLException;
26  import java.sql.Statement;
27  import java.util.ArrayList;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Random;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  
34  import javax.naming.InitialContext;
35  import javax.servlet.http.HttpServletRequest;
36  import javax.servlet.http.HttpSession;
37  import javax.sql.DataSource;
38  
39  import org.eclipse.jetty.server.Handler;
40  import org.eclipse.jetty.server.Server;
41  import org.eclipse.jetty.server.SessionManager;
42  import org.eclipse.jetty.server.handler.ContextHandler;
43  import org.eclipse.jetty.util.log.Logger;
44  
45  
46  
47  /**
48   * JDBCSessionIdManager
49   *
50   * SessionIdManager implementation that uses a database to store in-use session ids, 
51   * to support distributed sessions.
52   * 
53   */
54  public class JDBCSessionIdManager extends AbstractSessionIdManager
55  {    
56      final static Logger LOG = SessionHandler.__log;
57      
58      protected final HashSet<String> _sessionIds = new HashSet<String>();
59      protected Server _server;
60      protected Driver _driver;
61      protected String _driverClassName;
62      protected String _connectionUrl;
63      protected DataSource _datasource;
64      protected String _jndiName;
65      protected String _sessionIdTable = "JettySessionIds";
66      protected String _sessionTable = "JettySessions";
67      protected String _sessionTableRowId = "rowId";
68      
69      protected Timer _timer; //scavenge timer
70      protected TimerTask _task; //scavenge task
71      protected long _lastScavengeTime;
72      protected long _scavengeIntervalMs = 1000 * 60 * 10; //10mins
73      protected String _blobType; //if not set, is deduced from the type of the database at runtime
74      
75      protected String _createSessionIdTable;
76      protected String _createSessionTable;
77                                              
78      protected String _selectExpiredSessions;
79      protected String _deleteOldExpiredSessions;
80  
81      protected String _insertId;
82      protected String _deleteId;
83      protected String _queryId;
84      
85      protected DatabaseAdaptor _dbAdaptor;
86  
87      
88      /**
89       * DatabaseAdaptor
90       *
91       * Handles differences between databases.
92       * 
93       * Postgres uses the getBytes and setBinaryStream methods to access
94       * a "bytea" datatype, which can be up to 1Gb of binary data. MySQL
95       * is happy to use the "blob" type and getBlob() methods instead.
96       * 
97       * TODO if the differences become more major it would be worthwhile
98       * refactoring this class.
99       */
100     public class DatabaseAdaptor 
101     {
102         String _dbName;
103         boolean _isLower;
104         boolean _isUpper;
105         
106         
107         public DatabaseAdaptor (DatabaseMetaData dbMeta)
108         throws SQLException
109         {
110             _dbName = dbMeta.getDatabaseProductName().toLowerCase(); 
111             LOG.debug ("Using database "+_dbName);
112             _isLower = dbMeta.storesLowerCaseIdentifiers();
113             _isUpper = dbMeta.storesUpperCaseIdentifiers();
114         }
115         
116         /**
117          * Convert a camel case identifier into either upper or lower
118          * depending on the way the db stores identifiers.
119          * 
120          * @param identifier
121          * @return the converted identifier
122          */
123         public String convertIdentifier (String identifier)
124         {
125             if (_isLower)
126                 return identifier.toLowerCase();
127             if (_isUpper)
128                 return identifier.toUpperCase();
129             
130             return identifier;
131         }
132         
133         public String getDBName ()
134         {
135             return _dbName;
136         }
137         
138         public String getBlobType ()
139         {
140             if (_blobType != null)
141                 return _blobType;
142             
143             if (_dbName.startsWith("postgres"))
144                 return "bytea";
145             
146             return "blob";
147         }
148         
149         public InputStream getBlobInputStream (ResultSet result, String columnName)
150         throws SQLException
151         {
152             if (_dbName.startsWith("postgres"))
153             {
154                 byte[] bytes = result.getBytes(columnName);
155                 return new ByteArrayInputStream(bytes);
156             }
157             
158             Blob blob = result.getBlob(columnName);
159             return blob.getBinaryStream();
160         }
161     }
162     
163     
164     
165     public JDBCSessionIdManager(Server server)
166     {
167         super();
168         _server=server;
169     }
170     
171     public JDBCSessionIdManager(Server server, Random random)
172     {
173        super(random);
174        _server=server;
175     }
176 
177     /**
178      * Configure jdbc connection information via a jdbc Driver
179      * 
180      * @param driverClassName
181      * @param connectionUrl
182      */
183     public void setDriverInfo (String driverClassName, String connectionUrl)
184     {
185         _driverClassName=driverClassName;
186         _connectionUrl=connectionUrl;
187     }
188     
189     /**
190      * Configure jdbc connection information via a jdbc Driver
191      * 
192      * @param driverClass
193      * @param connectionUrl
194      */
195     public void setDriverInfo (Driver driverClass, String connectionUrl)
196     {
197         _driver=driverClass;
198         _connectionUrl=connectionUrl;
199     }
200     
201     
202     public String getDriverClassName()
203     {
204         return _driverClassName;
205     }
206     
207     public String getConnectionUrl ()
208     {
209         return _connectionUrl;
210     }
211     
212     public void setDatasourceName (String jndi)
213     {
214         _jndiName=jndi;
215     }
216     
217     public String getDatasourceName ()
218     {
219         return _jndiName;
220     }
221    
222     public void setBlobType (String name)
223     {
224         _blobType = name;
225     }
226     
227     public String getBlobType ()
228     {
229         return _blobType;
230     }
231     
232     public void setScavengeInterval (long sec)
233     {
234         if (sec<=0)
235             sec=60;
236 
237         long old_period=_scavengeIntervalMs;
238         long period=sec*1000;
239       
240         _scavengeIntervalMs=period;
241         
242         //add a bit of variability into the scavenge time so that not all
243         //nodes with the same scavenge time sync up
244         long tenPercent = _scavengeIntervalMs/10;
245         if ((System.currentTimeMillis()%2) == 0)
246             _scavengeIntervalMs += tenPercent;
247         
248         if (LOG.isDebugEnabled()) LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
249         if (_timer!=null && (period!=old_period || _task==null))
250         {
251             synchronized (this)
252             {
253                 if (_task!=null)
254                     _task.cancel();
255                 _task = new TimerTask()
256                 {
257                     @Override
258                     public void run()
259                     {
260                         scavenge();
261                     }   
262                 };
263                 _timer.schedule(_task,_scavengeIntervalMs,_scavengeIntervalMs);
264             }
265         }  
266     }
267     
268     public long getScavengeInterval ()
269     {
270         return _scavengeIntervalMs/1000;
271     }
272     
273     
274     public void addSession(HttpSession session)
275     {
276         if (session == null)
277             return;
278         
279         synchronized (_sessionIds)
280         {
281             String id = ((JDBCSessionManager.Session)session).getClusterId();            
282             try
283             {
284                 insert(id);
285                 _sessionIds.add(id);
286             }
287             catch (Exception e)
288             {
289                 LOG.warn("Problem storing session id="+id, e);
290             }
291         }
292     }
293     
294     public void removeSession(HttpSession session)
295     {
296         if (session == null)
297             return;
298         
299         removeSession(((JDBCSessionManager.Session)session).getClusterId());
300     }
301     
302     
303     
304     public void removeSession (String id)
305     {
306 
307         if (id == null)
308             return;
309         
310         synchronized (_sessionIds)
311         {  
312             if (LOG.isDebugEnabled())
313                 LOG.debug("Removing session id="+id);
314             try
315             {               
316                 _sessionIds.remove(id);
317                 delete(id);
318             }
319             catch (Exception e)
320             {
321                 LOG.warn("Problem removing session id="+id, e);
322             }
323         }
324         
325     }
326     
327 
328     /** 
329      * Get the session id without any node identifier suffix.
330      * 
331      * @see org.eclipse.jetty.server.SessionIdManager#getClusterId(java.lang.String)
332      */
333     public String getClusterId(String nodeId)
334     {
335         int dot=nodeId.lastIndexOf('.');
336         return (dot>0)?nodeId.substring(0,dot):nodeId;
337     }
338     
339 
340     /** 
341      * Get the session id, including this node's id as a suffix.
342      * 
343      * @see org.eclipse.jetty.server.SessionIdManager#getNodeId(java.lang.String, javax.servlet.http.HttpServletRequest)
344      */
345     public String getNodeId(String clusterId, HttpServletRequest request)
346     {
347         if (_workerName!=null)
348             return clusterId+'.'+_workerName;
349 
350         return clusterId;
351     }
352 
353 
354     public boolean idInUse(String id)
355     {
356         if (id == null)
357             return false;
358         
359         String clusterId = getClusterId(id);
360         boolean inUse = false;
361         synchronized (_sessionIds)
362         {
363             inUse = _sessionIds.contains(clusterId);
364         }
365         
366         if (inUse)
367             return true; //optimisation - if this session is one we've been managing, we can check locally
368 
369         //otherwise, we need to go to the database to check
370         try
371         {
372             return exists(clusterId);
373         }
374         catch (Exception e)
375         {
376             LOG.warn("Problem checking inUse for id="+clusterId, e);
377             return false;
378         }
379     }
380 
381     /** 
382      * Invalidate the session matching the id on all contexts.
383      * 
384      * @see org.eclipse.jetty.server.SessionIdManager#invalidateAll(java.lang.String)
385      */
386     public void invalidateAll(String id)
387     {            
388         //take the id out of the list of known sessionids for this node
389         removeSession(id);
390         
391         synchronized (_sessionIds)
392         {
393             //tell all contexts that may have a session object with this id to
394             //get rid of them
395             Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
396             for (int i=0; contexts!=null && i<contexts.length; i++)
397             {
398                 SessionHandler sessionHandler = (SessionHandler)((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
399                 if (sessionHandler != null) 
400                 {
401                     SessionManager manager = sessionHandler.getSessionManager();
402 
403                     if (manager != null && manager instanceof JDBCSessionManager)
404                     {
405                         ((JDBCSessionManager)manager).invalidateSession(id);
406                     }
407                 }
408             }
409         }
410     }
411 
412 
413     /** 
414      * Start up the id manager.
415      * 
416      * Makes necessary database tables and starts a Session
417      * scavenger thread.
418      */
419     @Override
420     public void doStart()
421     {
422         try
423         {            
424             initializeDatabase();
425             prepareTables();        
426             super.doStart();
427             if (LOG.isDebugEnabled()) LOG.debug("Scavenging interval = "+getScavengeInterval()+" sec");
428             _timer=new Timer("JDBCSessionScavenger", true);
429             setScavengeInterval(getScavengeInterval());
430         }
431         catch (Exception e)
432         {
433             LOG.warn("Problem initialising JettySessionIds table", e);
434         }
435     }
436     
437     /** 
438      * Stop the scavenger.
439      */
440     @Override
441     public void doStop () 
442     throws Exception
443     {
444         synchronized(this)
445         {
446             if (_task!=null)
447                 _task.cancel();
448             if (_timer!=null)
449                 _timer.cancel();
450             _timer=null;
451         }
452         super.doStop();
453     }
454   
455     /**
456      * Get a connection from the driver or datasource.
457      * 
458      * @return the connection for the datasource
459      * @throws SQLException
460      */
461     protected Connection getConnection ()
462     throws SQLException
463     {
464         if (_datasource != null)
465             return _datasource.getConnection();
466         else
467             return DriverManager.getConnection(_connectionUrl);
468     }
469 
470     
471     private void initializeDatabase ()
472     throws Exception
473     {
474         if (_jndiName!=null)
475         {
476             InitialContext ic = new InitialContext();
477             _datasource = (DataSource)ic.lookup(_jndiName);
478         }
479         else if ( _driver != null && _connectionUrl != null )
480         {
481             DriverManager.registerDriver(_driver);
482         }
483         else if (_driverClassName != null && _connectionUrl != null)
484         {
485             Class.forName(_driverClassName);
486         }
487         else
488             throw new IllegalStateException("No database configured for sessions");
489     }
490     
491     
492     
493     /**
494      * Set up the tables in the database
495      * @throws SQLException
496      */
497     private void prepareTables()
498     throws SQLException
499     {
500         _createSessionIdTable = "create table "+_sessionIdTable+" (id varchar(120), primary key(id))";
501         _selectExpiredSessions = "select * from "+_sessionTable+" where expiryTime >= ? and expiryTime <= ?";
502         _deleteOldExpiredSessions = "delete from "+_sessionTable+" where expiryTime >0 and expiryTime <= ?";
503 
504         _insertId = "insert into "+_sessionIdTable+" (id)  values (?)";
505         _deleteId = "delete from "+_sessionIdTable+" where id = ?";
506         _queryId = "select * from "+_sessionIdTable+" where id = ?";
507 
508         Connection connection = null;
509         try
510         {
511             //make the id table
512             connection = getConnection();
513             connection.setAutoCommit(true);
514             DatabaseMetaData metaData = connection.getMetaData();
515             _dbAdaptor = new DatabaseAdaptor(metaData);
516             _sessionTableRowId = (_dbAdaptor.getDBName() != null && _dbAdaptor.getDBName().contains("oracle") ? "srowId":_sessionTableRowId);
517 
518             //checking for table existence is case-sensitive, but table creation is not
519             String tableName = _dbAdaptor.convertIdentifier(_sessionIdTable);
520             ResultSet result = metaData.getTables(null, null, tableName, null);
521             if (!result.next())
522             {
523                 //table does not exist, so create it
524                 connection.createStatement().executeUpdate(_createSessionIdTable);
525             }
526             
527             //make the session table if necessary
528             tableName = _dbAdaptor.convertIdentifier(_sessionTable);   
529             result = metaData.getTables(null, null, tableName, null);
530             if (!result.next())
531             {
532                 //table does not exist, so create it
533                 String blobType = _dbAdaptor.getBlobType();
534                 _createSessionTable = "create table "+_sessionTable+" ("+_sessionTableRowId+" varchar(120), sessionId varchar(120), "+
535                                            " contextPath varchar(60), virtualHost varchar(60), lastNode varchar(60), accessTime bigint, "+
536                                            " lastAccessTime bigint, createTime bigint, cookieTime bigint, "+
537                                            " lastSavedTime bigint, expiryTime bigint, map "+blobType+", primary key("+_sessionTableRowId+"))";
538                 connection.createStatement().executeUpdate(_createSessionTable);
539             }
540             
541             //make some indexes on the JettySessions table
542             String index1 = "idx_"+_sessionTable+"_expiry";
543             String index2 = "idx_"+_sessionTable+"_session";
544             
545             result = metaData.getIndexInfo(null, null, tableName, false, false);
546             boolean index1Exists = false;
547             boolean index2Exists = false;
548             while (result.next())
549             {
550                 String idxName = result.getString("INDEX_NAME");
551                 if (index1.equalsIgnoreCase(idxName))
552                     index1Exists = true;
553                 else if (index2.equalsIgnoreCase(idxName))
554                     index2Exists = true;
555             }
556             if (!(index1Exists && index2Exists))
557             {
558                 Statement statement = connection.createStatement();
559                 if (!index1Exists)
560                     statement.executeUpdate("create index "+index1+" on "+_sessionTable+" (expiryTime)");
561                 if (!index2Exists)
562                     statement.executeUpdate("create index "+index2+" on "+_sessionTable+" (sessionId, contextPath)");
563             }
564         }
565         finally
566         {
567             if (connection != null)
568                 connection.close();
569         }
570     }
571     
572     /**
573      * Insert a new used session id into the table.
574      * 
575      * @param id
576      * @throws SQLException
577      */
578     private void insert (String id)
579     throws SQLException 
580     {
581         Connection connection = null;
582         try
583         {
584             connection = getConnection();
585             connection.setAutoCommit(true);            
586             PreparedStatement query = connection.prepareStatement(_queryId);
587             query.setString(1, id);
588             ResultSet result = query.executeQuery();
589             //only insert the id if it isn't in the db already 
590             if (!result.next())
591             {
592                 PreparedStatement statement = connection.prepareStatement(_insertId);
593                 statement.setString(1, id);
594                 statement.executeUpdate();
595             }
596         }
597         finally
598         {
599             if (connection != null)
600                 connection.close();
601         }
602     }
603     
604     /**
605      * Remove a session id from the table.
606      * 
607      * @param id
608      * @throws SQLException
609      */
610     private void delete (String id)
611     throws SQLException
612     {
613         Connection connection = null;
614         try
615         {
616             connection = getConnection();
617             connection.setAutoCommit(true);
618             PreparedStatement statement = connection.prepareStatement(_deleteId);
619             statement.setString(1, id);
620             statement.executeUpdate();
621         }
622         finally
623         {
624             if (connection != null)
625                 connection.close();
626         }
627     }
628     
629     
630     /**
631      * Check if a session id exists.
632      * 
633      * @param id
634      * @return
635      * @throws SQLException
636      */
637     private boolean exists (String id)
638     throws SQLException
639     {
640         Connection connection = null;
641         try
642         {
643             connection = getConnection();
644             connection.setAutoCommit(true);
645             PreparedStatement statement = connection.prepareStatement(_queryId);
646             statement.setString(1, id);
647             ResultSet result = statement.executeQuery();
648             return result.next();
649         }
650         finally
651         {
652             if (connection != null)
653                 connection.close();
654         }
655     }
656     
657     /**
658      * Look for sessions in the database that have expired.
659      * 
660      * We do this in the SessionIdManager and not the SessionManager so
661      * that we only have 1 scavenger, otherwise if there are n SessionManagers
662      * there would be n scavengers, all contending for the database.
663      * 
664      * We look first for sessions that expired in the previous interval, then
665      * for sessions that expired previously - these are old sessions that no
666      * node is managing any more and have become stuck in the database.
667      */
668     private void scavenge ()
669     {
670         Connection connection = null;
671         List<String> expiredSessionIds = new ArrayList<String>();
672         try
673         {            
674             if (LOG.isDebugEnabled()) LOG.debug("Scavenge sweep started at "+System.currentTimeMillis());
675             if (_lastScavengeTime > 0)
676             {
677                 connection = getConnection();
678                 connection.setAutoCommit(true);
679                 //"select sessionId from JettySessions where expiryTime > (lastScavengeTime - scanInterval) and expiryTime < lastScavengeTime";
680                 PreparedStatement statement = connection.prepareStatement(_selectExpiredSessions);
681                 long lowerBound = (_lastScavengeTime - _scavengeIntervalMs);
682                 long upperBound = _lastScavengeTime;
683                 if (LOG.isDebugEnabled()) LOG.debug (" Searching for sessions expired between "+lowerBound + " and "+upperBound);
684                 
685                 statement.setLong(1, lowerBound);
686                 statement.setLong(2, upperBound);
687                 ResultSet result = statement.executeQuery();
688                 while (result.next())
689                 {
690                     String sessionId = result.getString("sessionId");
691                     expiredSessionIds.add(sessionId);
692                     if (LOG.isDebugEnabled()) LOG.debug (" Found expired sessionId="+sessionId); 
693                 }
694 
695                 //tell the SessionManagers to expire any sessions with a matching sessionId in memory
696                 Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
697                 for (int i=0; contexts!=null && i<contexts.length; i++)
698                 {
699 
700                     SessionHandler sessionHandler = (SessionHandler)((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
701                     if (sessionHandler != null) 
702                     { 
703                         SessionManager manager = sessionHandler.getSessionManager();
704                         if (manager != null && manager instanceof JDBCSessionManager)
705                         {
706                             ((JDBCSessionManager)manager).expire(expiredSessionIds);
707                         }
708                     }
709                 }
710 
711                 //find all sessions that have expired at least a couple of scanIntervals ago and just delete them
712                 upperBound = _lastScavengeTime - (2 * _scavengeIntervalMs);
713                 if (upperBound > 0)
714                 {
715                     if (LOG.isDebugEnabled()) LOG.debug("Deleting old expired sessions expired before "+upperBound);
716                     statement = connection.prepareStatement(_deleteOldExpiredSessions);
717                     statement.setLong(1, upperBound);
718                     statement.executeUpdate();
719                 }
720             }
721         }
722         catch (Exception e)
723         {
724             if (isRunning())    
725                 LOG.warn("Problem selecting expired sessions", e);
726             else
727                 LOG.ignore(e);
728         }
729         finally
730         {           
731             _lastScavengeTime=System.currentTimeMillis();
732             if (LOG.isDebugEnabled()) LOG.debug("Scavenge sweep ended at "+_lastScavengeTime);
733             if (connection != null)
734             {
735                 try
736                 {
737                 connection.close();
738                 }
739                 catch (SQLException e)
740                 {
741                     LOG.warn(e);
742                 }
743             }
744         }
745     }
746 }