1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.nosql.mongodb;
20
21
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.SessionManager;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.session.AbstractSessionIdManager;
import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class MongoSessionIdManager extends AbstractSessionIdManager
67 {
68 private final static Logger __log = Log.getLogger("org.eclipse.jetty.server.session");
69
70 final static DBObject __version_1 = new BasicDBObject(MongoSessionManager.__VERSION,1);
71 final static DBObject __valid_false = new BasicDBObject(MongoSessionManager.__VALID,false);
72 final static DBObject __valid_true = new BasicDBObject(MongoSessionManager.__VALID,true);
73
74 final static long __defaultScavengePeriod = 30 * 60 * 1000;
75
76
77 final DBCollection _sessions;
78 protected Server _server;
79 private Scheduler _scheduler;
80 private boolean _ownScheduler;
81 private Scheduler.Task _scavengerTask;
82 private Scheduler.Task _purgerTask;
83
84
85
86 private long _scavengePeriod = __defaultScavengePeriod;
87
88
89
90
91
92 private boolean _purge = true;
93
94
95
96
97 private long _purgeDelay = 24 * 60 * 60 * 1000;
98
99
100
101
102
103 private long _purgeInvalidAge = 24 * 60 * 60 * 1000;
104
105
106
107
108
109 private long _purgeValidAge = 7 * 24 * 60 * 60 * 1000;
110
111
112
113
114
115 protected final Set<String> _sessionsIds = new ConcurrentHashSet<>();
116
117
118
119
120 private int _purgeLimit = 0;
121
122 private int _scavengeBlockSize;
123
124
125
126
127
128
129 protected class Scavenger implements Runnable
130 {
131 @Override
132 public void run()
133 {
134 try
135 {
136 scavenge();
137 }
138 finally
139 {
140 if (_scheduler != null && _scheduler.isRunning())
141 _scavengerTask = _scheduler.schedule(this, _scavengePeriod, TimeUnit.MILLISECONDS);
142 }
143 }
144 }
145
146
147
148
149
150
151 protected class Purger implements Runnable
152 {
153 @Override
154 public void run()
155 {
156 try
157 {
158 purge();
159 }
160 finally
161 {
162 if (_scheduler != null && _scheduler.isRunning())
163 _purgerTask = _scheduler.schedule(this, _purgeDelay, TimeUnit.MILLISECONDS);
164 }
165 }
166 }
167
168
169
170
171
/**
 * Creates an id manager that stores its sessions in the {@code sessions}
 * collection of the {@code HttpSessions} database, reached through a
 * default-constructed {@link Mongo} client.
 *
 * @param server the server whose contexts this manager serves
 * @throws UnknownHostException declared by the {@link Mongo} constructor
 * @throws MongoException if the driver reports a failure
 */
public MongoSessionIdManager(Server server) throws UnknownHostException, MongoException
{
    this(server,
         new Mongo()
             .getDB("HttpSessions")
             .getCollection("sessions"));
}
176
177
/**
 * Creates an id manager on top of an existing session collection and makes
 * sure the indexes used by this class exist.
 *
 * @param server the server whose contexts this manager serves
 * @param sessions the collection holding the session documents
 */
public MongoSessionIdManager(Server server, DBCollection sessions)
{
    // Session ids are bearer credentials. java.util.Random is a 48-bit linear
    // congruential generator whose future output can be reconstructed from a few
    // observed values, so a cryptographically strong source is used instead.
    super(new SecureRandom());

    _server = server;
    _sessions = sessions;

    // Unique index on the session id.
    _sessions.ensureIndex(
            BasicDBObjectBuilder.start().add("id",1).get(),
            BasicDBObjectBuilder.start().add("unique",true).add("sparse",false).get());

    // Unique compound index on id + version.
    _sessions.ensureIndex(
            BasicDBObjectBuilder.start().add("id",1).add("version",1).get(),
            BasicDBObjectBuilder.start().add("unique",true).add("sparse",false).get());

    // Compound index on validity + last access time, matching the fields that
    // purge() filters on; built in the background.
    _sessions.ensureIndex(
            BasicDBObjectBuilder.start().add(MongoSessionManager.__VALID, 1).add(MongoSessionManager.__ACCESSED, 1).get(),
            BasicDBObjectBuilder.start().add("sparse", false).add("background", true).get());
}
199
200
201
202
203
204
205
206 protected void scavenge()
207 {
208 long now = System.currentTimeMillis();
209 __log.debug("SessionIdManager:scavenge:at {}", now);
210
211
212
213
214
215
216
217
218
219 Set<String> block = new HashSet<String>();
220
221 Iterator<String> itor = _sessionsIds.iterator();
222 while (itor.hasNext())
223 {
224 block.add(itor.next());
225 if ((_scavengeBlockSize > 0) && (block.size() == _scavengeBlockSize))
226 {
227
228 scavengeBlock (now, block);
229
230 block.clear();
231 }
232 }
233
234
235 if (!block.isEmpty())
236 scavengeBlock(now, block);
237 }
238
239
240
241
242
243
244
245
246
247 protected void scavengeBlock (long atTime, Set<String> ids)
248 {
249 if (ids == null)
250 return;
251
252 BasicDBObject query = new BasicDBObject();
253 query.put(MongoSessionManager.__ID,new BasicDBObject("$in", ids ));
254 query.put(MongoSessionManager.__EXPIRY, new BasicDBObject("$gt", 0));
255 query.put(MongoSessionManager.__EXPIRY, new BasicDBObject("$lt", atTime));
256
257 DBCursor checkSessions = _sessions.find(query, new BasicDBObject(MongoSessionManager.__ID, 1));
258
259 for ( DBObject session : checkSessions )
260 {
261 __log.debug("SessionIdManager:scavenge: expiring session {}", (String)session.get(MongoSessionManager.__ID));
262 expireAll((String)session.get(MongoSessionManager.__ID));
263 }
264 }
265
266
267
268
269
270
271
272
273 protected void scavengeFully()
274 {
275 __log.debug("SessionIdManager:scavengeFully");
276
277 DBCursor checkSessions = _sessions.find();
278
279 for (DBObject session : checkSessions)
280 {
281 expireAll((String)session.get(MongoSessionManager.__ID));
282 }
283
284 }
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304 protected void purge()
305 {
306 __log.debug("PURGING");
307 BasicDBObject invalidQuery = new BasicDBObject();
308
309 invalidQuery.put(MongoSessionManager.__VALID, false);
310 invalidQuery.put(MongoSessionManager.__ACCESSED, new BasicDBObject("$lt",System.currentTimeMillis() - _purgeInvalidAge));
311
312 DBCursor oldSessions = _sessions.find(invalidQuery, new BasicDBObject(MongoSessionManager.__ID, 1));
313
314 if (_purgeLimit > 0)
315 {
316 oldSessions.limit(_purgeLimit);
317 }
318
319 for (DBObject session : oldSessions)
320 {
321 String id = (String)session.get("id");
322
323 __log.debug("MongoSessionIdManager:purging invalid session {}", id);
324
325 _sessions.remove(session);
326 }
327
328 if (_purgeValidAge != 0)
329 {
330 BasicDBObject validQuery = new BasicDBObject();
331
332 validQuery.put(MongoSessionManager.__VALID, true);
333 validQuery.put(MongoSessionManager.__ACCESSED,new BasicDBObject("$lt",System.currentTimeMillis() - _purgeValidAge));
334
335 oldSessions = _sessions.find(validQuery,new BasicDBObject(MongoSessionManager.__ID,1));
336
337 if (_purgeLimit > 0)
338 {
339 oldSessions.limit(_purgeLimit);
340 }
341
342 for (DBObject session : oldSessions)
343 {
344 String id = (String)session.get(MongoSessionManager.__ID);
345
346 __log.debug("MongoSessionIdManager:purging valid session {}", id);
347
348 _sessions.remove(session);
349 }
350 }
351
352 }
353
354
355
356
357
358
359
360 protected void purgeFully()
361 {
362 BasicDBObject invalidQuery = new BasicDBObject();
363 invalidQuery.put(MongoSessionManager.__VALID, false);
364
365 DBCursor oldSessions = _sessions.find(invalidQuery, new BasicDBObject(MongoSessionManager.__ID, 1));
366
367 for (DBObject session : oldSessions)
368 {
369 String id = (String)session.get(MongoSessionManager.__ID);
370
371 __log.debug("MongoSessionIdManager:purging invalid session {}", id);
372
373 _sessions.remove(session);
374 }
375
376 }
377
378
379
380 public DBCollection getSessions()
381 {
382 return _sessions;
383 }
384
385
386
387 public boolean isPurgeEnabled()
388 {
389 return _purge;
390 }
391
392
393 public void setPurge(boolean purge)
394 {
395 this._purge = purge;
396 }
397
398
399
400
401
402
403
404
405 public void setScavengePeriod(long scavengePeriod)
406 {
407 if (scavengePeriod <= 0)
408 _scavengePeriod = __defaultScavengePeriod;
409 else
410 _scavengePeriod = TimeUnit.SECONDS.toMillis(scavengePeriod);
411 }
412
413
414
415
416
417
418 public void setScavengeBlockSize (int size)
419 {
420 _scavengeBlockSize = size;
421 }
422
423
424 public int getScavengeBlockSize ()
425 {
426 return _scavengeBlockSize;
427 }
428
429
430
431
432
433
434
435
436 public void setPurgeLimit(int purgeLimit)
437 {
438 _purgeLimit = purgeLimit;
439 }
440
441
442 public int getPurgeLimit()
443 {
444 return _purgeLimit;
445 }
446
447
448
449
450 public void setPurgeDelay(long purgeDelay)
451 {
452 if ( isRunning() )
453 {
454 throw new IllegalStateException();
455 }
456
457 this._purgeDelay = purgeDelay;
458 }
459
460
461 public long getPurgeInvalidAge()
462 {
463 return _purgeInvalidAge;
464 }
465
466
467
468
469
470
471
472 public void setPurgeInvalidAge(long purgeValidAge)
473 {
474 this._purgeInvalidAge = purgeValidAge;
475 }
476
477
478 public long getPurgeValidAge()
479 {
480 return _purgeValidAge;
481 }
482
483
484
485
486
487
488
489
490
491 public void setPurgeValidAge(long purgeValidAge)
492 {
493 this._purgeValidAge = purgeValidAge;
494 }
495
496
497 @Override
498 protected void doStart() throws Exception
499 {
500 __log.debug("MongoSessionIdManager:starting");
501
502
503 synchronized (this)
504 {
505
506 _scheduler =_server.getBean(Scheduler.class);
507 if (_scheduler == null)
508 {
509 _scheduler = new ScheduledExecutorScheduler();
510 _ownScheduler = true;
511 _scheduler.start();
512 }
513 else if (!_scheduler.isStarted())
514 throw new IllegalStateException("Shared scheduler not started");
515
516
517
518 if (_scavengePeriod > 0)
519 {
520 if (_scavengerTask != null)
521 {
522 _scavengerTask.cancel();
523 _scavengerTask = null;
524 }
525
526 _scavengerTask = _scheduler.schedule(new Scavenger(), _scavengePeriod, TimeUnit.MILLISECONDS);
527 }
528 else if (__log.isDebugEnabled())
529 __log.debug("Scavenger disabled");
530
531
532
533 if ( _purge )
534 {
535 if (_purgerTask != null)
536 {
537 _purgerTask.cancel();
538 _purgerTask = null;
539 }
540 _purgerTask = _scheduler.schedule(new Purger(), _purgeDelay, TimeUnit.MILLISECONDS);
541 }
542 else if (__log.isDebugEnabled())
543 __log.debug("Purger disabled");
544 }
545 }
546
547
548 @Override
549 protected void doStop() throws Exception
550 {
551 synchronized (this)
552 {
553 if (_scavengerTask != null)
554 {
555 _scavengerTask.cancel();
556 _scavengerTask = null;
557 }
558
559 if (_purgerTask != null)
560 {
561 _purgerTask.cancel();
562 _purgerTask = null;
563 }
564
565 if (_ownScheduler && _scheduler != null)
566 {
567 _scheduler.stop();
568 _scheduler = null;
569 }
570 }
571 super.doStop();
572 }
573
574
575
576
577
578 @Override
579 public boolean idInUse(String sessionId)
580 {
581
582
583
584 DBObject o = _sessions.findOne(new BasicDBObject("id",sessionId), __valid_true);
585
586 if ( o != null )
587 {
588 Boolean valid = (Boolean)o.get(MongoSessionManager.__VALID);
589 if ( valid == null )
590 {
591 return false;
592 }
593
594 return valid;
595 }
596
597 return false;
598 }
599
600
601 @Override
602 public void addSession(HttpSession session)
603 {
604 if (session == null)
605 {
606 return;
607 }
608
609
610
611
612
613 __log.debug("MongoSessionIdManager:addSession {}", session.getId());
614
615 _sessionsIds.add(session.getId());
616
617 }
618
619
620 @Override
621 public void removeSession(HttpSession session)
622 {
623 if (session == null)
624 {
625 return;
626 }
627
628 _sessionsIds.remove(session.getId());
629 }
630
631
632
633
634
635
636
637 @Override
638 public void invalidateAll(String sessionId)
639 {
640 _sessionsIds.remove(sessionId);
641
642
643
644 Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
645 for (int i=0; contexts!=null && i<contexts.length; i++)
646 {
647 SessionHandler sessionHandler = ((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
648 if (sessionHandler != null)
649 {
650 SessionManager manager = sessionHandler.getSessionManager();
651
652 if (manager != null && manager instanceof MongoSessionManager)
653 {
654 ((MongoSessionManager)manager).invalidateSession(sessionId);
655 }
656 }
657 }
658 }
659
660
661
662
663
664
665
666
667 public void expireAll (String sessionId)
668 {
669 _sessionsIds.remove(sessionId);
670
671
672
673
674 Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
675 for (int i=0; contexts!=null && i<contexts.length; i++)
676 {
677 SessionHandler sessionHandler = ((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
678 if (sessionHandler != null)
679 {
680 SessionManager manager = sessionHandler.getSessionManager();
681
682 if (manager != null && manager instanceof MongoSessionManager)
683 {
684 ((MongoSessionManager)manager).expire(sessionId);
685 }
686 }
687 }
688 }
689
690
691 @Override
692 public void renewSessionId(String oldClusterId, String oldNodeId, HttpServletRequest request)
693 {
694
695 String newClusterId = newSessionId(request.hashCode());
696
697 _sessionsIds.remove(oldClusterId);
698 _sessionsIds.add(newClusterId);
699
700
701 Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
702 for (int i=0; contexts!=null && i<contexts.length; i++)
703 {
704 SessionHandler sessionHandler = ((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
705 if (sessionHandler != null)
706 {
707 SessionManager manager = sessionHandler.getSessionManager();
708
709 if (manager != null && manager instanceof MongoSessionManager)
710 {
711 ((MongoSessionManager)manager).renewSessionId(oldClusterId, oldNodeId, newClusterId, getNodeId(newClusterId, request));
712 }
713 }
714 }
715 }
716
717 }