View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.gcloud.session;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.ObjectOutputStream;
24  import java.io.Serializable;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicReference;
33  import java.util.concurrent.locks.ReentrantLock;
34  
35  import javax.servlet.http.HttpServletRequest;
36  
37  import org.eclipse.jetty.server.handler.ContextHandler;
38  import org.eclipse.jetty.server.handler.ContextHandler.Context;
39  import org.eclipse.jetty.server.session.AbstractSession;
40  import org.eclipse.jetty.server.session.AbstractSessionManager;
41  import org.eclipse.jetty.server.session.MemSession;
42  import org.eclipse.jetty.util.ClassLoadingObjectInputStream;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
46  import org.eclipse.jetty.util.thread.Scheduler;
47  
48  import com.google.gcloud.datastore.Blob;
49  import com.google.gcloud.datastore.Datastore;
50  import com.google.gcloud.datastore.DatastoreFactory;
51  import com.google.gcloud.datastore.Entity;
52  import com.google.gcloud.datastore.GqlQuery;
53  import com.google.gcloud.datastore.Key;
54  import com.google.gcloud.datastore.KeyFactory;
55  import com.google.gcloud.datastore.Query;
56  import com.google.gcloud.datastore.Query.ResultType;
57  import com.google.gcloud.datastore.QueryResults;
58  
59  
60  
61  /**
62   * GCloudSessionManager
63   * 
64   * 
65   */
66  public class GCloudSessionManager extends AbstractSessionManager
67  {
68      private  final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
69      
70      
71      public static final String KIND = "GCloudSession";
72      public static final int DEFAULT_MAX_QUERY_RESULTS = 100;
73      public static final long DEFAULT_SCAVENGE_SEC = 600; 
74      
75      /**
76       * Sessions known to this node held in memory
77       */
78      private ConcurrentHashMap<String, GCloudSessionManager.Session> _sessions;
79  
80      
81      /**
82       * The length of time a session can be in memory without being checked against
83       * the cluster. A value of 0 indicates that the session is never checked against
84       * the cluster - the current node is considered to be the master for the session.
85       *
86       */
87      private long _staleIntervalSec = 0;
88      
89      protected Scheduler.Task _task; //scavenge task
90      protected Scheduler _scheduler;
91      protected Scavenger _scavenger;
92      protected long _scavengeIntervalMs = 1000L * DEFAULT_SCAVENGE_SEC; //10mins
93      protected boolean _ownScheduler;
94      
95      private Datastore _datastore;
96      private KeyFactory _keyFactory;
97  
98  
99      private SessionEntityConverter _converter;
100 
101 
102     private int _maxResults = DEFAULT_MAX_QUERY_RESULTS;
103 
104 
105     /**
106      * Scavenger
107      *
108      */
109     protected class Scavenger implements Runnable
110     {
111 
112         @Override
113         public void run()
114         {
115            try
116            {
117                scavenge();
118            }
119            finally
120            {
121                if (_scheduler != null && _scheduler.isRunning())
122                    _task = _scheduler.schedule(this, _scavengeIntervalMs, TimeUnit.MILLISECONDS);
123            }
124         }
125     }
126 
127     /**
128      * SessionEntityConverter
129      *
130      *
131      */
132     public class SessionEntityConverter
133     {
134         public  final String CLUSTERID = "clusterId";
135         public  final String CONTEXTPATH = "contextPath";
136         public  final String VHOST = "vhost";
137         public  final String ACCESSED = "accessed";
138         public  final String LASTACCESSED = "lastAccessed";
139         public  final String CREATETIME = "createTime";
140         public  final  String COOKIESETTIME = "cookieSetTime";
141         public  final String LASTNODE = "lastNode";
142         public  final String EXPIRY = "expiry";
143         public  final  String MAXINACTIVE = "maxInactive";
144         public  final  String ATTRIBUTES = "attributes";
145 
146       
147         
148         public Entity entityFromSession (Session session, Key key) throws Exception
149         {
150             if (session == null)
151                 return null;
152             
153             Entity entity = null;
154             
155             //serialize the attribute map
156             ByteArrayOutputStream baos = new ByteArrayOutputStream();
157             ObjectOutputStream oos = new ObjectOutputStream(baos);
158             oos.writeObject(session.getAttributeMap());
159             oos.flush();
160             
161             //turn a session into an entity
162             entity = Entity.builder(key)
163                     .set(CLUSTERID, session.getId())
164                     .set(CONTEXTPATH, session.getContextPath())
165                     .set(VHOST, session.getVHost())
166                     .set(ACCESSED, session.getAccessed())
167                     .set(LASTACCESSED, session.getLastAccessedTime())
168                     .set(CREATETIME, session.getCreationTime())
169                     .set(COOKIESETTIME, session.getCookieSetTime())
170                     .set(LASTNODE,session.getLastNode())
171                     .set(EXPIRY, session.getExpiry())
172                     .set(MAXINACTIVE, session.getMaxInactiveInterval())
173                     .set(ATTRIBUTES, Blob.copyFrom(baos.toByteArray())).build();
174                      
175             return entity;
176         }
177         
178         public Session sessionFromEntity (Entity entity) throws Exception
179         {
180             if (entity == null)
181                 return null;
182 
183             final AtomicReference<Session> reference = new AtomicReference<Session>();
184             final AtomicReference<Exception> exception = new AtomicReference<Exception>();
185             Runnable load = new Runnable()
186             {
187                 public void run ()
188                 {
189                     try
190                     {
191                         //turn an entity into a Session
192                         String clusterId = entity.getString(CLUSTERID);
193                         String contextPath = entity.getString(CONTEXTPATH);
194                         String vhost = entity.getString(VHOST);
195                         long accessed = entity.getLong(ACCESSED);
196                         long lastAccessed = entity.getLong(LASTACCESSED);
197                         long createTime = entity.getLong(CREATETIME);
198                         long cookieSetTime = entity.getLong(COOKIESETTIME);
199                         String lastNode = entity.getString(LASTNODE);
200                         long expiry = entity.getLong(EXPIRY);
201                         long maxInactive = entity.getLong(MAXINACTIVE);
202                         Blob blob = (Blob) entity.getBlob(ATTRIBUTES);
203 
204                         Session session = new Session (clusterId, createTime, accessed, maxInactive);
205                         session.setLastNode(lastNode);
206                         session.setContextPath(contextPath);
207                         session.setVHost(vhost);
208                         session.setCookieSetTime(cookieSetTime);
209                         session.setLastAccessedTime(lastAccessed);
210                         session.setLastNode(lastNode);
211                         session.setExpiry(expiry);
212                         try (ClassLoadingObjectInputStream ois = new ClassLoadingObjectInputStream(blob.asInputStream()))
213                         {
214                             Object o = ois.readObject();
215                             session.addAttributes((Map<String,Object>)o);
216                         }
217                         reference.set(session);
218                     }
219                     catch (Exception e)
220                     {
221                         exception.set(e);
222                     }
223                 }
224             };
225             
226             if (_context==null)
227                 load.run();
228             else
229                 _context.getContextHandler().handle(null,load);
230    
231            
232             if (exception.get() != null)
233             {
234                 exception.get().printStackTrace();
235                 throw exception.get();
236             }
237             
238             return reference.get();
239         }
240     }
241     
242     /*
243      * Every time a Session is put into the cache one of these objects
244      * is created to copy the data out of the in-memory session, and 
245      * every time an object is read from the cache one of these objects
246      * a fresh Session object is created based on the data held by this
247      * object.
248      */
249     public class SerializableSessionData implements Serializable
250     {
251         /**
252          * 
253          */
254         private static final long serialVersionUID = -7779120106058533486L;
255         String clusterId;
256         String contextPath;
257         String vhost;
258         long accessed;
259         long lastAccessed;
260         long createTime;
261         long cookieSetTime;
262         String lastNode;
263         long expiry;
264         long maxInactive;
265         Map<String, Object> attributes;
266 
267         public SerializableSessionData()
268         {
269 
270         }
271 
272        
273        public SerializableSessionData(Session s)
274        {
275            clusterId = s.getClusterId();
276            contextPath = s.getContextPath();
277            vhost = s.getVHost();
278            accessed = s.getAccessed();
279            lastAccessed = s.getLastAccessedTime();
280            createTime = s.getCreationTime();
281            cookieSetTime = s.getCookieSetTime();
282            lastNode = s.getLastNode();
283            expiry = s.getExpiry();
284            maxInactive = s.getMaxInactiveInterval();
285            attributes = s.getAttributeMap(); // TODO pointer, not a copy
286        }
287         
288         private void writeObject(java.io.ObjectOutputStream out) throws IOException
289         {  
290             out.writeUTF(clusterId); //session id
291             out.writeUTF(contextPath); //context path
292             out.writeUTF(vhost); //first vhost
293 
294             out.writeLong(accessed);//accessTime
295             out.writeLong(lastAccessed); //lastAccessTime
296             out.writeLong(createTime); //time created
297             out.writeLong(cookieSetTime);//time cookie was set
298             out.writeUTF(lastNode); //name of last node managing
299       
300             out.writeLong(expiry); 
301             out.writeLong(maxInactive);
302             out.writeObject(attributes);
303         }
304         
305         private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException
306         {
307             clusterId = in.readUTF();
308             contextPath = in.readUTF();
309             vhost = in.readUTF();
310             
311             accessed = in.readLong();//accessTime
312             lastAccessed = in.readLong(); //lastAccessTime
313             createTime = in.readLong(); //time created
314             cookieSetTime = in.readLong();//time cookie was set
315             lastNode = in.readUTF(); //last managing node
316             expiry = in.readLong(); 
317             maxInactive = in.readLong();
318             attributes = (HashMap<String,Object>)in.readObject();
319         }
320         
321     }
322     
323 
324     
325     /**
326      * Session
327      *
328      * Representation of a session in local memory.
329      */
330     public class Session extends MemSession
331     {
332         
333         private ReentrantLock _lock = new ReentrantLock();
334         
335         /**
336          * The (canonical) context path for with which this session is associated
337          */
338         private String _contextPath;
339         
340         
341         
342         /**
343          * The time in msec since the epoch at which this session should expire
344          */
345         private long _expiryTime; 
346         
347         
348         /**
349          * Time in msec since the epoch at which this session was last read from cluster
350          */
351         private long _lastSyncTime;
352         
353         
354         /**
355          * The workername of last node known to be managing the session
356          */
357         private String _lastNode;
358         
359         
360         /**
361          * If dirty, session needs to be (re)sent to cluster
362          */
363         protected boolean _dirty=false;
364         
365         
366      
367 
368         /**
369          * Any virtual hosts for the context with which this session is associated
370          */
371         private String _vhost;
372 
373         
374         /**
375          * Count of how many threads are active in this session
376          */
377         private AtomicInteger _activeThreads = new AtomicInteger(0);
378         
379         
380         
381         
382         /**
383          * A new session.
384          * 
385          * @param request
386          */
387         protected Session (HttpServletRequest request)
388         {
389             super(GCloudSessionManager.this,request);
390             long maxInterval = getMaxInactiveInterval();
391             _expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
392             _lastNode = getSessionIdManager().getWorkerName();
393            setVHost(GCloudSessionManager.getVirtualHost(_context));
394            setContextPath(GCloudSessionManager.getContextPath(_context));
395            _activeThreads.incrementAndGet(); //access will not be called on a freshly created session so increment here
396         }
397         
398         
399     
400         
401         /**
402          * A restored session.
403          * 
404          * @param sessionId
405          * @param created
406          * @param accessed
407          * @param maxInterval
408          */
409         protected Session (String sessionId, long created, long accessed, long maxInterval)
410         {
411             super(GCloudSessionManager.this, created, accessed, sessionId);
412             _expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
413         }
414         
415         /** 
416          * Called on entry to the session.
417          * 
418          * @see org.eclipse.jetty.server.session.AbstractSession#access(long)
419          */
420         @Override
421         protected boolean access(long time)
422         {
423             if (LOG.isDebugEnabled())
424                 LOG.debug("Access session({}) for context {} on worker {}", getId(), getContextPath(), getSessionIdManager().getWorkerName());
425             try
426             {
427 
428                 long now = System.currentTimeMillis();
429                 //lock so that no other thread can call access or complete until the first one has refreshed the session object if necessary
430                 _lock.lock();
431                 //a request thread is entering
432                 if (_activeThreads.incrementAndGet() == 1)
433                 {
434                     //if the first thread, check that the session in memory is not stale, if we're checking for stale sessions
435                     if (getStaleIntervalSec() > 0  && (now - getLastSyncTime()) >= (getStaleIntervalSec() * 1000L))
436                     {
437                         if (LOG.isDebugEnabled())
438                             LOG.debug("Acess session({}) for context {} on worker {} stale session. Reloading.", getId(), getContextPath(), getSessionIdManager().getWorkerName());
439                         refresh();
440                     }
441                 }
442             }
443             catch (Exception e)
444             {
445                 LOG.warn(e);
446             }
447             finally
448             {            
449                 _lock.unlock();
450             }
451 
452             if (super.access(time))
453             {
454                 int maxInterval=getMaxInactiveInterval();
455                 _expiryTime = (maxInterval <= 0 ? 0 : (time + maxInterval*1000L));
456                 return true;
457             }
458             return false;
459         }
460 
461 
462         /**
463          * Exit from session
464          * @see org.eclipse.jetty.server.session.AbstractSession#complete()
465          */
466         @Override
467         protected void complete()
468         {
469             super.complete();
470 
471             //lock so that no other thread that might be calling access can proceed until this complete is done
472             _lock.lock();
473 
474             try
475             {
476                 //if this is the last request thread to be in the session
477                 if (_activeThreads.decrementAndGet() == 0)
478                 {
479                     try
480                     {
481                         //an invalid session will already have been removed from the
482                         //local session map and deleted from the cluster. If its valid save
483                         //it to the cluster.
484                         //TODO consider doing only periodic saves if only the last access
485                         //time to the session changes
486                         if (isValid())
487                         {
488                             //if session still valid && its dirty or stale or never been synced, write it to the cluster
489                             //otherwise, we just keep the updated last access time in memory
490                             if (_dirty || getLastSyncTime() == 0 || isStale(System.currentTimeMillis()))
491                             {
492                                 willPassivate();
493                                 save(this);
494                                 didActivate();
495                             }
496                         }
497                     }
498                     catch (Exception e)
499                     {
500                         LOG.warn("Problem saving session({})",getId(), e);
501                     } 
502                     finally
503                     {
504                         _dirty = false;
505                     }
506                 }
507             }
508             finally
509             {
510                 _lock.unlock();
511             }
512         }
513         
514         /** Test if the session is stale
515          * @param atTime
516          * @return
517          */
518         protected boolean isStale (long atTime)
519         {
520             return (getStaleIntervalSec() > 0) && (atTime - getLastSyncTime() >= (getStaleIntervalSec()*1000L));
521         }
522         
523         
524         /** Test if the session is dirty
525          * @return
526          */
527         protected boolean isDirty ()
528         {
529             return _dirty;
530         }
531 
532         /** 
533          * Expire the session.
534          * 
535          * @see org.eclipse.jetty.server.session.AbstractSession#timeout()
536          */
537         @Override
538         protected void timeout()
539         {
540             if (LOG.isDebugEnabled()) LOG.debug("Timing out session {}", getId());
541             super.timeout();
542         }
543         
544       
545         
546         /**
547          * Reload the session from the cluster. If the node that
548          * last managed the session from the cluster is ourself,
549          * then the session does not need refreshing.
550          * NOTE: this method MUST be called with sufficient locks
551          * in place to prevent 2 or more concurrent threads from
552          * simultaneously updating the session.
553          */
554         private void refresh () 
555         throws Exception
556         {
557             //get fresh copy from the cluster
558             Session fresh = load(makeKey(getClusterId(), _context));
559 
560             //if the session no longer exists, invalidate
561             if (fresh == null)
562             {
563                 invalidate();
564                 return;
565             }
566 
567             //cluster copy assumed to be the same as we were the last
568             //node to manage it
569             if (fresh.getLastNode().equals(getLastNode()))
570                 return;
571 
572             setLastNode(getSessionIdManager().getWorkerName());
573             
574             //prepare for refresh
575             willPassivate();
576 
577             //if fresh has no attributes, remove them
578             if (fresh.getAttributes() == 0)
579                 this.clearAttributes();
580             else
581             {
582                 //reconcile attributes
583                 for (String key:fresh.getAttributeMap().keySet())
584                 {
585                     Object freshvalue = fresh.getAttribute(key);
586 
587                     //session does not already contain this attribute, so bind it
588                     if (getAttribute(key) == null)
589                     { 
590                         doPutOrRemove(key,freshvalue);
591                         bindValue(key,freshvalue);
592                     }
593                     else //session already contains this attribute, update its value
594                     {
595                         doPutOrRemove(key,freshvalue);
596                     }
597 
598                 }
599                 // cleanup, remove values from session, that don't exist in data anymore:
600                 for (String key : getNames())
601                 {
602                     if (fresh.getAttribute(key) == null)
603                     {
604                         Object oldvalue = getAttribute(key);
605                         doPutOrRemove(key,null);
606                         unbindValue(key,oldvalue);
607                     }
608                 }
609             }
610             //finish refresh
611             didActivate();
612         }
613 
614 
615         public void setExpiry (long expiry)
616         {
617             _expiryTime = expiry;
618         }
619         
620 
621         public long getExpiry ()
622         {
623             return _expiryTime;
624         }
625         
626         public boolean isExpiredAt (long time)
627         {
628             if (_expiryTime <= 0)
629                 return false; //never expires
630             
631             return  (_expiryTime <= time);
632         }
633         
634         public void swapId (String newId, String newNodeId)
635         {
636             //TODO probably synchronize rather than use the access/complete lock?
637             _lock.lock();
638             setClusterId(newId);
639             setNodeId(newNodeId);
640             _lock.unlock();
641         }
642         
643         @Override
644         public void setAttribute (String name, Object value)
645         {
646             Object old = changeAttribute(name, value);
647             if (value == null && old == null)
648                 return; //if same as remove attribute but attribute was already removed, no change
649             
650            _dirty = true;
651         }
652         
653         
654         public String getContextPath()
655         {
656             return _contextPath;
657         }
658 
659 
660         public void setContextPath(String contextPath)
661         {
662             this._contextPath = contextPath;
663         }
664 
665 
666         public String getVHost()
667         {
668             return _vhost;
669         }
670 
671 
672         public void setVHost(String vhost)
673         {
674             this._vhost = vhost;
675         }
676         
677         public String getLastNode()
678         {
679             return _lastNode;
680         }
681 
682 
683         public void setLastNode(String lastNode)
684         {
685             _lastNode = lastNode;
686         }
687 
688 
689         public long getLastSyncTime()
690         {
691             return _lastSyncTime;
692         }
693 
694 
695         public void setLastSyncTime(long lastSyncTime)
696         {
697             _lastSyncTime = lastSyncTime;
698         }
699 
700     }
701 
702 
703 
704     
705     /**
706      * Start the session manager.
707      *
708      * @see org.eclipse.jetty.server.session.AbstractSessionManager#doStart()
709      */
710     @Override
711     public void doStart() throws Exception
712     {
713         if (_sessionIdManager == null)
714             throw new IllegalStateException("No session id manager defined");
715         
716         GCloudConfiguration config = ((GCloudSessionIdManager)_sessionIdManager).getConfig();
717         if (config == null)
718             throw new IllegalStateException("No gcloud configuration");
719         
720         
721         _datastore = DatastoreFactory.instance().get(config.getDatastoreOptions());
722         _keyFactory = _datastore.newKeyFactory().kind(KIND);
723         _converter = new SessionEntityConverter();       
724         _sessions = new ConcurrentHashMap<String, Session>();
725 
726         //try and use a common scheduler, fallback to own
727         _scheduler = getSessionHandler().getServer().getBean(Scheduler.class);
728         if (_scheduler == null)
729         {
730             _scheduler = new ScheduledExecutorScheduler();
731             _ownScheduler = true;
732             _scheduler.start();
733         }
734         else if (!_scheduler.isStarted())
735             throw new IllegalStateException("Shared scheduler not started");
736  
737         setScavengeIntervalSec(getScavengeIntervalSec());
738         
739         super.doStart();
740     }
741 
742 
743     /**
744      * Stop the session manager.
745      *
746      * @see org.eclipse.jetty.server.session.AbstractSessionManager#doStop()
747      */
748     @Override
749     public void doStop() throws Exception
750     {
751         super.doStop();
752 
753         if (_task!=null)
754             _task.cancel();
755         _task=null;
756         if (_ownScheduler && _scheduler !=null)
757             _scheduler.stop();
758         _scheduler = null;
759 
760         _sessions.clear();
761         _sessions = null;
762     }
763 
764 
765 
766     /**
767      * Look for sessions in local memory that have expired.
768      */
769     public void scavenge ()
770     {
771         try
772         {
773             //scavenge in the database every so often
774             scavengeGCloudDataStore();
775         }
776         catch (Exception e)
777         {
778             LOG.warn("Problem scavenging", e);
779         }
780     }
781 
782  
783     
784     protected void scavengeGCloudDataStore()
785     throws Exception
786     {
787        
788         //query the datastore for sessions that have expired
789         long now = System.currentTimeMillis();
790         
791         //give a bit of leeway so we don't immediately something that has only just expired a nanosecond ago
792         now = now - (_scavengeIntervalMs/2);
793         
794         if (LOG.isDebugEnabled())
795             LOG.debug("Scavenging for sessions expired before "+now);
796 
797 
798         GqlQuery.Builder builder = Query.gqlQueryBuilder(ResultType.ENTITY, "select * from "+KIND+" where expiry < @1 limit "+_maxResults);
799         builder.allowLiteral(true);
800         builder.addBinding(now);
801         Query<Entity> query = builder.build();
802         QueryResults<Entity> results = _datastore.run(query);
803         
804         while (results.hasNext())
805         {          
806             Entity sessionEntity = results.next();
807             scavengeSession(sessionEntity);        
808         }
809 
810     }
811 
812     /**
813      * Scavenge a session that has expired
814      * @param e
815      * @throws Exception
816      */
817     protected void scavengeSession (Entity e)
818             throws Exception
819     {
820         long now = System.currentTimeMillis();
821         Session session = _converter.sessionFromEntity(e);
822         if (session == null)
823             return;
824 
825         if (LOG.isDebugEnabled())
826             LOG.debug("Scavenging session: {}",session.getId());
827         //if the session isn't in memory already, put it there so we can do a normal timeout call
828          Session memSession =  _sessions.putIfAbsent(session.getId(), session);
829          if (memSession == null)
830          {
831              memSession = session;
832          }
833 
834         //final check
835         if (memSession.isExpiredAt(now))
836         {
837             if (LOG.isDebugEnabled()) LOG.debug("Session {} is definitely expired", memSession.getId());
838             memSession.timeout();   
839         }
840     }
841 
842     public long getScavengeIntervalSec ()
843     {
844         return _scavengeIntervalMs/1000;
845     }
846 
847     
848     
849     /**
850      * Set the interval between runs of the scavenger. It should not be run too
851      * often.
852      * 
853      * 
854      * @param sec
855      */
856     public void setScavengeIntervalSec (long sec)
857     {
858 
859         long old_period=_scavengeIntervalMs;
860         long period=sec*1000L;
861 
862         _scavengeIntervalMs=period;
863 
864         if (_scavengeIntervalMs > 0)
865         {
866             //add a bit of variability into the scavenge time so that not all
867             //nodes with the same scavenge time sync up
868             long tenPercent = _scavengeIntervalMs/10;
869             if ((System.currentTimeMillis()%2) == 0)
870                 _scavengeIntervalMs += tenPercent;
871             if (LOG.isDebugEnabled())
872                 LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
873         }
874         else
875         {
876             if (LOG.isDebugEnabled())
877                 LOG.debug("Scavenging disabled"); 
878         }
879 
880  
881         
882         synchronized (this)
883         {
884             if (_scheduler != null && (period!=old_period || _task==null))
885             {
886                 //clean up any previously scheduled scavenger
887                 if (_task!=null)
888                     _task.cancel();
889 
890                 //start a new one
891                 if (_scavengeIntervalMs > 0)
892                 {
893                     if (_scavenger == null)
894                         _scavenger = new Scavenger();
895 
896                     _task = _scheduler.schedule(_scavenger,_scavengeIntervalMs,TimeUnit.MILLISECONDS);
897                 }
898             }
899         }
900     }
901     
902     
903     public long getStaleIntervalSec()
904     {
905         return _staleIntervalSec;
906     }
907 
908 
909     public void setStaleIntervalSec(long staleIntervalSec)
910     {
911         _staleIntervalSec = staleIntervalSec;
912     }
913     
914     
915     public int getMaxResults()
916     {
917         return _maxResults;
918     }
919 
920 
921     public void setMaxResults(int maxResults)
922     {
923         if (_maxResults <= 0)
924             _maxResults = DEFAULT_MAX_QUERY_RESULTS;
925         else
926             _maxResults = maxResults;
927     }
928 
929 
930     /** 
931      * Add a new session for the context related to this session manager
932      * 
933      * @see org.eclipse.jetty.server.session.AbstractSessionManager#addSession(org.eclipse.jetty.server.session.AbstractSession)
934      */
935     @Override
936     protected void addSession(AbstractSession session)
937     {
938         if (session==null)
939             return;
940         
941         if (LOG.isDebugEnabled()) LOG.debug("Adding session({}) to session manager for context {} on worker {}",session.getClusterId(), getContextPath(getContext()),getSessionIdManager().getWorkerName() + " with lastnode="+((Session)session).getLastNode());
942         _sessions.put(session.getClusterId(), (Session)session);
943         
944         try
945         {     
946                 session.willPassivate();
947                 save(((GCloudSessionManager.Session)session));
948                 session.didActivate();
949             
950         }
951         catch (Exception e)
952         {
953             LOG.warn("Unable to store new session id="+session.getId() , e);
954         }
955     }
956 
957     /** 
958      * Ask the cluster for the session.
959      * 
960      * @see org.eclipse.jetty.server.session.AbstractSessionManager#getSession(java.lang.String)
961      */
962     @Override
963     public AbstractSession getSession(String idInCluster)
964     {
965         Session session = null;
966 
967         //try and find the session in this node's memory
968         Session memSession = (Session)_sessions.get(idInCluster);
969 
970         if (LOG.isDebugEnabled())
971             LOG.debug("getSession({}) {} in session map",idInCluster,(memSession==null?"not":""));
972 
973         long now = System.currentTimeMillis();
974         try
975         {
976             //if the session is not in this node's memory, then load it from the datastore
977             if (memSession == null)
978             {
979                 if (LOG.isDebugEnabled())
980                     LOG.debug("getSession({}): loading session data from cluster", idInCluster);
981 
982                 session = load(makeKey(idInCluster, _context));
983                 if (session != null)
984                 {
985                     //Check that it wasn't expired
986                     if (session.getExpiry() > 0 && session.getExpiry() <= now)
987                     {
988                         if (LOG.isDebugEnabled()) LOG.debug("getSession ({}): Session expired", idInCluster);
989                         //ensure that the session id for the expired session is deleted so that a new session with the 
990                         //same id cannot be created (because the idInUse() test would succeed)
991                         ((GCloudSessionIdManager)getSessionIdManager()).removeSession(session);
992                         return null;  
993                     }
994 
995                     //Update the last worker node to me
996                     session.setLastNode(getSessionIdManager().getWorkerName());                            
997                     //TODO consider saving session here if lastNode was not this node
998 
999                     //Check that another thread hasn't loaded the same session
1000                     Session existingSession = _sessions.putIfAbsent(idInCluster, session);
1001                     if (existingSession != null)
1002                     {
1003                         //use the one that the other thread inserted
1004                         session = existingSession;
1005                         LOG.debug("getSession({}): using session loaded by another request thread ", idInCluster);
1006                     }
1007                     else
1008                     {
1009                         //indicate that the session was reinflated
1010                         session.didActivate();
1011                         LOG.debug("getSession({}): loaded session from cluster", idInCluster);
1012                     }
1013                     return session;
1014                 }
1015                 else
1016                 {
1017                     //The requested session does not exist anywhere in the cluster
1018                     LOG.debug("getSession({}): No session in cluster matching",idInCluster);
1019                     return null;
1020                 }
1021             }
1022             else
1023             {
1024                //The session exists in this node's memory
1025                LOG.debug("getSession({}): returning session from local memory ", memSession.getClusterId());
1026                 return memSession;
1027             }
1028         }
1029         catch (Exception e)
1030         {
1031             LOG.warn("Unable to load session="+idInCluster, e);
1032             return null;
1033         }
1034     }
1035     
1036     
1037 
1038     /** 
1039      * The session manager is stopping.
1040      * 
1041      * @see org.eclipse.jetty.server.session.AbstractSessionManager#shutdownSessions()
1042      */
1043     @Override
1044     protected void shutdownSessions() throws Exception
1045     {
1046         Set<String> keys = new HashSet<String>(_sessions.keySet());
1047         for (String key:keys)
1048         {
1049             Session session = _sessions.remove(key); //take the session out of the session list
1050             //If the session is dirty, then write it to the cluster.
1051             //If the session is simply stale do NOT write it to the cluster, as some other node
1052             //may have started managing that session - this means that the last accessed/expiry time
1053             //will not be updated, meaning it may look like it can expire sooner than it should.
1054             try
1055             {
1056                 if (session.isDirty())
1057                 {
1058                     if (LOG.isDebugEnabled())
1059                         LOG.debug("Saving dirty session {} before exiting ", session.getId());
1060                     save(session);
1061                 }
1062             }
1063             catch (Exception e)
1064             {
1065                 LOG.warn(e);
1066             }
1067         }
1068     }
1069 
1070 
1071     @Override
1072     protected AbstractSession newSession(HttpServletRequest request)
1073     {
1074         return new Session(request);
1075     }
1076 
1077     /** 
1078      * Remove a session from local memory, and delete it from
1079      * the cluster cache.
1080      * 
1081      * @see org.eclipse.jetty.server.session.AbstractSessionManager#removeSession(java.lang.String)
1082      */
1083     @Override
1084     protected boolean removeSession(String idInCluster)
1085     {
1086         Session session = (Session)_sessions.remove(idInCluster);
1087         try
1088         {
1089             if (session != null)
1090             {
1091                 delete(session);
1092             }
1093         }
1094         catch (Exception e)
1095         {
1096             LOG.warn("Problem deleting session id="+idInCluster, e);
1097         }
1098         return session!=null;
1099     }
1100     
1101     
1102     
1103     
1104     @Override
1105     public void renewSessionId(String oldClusterId, String oldNodeId, String newClusterId, String newNodeId)
1106     {
1107         Session session = null;
1108         try
1109         {
1110             //take the session with that id out of our managed list
1111             session = (Session)_sessions.remove(oldClusterId);
1112             if (session != null)
1113             {
1114                 //TODO consider transactionality and ramifications if the session is live on another node
1115                 delete(session); //delete the old session from the cluster  
1116                 session.swapId(newClusterId, newNodeId); //update the session
1117                 _sessions.put(newClusterId, session); //put it into managed list under new key
1118                 save(session); //put the session under the new id into the cluster
1119             }
1120         }
1121         catch (Exception e)
1122         {
1123             LOG.warn(e);
1124         }
1125 
1126         super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId);
1127     }
1128 
1129 
1130     /**
1131      * Load a session from the clustered cache.
1132      * 
1133      * @param key
1134      * @return
1135      */
1136     protected Session load (Key key)
1137     throws Exception
1138     {
1139         if (_datastore == null)
1140             throw new IllegalStateException("No DataStore");
1141         
1142         if (LOG.isDebugEnabled()) LOG.debug("Loading session {} from DataStore", key);
1143 
1144         Entity entity = _datastore.get(key);
1145         if (entity == null)
1146         {
1147             if (LOG.isDebugEnabled()) LOG.debug("No session {} in DataStore ",key);
1148             return null;
1149         }
1150         else
1151         {
1152             Session session = _converter.sessionFromEntity(entity);
1153             session.setLastSyncTime(System.currentTimeMillis());
1154             return session;
1155         }
1156     }
1157     
1158     
1159     
1160     /**
1161      * Save or update the session to the cluster cache
1162      * 
1163      * @param session
1164      * @throws Exception
1165      */
1166     protected void save (GCloudSessionManager.Session session)
1167     throws Exception
1168     {
1169         if (_datastore == null)
1170             throw new IllegalStateException("No DataStore");
1171         
1172         if (LOG.isDebugEnabled()) LOG.debug("Writing session {} to DataStore", session.getId());
1173     
1174         Entity entity = _converter.entityFromSession(session, makeKey(session, _context));
1175         _datastore.put(entity);
1176         session.setLastSyncTime(System.currentTimeMillis());
1177     }
1178     
1179     
1180     
1181     /**
1182      * Remove the session from the cluster cache.
1183      * 
1184      * @param session
1185      */
1186     protected void delete (GCloudSessionManager.Session session)
1187     {  
1188         if (_datastore == null)
1189             throw new IllegalStateException("No DataStore");
1190         if (LOG.isDebugEnabled()) LOG.debug("Removing session {} from DataStore", session.getId());
1191         _datastore.delete(makeKey(session, _context));
1192     }
1193 
1194     
1195     /**
1196      * Invalidate a session for this context with the given id
1197      * 
1198      * @param idInCluster
1199      */
1200     public void invalidateSession (String idInCluster)
1201     {
1202         Session session = (Session)_sessions.get(idInCluster);
1203 
1204         if (session != null)
1205         {
1206             session.invalidate();
1207         }
1208     }
1209 
1210     
1211     /**
1212      * Make a unique key for this session.
1213      * As the same session id can be used across multiple contexts, to
1214      * make it unique, the key must be composed of:
1215      * <ol>
1216      * <li>the id</li>
1217      * <li>the context path</li>
1218      * <li>the virtual hosts</li>
1219      * </ol>
1220      * 
1221      *TODO consider the difference between getClusterId and getId
1222      * @param session
1223      * @return
1224      */
1225     private Key makeKey (Session session, Context context)
1226     {
1227        return makeKey(session.getId(), context);
1228     }
1229     
1230     /**
1231      * Make a unique key for this session.
1232      * As the same session id can be used across multiple contexts, to
1233      * make it unique, the key must be composed of:
1234      * <ol>
1235      * <li>the id</li>
1236      * <li>the context path</li>
1237      * <li>the virtual hosts</li>
1238      * </ol>
1239      * 
1240      *TODO consider the difference between getClusterId and getId
1241      * @param session
1242      * @return
1243      */
1244     private Key makeKey (String id, Context context)
1245     {
1246         String key = getContextPath(context);
1247         key = key + "_" + getVirtualHost(context);
1248         key = key+"_"+id;
1249         return _keyFactory.newKey(key);
1250     }
1251     
1252     /**
1253      * Turn the context path into an acceptable string
1254      * 
1255      * @param context
1256      * @return
1257      */
1258     private static String getContextPath (ContextHandler.Context context)
1259     {
1260         return canonicalize (context.getContextPath());
1261     }
1262 
1263     /**
1264      * Get the first virtual host for the context.
1265      *
1266      * Used to help identify the exact session/contextPath.
1267      *
1268      * @return 0.0.0.0 if no virtual host is defined
1269      */
1270     private static String getVirtualHost (ContextHandler.Context context)
1271     {
1272         String vhost = "0.0.0.0";
1273 
1274         if (context==null)
1275             return vhost;
1276 
1277         String [] vhosts = context.getContextHandler().getVirtualHosts();
1278         if (vhosts==null || vhosts.length==0 || vhosts[0]==null)
1279             return vhost;
1280 
1281         return vhosts[0];
1282     }
1283 
1284     /**
1285      * Make an acceptable name from a context path.
1286      *
1287      * @param path
1288      * @return
1289      */
1290     private static String canonicalize (String path)
1291     {
1292         if (path==null)
1293             return "";
1294 
1295         return path.replace('/', '_').replace('.','_').replace('\\','_');
1296     }
1297 
1298 }