1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util.thread;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25
26 import org.eclipse.jetty.util.component.AbstractLifeCycle;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class Sweeper extends AbstractLifeCycle implements Runnable
74 {
75 private static final Logger LOG = Log.getLogger(Sweeper.class);
76
77 private final AtomicReference<List<Sweepable>> items = new AtomicReference<>();
78 private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
79 private final Scheduler scheduler;
80 private final long period;
81
82 public Sweeper(Scheduler scheduler, long period)
83 {
84 this.scheduler = scheduler;
85 this.period = period;
86 }
87
88 @Override
89 protected void doStart() throws Exception
90 {
91 super.doStart();
92 items.set(new CopyOnWriteArrayList<Sweepable>());
93 activate();
94 }
95
96 @Override
97 protected void doStop() throws Exception
98 {
99 deactivate();
100 items.set(null);
101 super.doStop();
102 }
103
104 public int getSize()
105 {
106 List<Sweepable> refs = items.get();
107 return refs == null ? 0 : refs.size();
108 }
109
110 public boolean offer(Sweepable sweepable)
111 {
112 List<Sweepable> refs = items.get();
113 if (refs == null)
114 return false;
115 refs.add(sweepable);
116 if (LOG.isDebugEnabled())
117 LOG.debug("Resource offered {}", sweepable);
118 return true;
119 }
120
121 public boolean remove(Sweepable sweepable)
122 {
123 List<Sweepable> refs = items.get();
124 return refs != null && refs.remove(sweepable);
125 }
126
127 @Override
128 public void run()
129 {
130 List<Sweepable> refs = items.get();
131 if (refs == null)
132 return;
133 for (Sweepable sweepable : refs)
134 {
135 try
136 {
137 if (sweepable.sweep())
138 {
139 refs.remove(sweepable);
140 if (LOG.isDebugEnabled())
141 LOG.debug("Resource swept {}", sweepable);
142 }
143 }
144 catch (Throwable x)
145 {
146 LOG.info("Exception while sweeping " + sweepable, x);
147 }
148 }
149 activate();
150 }
151
152 private void activate()
153 {
154 if (isRunning())
155 {
156 Scheduler.Task t = scheduler.schedule(this, period, TimeUnit.MILLISECONDS);
157 if (LOG.isDebugEnabled())
158 LOG.debug("Scheduled in {} ms sweep task {}", period, t);
159 task.set(t);
160 }
161 else
162 {
163 if (LOG.isDebugEnabled())
164 LOG.debug("Skipping sweep task scheduling");
165 }
166 }
167
168 private void deactivate()
169 {
170 Scheduler.Task t = task.getAndSet(null);
171 if (t != null)
172 {
173 boolean cancelled = t.cancel();
174 if (LOG.isDebugEnabled())
175 LOG.debug("Cancelled ({}) sweep task {}", cancelled, t);
176 }
177 }
178
179
180
181
182
183
184
185
186
187 public interface Sweepable
188 {
189
190
191
192 public boolean sweep();
193 }
194 }