1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.session.infinispan;
20
21 import java.io.IOException;
22 import java.io.ObjectStreamException;
23 import java.io.Serializable;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import javax.servlet.http.HttpServletRequest;
34
35 import org.eclipse.jetty.server.handler.ContextHandler;
36 import org.eclipse.jetty.server.handler.ContextHandler.Context;
37 import org.eclipse.jetty.server.session.AbstractSession;
38 import org.eclipse.jetty.server.session.AbstractSessionManager;
39 import org.eclipse.jetty.server.session.MemSession;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
43 import org.eclipse.jetty.util.thread.Scheduler;
44 import org.infinispan.Cache;
45 import org.infinispan.commons.api.BasicCache;
46 import org.omg.CORBA._IDLTypeStub;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class InfinispanSessionManager extends AbstractSessionManager
72 {
/** Logger registered under the generic Jetty session logging category. */
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");

/**
 * Cache holding the serialized session data. doStart() refuses to run
 * while this is null, so it must be supplied via setCache() first.
 */
private BasicCache<String, Object> _cache;

/**
 * Sessions currently held in this node's memory, keyed by cluster id.
 * Created in doStart() and cleared and nulled in doStop().
 */
private ConcurrentHashMap<String, InfinispanSessionManager.Session> _sessions;

/**
 * Number of seconds after the last sync with the cache at which an
 * in-memory session is treated as stale. Values of 0 or less disable
 * the staleness check (see Session.isStale).
 */
private long _staleIntervalSec = 0;

/** Handle of the currently scheduled scavenge run; null when none is scheduled. */
protected Scheduler.Task _task;
/** Scheduler used to run the scavenger; either the server's shared bean or one created in doStart(). */
protected Scheduler _scheduler;
/** The reusable scavenge job, created lazily in setScavengeInterval(). */
protected Scavenger _scavenger;
/** Delay between scavenge runs in milliseconds (default 10 minutes, before any jitter is applied). */
protected long _scavengeIntervalMs = 1000L * 60 * 10;
/** True when doStart() created (and therefore must stop) the scheduler itself. */
protected boolean _ownScheduler;
100
101
102
103
104
105
106
107 protected class Scavenger implements Runnable
108 {
109
110 @Override
111 public void run()
112 {
113 try
114 {
115 scavenge();
116 }
117 finally
118 {
119 if (_scheduler != null && _scheduler.isRunning())
120 _task = _scheduler.schedule(this, _scavengeIntervalMs, TimeUnit.MILLISECONDS);
121 }
122 }
123 }
124
125
126
127
128
129
130
131
132
133 public class SerializableSessionData implements Serializable
134 {
135
136
137
138 private static final long serialVersionUID = -7779120106058533486L;
139 String clusterId;
140 String contextPath;
141 String vhost;
142 long accessed;
143 long lastAccessed;
144 long createTime;
145 long cookieSetTime;
146 String lastNode;
147 long expiry;
148 long maxInactive;
149 Map<String, Object> attributes;
150
151 public SerializableSessionData()
152 {
153
154 }
155
156
157 public SerializableSessionData(Session s)
158 {
159 clusterId = s.getClusterId();
160 contextPath = s.getContextPath();
161 vhost = s.getVHost();
162 accessed = s.getAccessed();
163 lastAccessed = s.getLastAccessedTime();
164 createTime = s.getCreationTime();
165 cookieSetTime = s.getCookieSetTime();
166 lastNode = s.getLastNode();
167 expiry = s.getExpiry();
168 maxInactive = s.getMaxInactiveInterval();
169 attributes = s.getAttributeMap();
170 }
171
172 private void writeObject(java.io.ObjectOutputStream out) throws IOException
173 {
174 out.writeUTF(clusterId);
175 out.writeUTF(contextPath);
176 out.writeUTF(vhost);
177
178 out.writeLong(accessed);
179 out.writeLong(lastAccessed);
180 out.writeLong(createTime);
181 out.writeLong(cookieSetTime);
182 out.writeUTF(lastNode);
183
184 out.writeLong(expiry);
185 out.writeLong(maxInactive);
186 out.writeObject(attributes);
187 }
188
189 private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException
190 {
191 clusterId = in.readUTF();
192 contextPath = in.readUTF();
193 vhost = in.readUTF();
194
195 accessed = in.readLong();
196 lastAccessed = in.readLong();
197 createTime = in.readLong();
198 cookieSetTime = in.readLong();
199 lastNode = in.readUTF();
200 expiry = in.readLong();
201 maxInactive = in.readLong();
202 attributes = (HashMap<String,Object>)in.readObject();
203 }
204
205 }
206
207
208
209
210
211
212
213
214
215 public class Session extends MemSession
216 {
217
218 private ReentrantLock _lock = new ReentrantLock();
219
220
221
222
223 private String _contextPath;
224
225
226
227
228
229
230 private long _expiryTime;
231
232
233
234
235
236 private long _lastSyncTime;
237
238
239
240
241
242 private String _lastNode;
243
244
245
246
247
248 protected boolean _dirty=false;
249
250
251
252
253
254
255
256 private String _vhost;
257
258
259
260
261
262 private AtomicInteger _activeThreads = new AtomicInteger(0);
263
264
265
266
267
268
269
270
271
272 protected Session (HttpServletRequest request)
273 {
274 super(InfinispanSessionManager.this,request);
275 long maxInterval = getMaxInactiveInterval();
276 _expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
277 _lastNode = getSessionIdManager().getWorkerName();
278 setVHost(InfinispanSessionManager.getVirtualHost(_context));
279 setContextPath(InfinispanSessionManager.getContextPath(_context));
280 _activeThreads.incrementAndGet();
281 }
282
283
284 protected Session (SerializableSessionData sd)
285 {
286 super(InfinispanSessionManager.this, sd.createTime, sd.accessed, sd.clusterId);
287 _expiryTime = (sd.maxInactive <= 0 ? 0 : (System.currentTimeMillis() + sd.maxInactive*1000L));
288 setLastNode(sd.lastNode);
289 setContextPath(sd.contextPath);
290 setVHost(sd.vhost);
291 addAttributes(sd.attributes);
292 }
293
294
295
296
297
298
299
300
301
302
303 protected Session (String sessionId, long created, long accessed, long maxInterval)
304 {
305 super(InfinispanSessionManager.this, created, accessed, sessionId);
306 _expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
307 }
308
309
310
311
312
313
314 @Override
315 protected boolean access(long time)
316 {
317 if (LOG.isDebugEnabled())
318 LOG.debug("Access session({}) for context {} on worker {}", getId(), getContextPath(), getSessionIdManager().getWorkerName());
319 try
320 {
321
322 long now = System.currentTimeMillis();
323
324 _lock.lock();
325
326 if (_activeThreads.incrementAndGet() == 1)
327 {
328
329 if (getStaleIntervalSec() > 0 && (now - getLastSyncTime()) >= (getStaleIntervalSec() * 1000L))
330 {
331 if (LOG.isDebugEnabled())
332 LOG.debug("Acess session({}) for context {} on worker {} stale session. Reloading.", getId(), getContextPath(), getSessionIdManager().getWorkerName());
333 refresh();
334 }
335 }
336 }
337 catch (Exception e)
338 {
339 LOG.warn(e);
340 }
341 finally
342 {
343 _lock.unlock();
344 }
345
346 if (super.access(time))
347 {
348 int maxInterval=getMaxInactiveInterval();
349 _expiryTime = (maxInterval <= 0 ? 0 : (time + maxInterval*1000L));
350 return true;
351 }
352 return false;
353 }
354
355
356
357
358
359
360 @Override
361 protected void complete()
362 {
363 super.complete();
364
365
366 _lock.lock();
367
368 try
369 {
370
371 if (_activeThreads.decrementAndGet() == 0)
372 {
373 try
374 {
375
376
377
378
379
380 if (isValid())
381 {
382
383
384 if (_dirty || getLastSyncTime() == 0 || isStale(System.currentTimeMillis()))
385 {
386 willPassivate();
387 save(this);
388 didActivate();
389 }
390 }
391 }
392 catch (Exception e)
393 {
394 LOG.warn("Problem saving session({})",getId(), e);
395 }
396 finally
397 {
398 _dirty = false;
399 }
400 }
401 }
402 finally
403 {
404 _lock.unlock();
405 }
406 }
407
408
409
410
411
412 protected boolean isStale (long atTime)
413 {
414 return (getStaleIntervalSec() > 0) && (atTime - getLastSyncTime() >= (getStaleIntervalSec()*1000L));
415 }
416
417
418
419
420
421 protected boolean isDirty ()
422 {
423 return _dirty;
424 }
425
426
427
428
429
430
431 @Override
432 protected void timeout()
433 {
434 super.timeout();
435 }
436
437
438
439
440
441
442
443
444
445
446
447 private void refresh ()
448 {
449
450 Session fresh = load(makeKey(getClusterId(), _context));
451
452
453 if (fresh == null)
454 {
455 invalidate();
456 return;
457 }
458
459
460
461 if (fresh.getLastNode().equals(getLastNode()))
462 return;
463
464 setLastNode(getSessionIdManager().getWorkerName());
465
466
467 willPassivate();
468
469
470 if (fresh.getAttributes() == 0)
471 this.clearAttributes();
472 else
473 {
474
475 for (String key:fresh.getAttributeMap().keySet())
476 {
477 Object freshvalue = fresh.getAttribute(key);
478
479
480 if (getAttribute(key) == null)
481 {
482 doPutOrRemove(key,freshvalue);
483 bindValue(key,freshvalue);
484 }
485 else
486 {
487 doPutOrRemove(key,freshvalue);
488 }
489
490 }
491
492 for (String key : getNames())
493 {
494 if (fresh.getAttribute(key) == null)
495 {
496 Object oldvalue = getAttribute(key);
497 doPutOrRemove(key,null);
498 unbindValue(key,oldvalue);
499 }
500 }
501 }
502
503 didActivate();
504 }
505
506
507 public void setExpiry (long expiry)
508 {
509 _expiryTime = expiry;
510 }
511
512
513 public long getExpiry ()
514 {
515 return _expiryTime;
516 }
517
518 public void swapId (String newId, String newNodeId)
519 {
520
521 _lock.lock();
522 setClusterId(newId);
523 setNodeId(newNodeId);
524 _lock.unlock();
525 }
526
527 @Override
528 public void setAttribute (String name, Object value)
529 {
530 Object old = changeAttribute(name, value);
531 if (value == null && old == null)
532 return;
533
534 _dirty = true;
535 }
536
537
538 public String getContextPath()
539 {
540 return _contextPath;
541 }
542
543
544 public void setContextPath(String contextPath)
545 {
546 this._contextPath = contextPath;
547 }
548
549
550 public String getVHost()
551 {
552 return _vhost;
553 }
554
555
556 public void setVHost(String vhost)
557 {
558 this._vhost = vhost;
559 }
560
561 public String getLastNode()
562 {
563 return _lastNode;
564 }
565
566
567 public void setLastNode(String lastNode)
568 {
569 _lastNode = lastNode;
570 }
571
572
573 public long getLastSyncTime()
574 {
575 return _lastSyncTime;
576 }
577
578
579 public void setLastSyncTime(long lastSyncTime)
580 {
581 _lastSyncTime = lastSyncTime;
582 }
583
584 }
585
586
587
588
589
590
591
592
593
594 @Override
595 public void doStart() throws Exception
596 {
597 if (_sessionIdManager == null)
598 throw new IllegalStateException("No session id manager defined");
599
600 if (_cache == null)
601 throw new IllegalStateException("No session cache defined");
602
603 _sessions = new ConcurrentHashMap<String, Session>();
604
605
606 _scheduler = getSessionHandler().getServer().getBean(Scheduler.class);
607 if (_scheduler == null)
608 {
609 _scheduler = new ScheduledExecutorScheduler();
610 _ownScheduler = true;
611 _scheduler.start();
612 }
613 else if (!_scheduler.isStarted())
614 throw new IllegalStateException("Shared scheduler not started");
615
616 setScavengeInterval(getScavengeInterval());
617
618 super.doStart();
619 }
620
621
622
623
624
625
626
627 @Override
628 public void doStop() throws Exception
629 {
630 super.doStop();
631
632 if (_task!=null)
633 _task.cancel();
634 _task=null;
635 if (_ownScheduler && _scheduler !=null)
636 _scheduler.stop();
637 _scheduler = null;
638
639 _sessions.clear();
640 _sessions = null;
641 }
642
643
644
645
646
647
648
649
650
651 public void scavenge ()
652 {
653 Set<String> candidateIds = new HashSet<String>();
654 long now = System.currentTimeMillis();
655
656 LOG.info("SessionManager for context {} scavenging at {} ", getContextPath(getContext()), now);
657 for (Map.Entry<String, Session> entry:_sessions.entrySet())
658 {
659 long expiry = entry.getValue().getExpiry();
660 if (expiry > 0 && expiry < now)
661 candidateIds.add(entry.getKey());
662 }
663
664 for (String candidateId:candidateIds)
665 {
666 if (LOG.isDebugEnabled())
667 LOG.debug("Session {} expired ", candidateId);
668
669 Session candidateSession = _sessions.get(candidateId);
670 if (candidateSession != null)
671 {
672
673
674
675 Session cachedSession = load(makeKey(candidateId, _context));
676 if (cachedSession == null)
677 {
678 if (LOG.isDebugEnabled()) LOG.debug("Locally expired session({}) does not exist in cluster ",candidateId);
679
680 candidateSession.timeout();
681 }
682 else if (getSessionIdManager().getWorkerName().equals(cachedSession.getLastNode()))
683 {
684 if (LOG.isDebugEnabled()) LOG.debug("Expiring session({}) local to session manager",candidateId);
685
686 candidateSession.timeout();
687 }
688 else
689 {
690
691 if (LOG.isDebugEnabled()) LOG.debug("Session({}) not local to this session manager, removing from local memory", candidateId);
692 candidateSession.willPassivate();
693 _sessions.remove(candidateSession.getClusterId());
694 }
695
696 }
697 }
698 }
699
700
701
702 public long getScavengeInterval ()
703 {
704 return _scavengeIntervalMs/1000;
705 }
706
707
708
709
710
711
712
713
714
715
716 public void setScavengeInterval (long sec)
717 {
718 if (sec<=0)
719 sec=60;
720
721 long old_period=_scavengeIntervalMs;
722 long period=sec*1000L;
723
724 _scavengeIntervalMs=period;
725
726
727
728 long tenPercent = _scavengeIntervalMs/10;
729 if ((System.currentTimeMillis()%2) == 0)
730 _scavengeIntervalMs += tenPercent;
731
732 if (LOG.isDebugEnabled())
733 LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
734
735 synchronized (this)
736 {
737 if (_scheduler != null && (period!=old_period || _task==null))
738 {
739 if (_task!=null)
740 _task.cancel();
741 if (_scavenger == null)
742 _scavenger = new Scavenger();
743
744 _task = _scheduler.schedule(_scavenger,_scavengeIntervalMs,TimeUnit.MILLISECONDS);
745 }
746 }
747 }
748
749
750
751
752
753
754
755
756
757 public BasicCache<String, Object> getCache()
758 {
759 return _cache;
760 }
761
762
763
764
765
766
767
768
769 public void setCache (BasicCache<String, Object> cache)
770 {
771 this._cache = cache;
772 }
773
774
775
776
777
778 public long getStaleIntervalSec()
779 {
780 return _staleIntervalSec;
781 }
782
783
784 public void setStaleIntervalSec(long staleIntervalSec)
785 {
786 _staleIntervalSec = staleIntervalSec;
787 }
788
789
790
791
792
793
794
795 @Override
796 protected void addSession(AbstractSession session)
797 {
798 if (session==null)
799 return;
800
801 if (LOG.isDebugEnabled()) LOG.debug("Adding session({}) to session manager for context {} on worker {}",session.getClusterId(), getContextPath(getContext()),getSessionIdManager().getWorkerName() + " with lastnode="+((Session)session).getLastNode());
802 _sessions.put(session.getClusterId(), (Session)session);
803
804 try
805 {
806 session.willPassivate();
807 save(((InfinispanSessionManager.Session)session));
808 session.didActivate();
809
810 }
811 catch (Exception e)
812 {
813 LOG.warn("Unable to store new session id="+session.getId() , e);
814 }
815 }
816
817
818
819
820
821
822 @Override
823 public AbstractSession getSession(String idInCluster)
824 {
825 Session session = null;
826
827
828 Session memSession = (Session)_sessions.get(idInCluster);
829
830 if (LOG.isDebugEnabled())
831 LOG.debug("getSession({}) {} in session map",idInCluster,(memSession==null?"not":""));
832
833 long now = System.currentTimeMillis();
834 try
835 {
836
837 if (memSession == null)
838 {
839 if (LOG.isDebugEnabled())
840 LOG.debug("getSession({}): loading session data from cluster", idInCluster);
841
842 session = load(makeKey(idInCluster, _context));
843 if (session != null)
844 {
845
846
847
848 if (session.getExpiry() > 0 && session.getExpiry() <= now)
849 {
850 if (LOG.isDebugEnabled()) LOG.debug("getSession ({}): Session expired", idInCluster);
851
852
853 ((InfinispanSessionIdManager)getSessionIdManager()).removeSession(session);
854 return null;
855 }
856
857
858 session.setLastNode(getSessionIdManager().getWorkerName());
859
860
861
862 Session existingSession = _sessions.putIfAbsent(idInCluster, session);
863 if (existingSession != null)
864 {
865
866 session = existingSession;
867 LOG.debug("getSession({}): using session loaded by another request thread ", idInCluster);
868 }
869 else
870 {
871
872 session.didActivate();
873 LOG.debug("getSession({}): loaded session from cluster", idInCluster);
874 }
875 return session;
876 }
877 else
878 {
879
880 LOG.debug("getSession({}): No session in cluster matching",idInCluster);
881 return null;
882 }
883 }
884 else
885 {
886
887 LOG.debug("getSession({}): returning session from local memory ", memSession.getClusterId());
888 return memSession;
889 }
890 }
891 catch (Exception e)
892 {
893 LOG.warn("Unable to load session="+idInCluster, e);
894 return null;
895 }
896 }
897
898
899
900
901
902
903
904
905 @Override
906 protected void shutdownSessions() throws Exception
907 {
908 Set<String> keys = new HashSet<String>(_sessions.keySet());
909 for (String key:keys)
910 {
911 Session session = _sessions.remove(key);
912
913
914
915
916 try
917 {
918 if (session.isDirty())
919 {
920 if (LOG.isDebugEnabled())
921 LOG.debug("Saving dirty session {} before exiting ", session.getId());
922 save(session);
923 }
924 }
925 catch (Exception e)
926 {
927 LOG.warn(e);
928 }
929 }
930 }
931
932
933 @Override
934 protected AbstractSession newSession(HttpServletRequest request)
935 {
936 return new Session(request);
937 }
938
939
940
941
942
943
944
945 @Override
946 protected boolean removeSession(String idInCluster)
947 {
948 Session session = (Session)_sessions.remove(idInCluster);
949 try
950 {
951 if (session != null)
952 delete(session);
953 }
954 catch (Exception e)
955 {
956 LOG.warn("Problem deleting session id="+idInCluster, e);
957 }
958 return session!=null;
959 }
960
961
962
963
964 @Override
965 public void renewSessionId(String oldClusterId, String oldNodeId, String newClusterId, String newNodeId)
966 {
967 Session session = null;
968 try
969 {
970
971 session = (Session)_sessions.remove(oldClusterId);
972 if (session != null)
973 {
974
975 delete(session);
976 session.swapId(newClusterId, newNodeId);
977 _sessions.put(newClusterId, session);
978 save(session);
979 }
980 }
981 catch (Exception e)
982 {
983 LOG.warn(e);
984 }
985
986 super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId);
987 }
988
989
990
991
992
993
994
995
996 protected Session load (String key)
997 {
998 if (_cache == null)
999 throw new IllegalStateException("No cache");
1000
1001 if (LOG.isDebugEnabled()) LOG.debug("Loading session {} from cluster", key);
1002
1003 SerializableSessionData storableSession = (SerializableSessionData)_cache.get(key);
1004 if (storableSession == null)
1005 {
1006 if (LOG.isDebugEnabled()) LOG.debug("No session {} in cluster ",key);
1007 return null;
1008 }
1009 else
1010 {
1011 Session session = new Session (storableSession);
1012 session.setLastSyncTime(System.currentTimeMillis());
1013 return session;
1014 }
1015 }
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025 protected void save (InfinispanSessionManager.Session session)
1026 throws Exception
1027 {
1028 if (_cache == null)
1029 throw new IllegalStateException("No cache");
1030
1031 if (LOG.isDebugEnabled()) LOG.debug("Writing session {} to cluster", session.getId());
1032
1033 SerializableSessionData storableSession = new SerializableSessionData(session);
1034
1035
1036
1037
1038
1039 InfinispanSessionIdManager sessionIdManager = (InfinispanSessionIdManager)getSessionIdManager();
1040 if (storableSession.maxInactive > 0)
1041 _cache.put(makeKey(session, _context), storableSession, -1, TimeUnit.SECONDS, storableSession.maxInactive*sessionIdManager.getIdleExpiryMultiple(), TimeUnit.SECONDS);
1042 else
1043 _cache.put(makeKey(session, _context), storableSession);
1044
1045
1046 sessionIdManager.touch(session.getClusterId());
1047
1048 session.setLastSyncTime(System.currentTimeMillis());
1049 }
1050
1051
1052
1053
1054
1055
1056
1057
1058 protected void delete (InfinispanSessionManager.Session session)
1059 {
1060 if (_cache == null)
1061 throw new IllegalStateException("No cache");
1062 if (LOG.isDebugEnabled()) LOG.debug("Removing session {} from cluster", session.getId());
1063 _cache.remove(makeKey(session, _context));
1064 }
1065
1066
1067
1068
1069
1070
1071
1072 public void invalidateSession (String idInCluster)
1073 {
1074 Session session = (Session)_sessions.get(idInCluster);
1075
1076 if (session != null)
1077 {
1078 session.invalidate();
1079 }
1080 }
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097 private String makeKey (Session session, Context context)
1098 {
1099 return makeKey(session.getId(), context);
1100 }
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116 private String makeKey (String id, Context context)
1117 {
1118 String key = getContextPath(context);
1119 key = key + "_" + getVirtualHost(context);
1120 key = key+"_"+id;
1121 return key;
1122 }
1123
1124
1125
1126
1127
1128
1129
1130 private static String getContextPath (ContextHandler.Context context)
1131 {
1132 return canonicalize (context.getContextPath());
1133 }
1134
1135
1136
1137
1138
1139
1140
1141
1142 private static String getVirtualHost (ContextHandler.Context context)
1143 {
1144 String vhost = "0.0.0.0";
1145
1146 if (context==null)
1147 return vhost;
1148
1149 String [] vhosts = context.getContextHandler().getVirtualHosts();
1150 if (vhosts==null || vhosts.length==0 || vhosts[0]==null)
1151 return vhost;
1152
1153 return vhosts[0];
1154 }
1155
1156
1157
1158
1159
1160
1161
1162 private static String canonicalize (String path)
1163 {
1164 if (path==null)
1165 return "";
1166
1167 return path.replace('/', '_').replace('.','_').replace('\\','_');
1168 }
1169
1170 }