1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 package com.jcabi.aspects.aj;
31
32 import com.jcabi.aspects.ScheduleWithFixedDelay;
33 import com.jcabi.log.Logger;
34 import com.jcabi.log.VerboseRunnable;
35 import com.jcabi.log.VerboseThreads;
36 import java.io.Closeable;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ConcurrentMap;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44 import org.aspectj.lang.JoinPoint;
45 import org.aspectj.lang.annotation.After;
46 import org.aspectj.lang.annotation.Aspect;
47 import org.aspectj.lang.annotation.Before;
48
49
50
51
52
53
54 @Aspect
55 @SuppressWarnings("PMD.DoNotUseThreads")
56 public final class MethodScheduler {
57
58
59
60
61 private final transient
62 ConcurrentMap<Object, MethodScheduler.Service> services;
63
64
65
66
67 public MethodScheduler() {
68 this.services =
69 new ConcurrentHashMap<>(0);
70 }
71
72
73
74
75
76
77
78
79
80
81 @After("initialization((@com.jcabi.aspects.ScheduleWithFixedDelay *).new(..))")
82 public void instantiate(final JoinPoint point) {
83 final Object object = point.getTarget();
84 if (this.services.containsKey(object)) {
85 throw new IllegalStateException(
86 Logger.format(
87 "%[type]s was already scheduled for execution",
88 object
89 )
90 );
91 }
92 final Runnable runnable;
93 if (object instanceof Runnable) {
94 runnable = new VerboseRunnable((Runnable) object, true);
95 } else if (object instanceof Callable) {
96 runnable = new VerboseRunnable((Callable<?>) object, true);
97 } else {
98 throw new IllegalStateException(
99 Logger.format(
100 "%[type]s doesn't implement Runnable or Callable",
101 object
102 )
103 );
104 }
105 this.services.put(
106 object,
107 new MethodScheduler.Service(
108 runnable,
109 object,
110 object.getClass().getAnnotation(ScheduleWithFixedDelay.class)
111 )
112 );
113 }
114
115
116
117
118
119
120
121
122
123
124 @Before("execution(* (@com.jcabi.aspects.ScheduleWithFixedDelay *).close())")
125 public void close(final JoinPoint point) {
126 final Object object = point.getTarget();
127 this.services.get(object).close();
128 this.services.remove(object);
129 }
130
131
132
133
134
135 private static final class Service implements Closeable {
136
137
138
139
140 private final transient ScheduledExecutorService executor;
141
142
143
144
145 private final transient Object object;
146
147
148
149
150 private final transient AtomicLong counter;
151
152
153
154
155 private final transient long start;
156
157
158
159
160 private final transient long await;
161
162
163
164
165 private final transient long attempts;
166
167
168
169
170 private final transient boolean verbose;
171
172
173
174
175
176
177
178 @SuppressWarnings("PMD.ConstructorOnlyInitializesOrCallOtherConstructors")
179 Service(final Runnable runnable, final Object obj,
180 final ScheduleWithFixedDelay annt) {
181 this.start = System.currentTimeMillis();
182 this.counter = new AtomicLong();
183 this.object = obj;
184 this.executor = Executors.newScheduledThreadPool(
185 annt.threads(),
186 new VerboseThreads(this.object)
187 );
188 this.verbose = annt.verbose();
189 this.await = annt.awaitUnit().toMillis(
190 (long) annt.await()
191 );
192 this.attempts = (long) annt.shutdownAttempts();
193 final Runnable job = () -> {
194 runnable.run();
195 this.counter.incrementAndGet();
196 };
197 for (int thread = 0; thread < annt.threads(); ++thread) {
198 this.executor.scheduleWithFixedDelay(
199 job, (long) annt.delay(), (long) annt.delay(),
200 annt.unit()
201 );
202 }
203 if (this.verbose) {
204 Logger.info(
205 this.object,
206 "scheduled for execution with %d %s interval",
207 annt.delay(),
208 annt.unit()
209 );
210 }
211 }
212
213 @Override
214 public void close() {
215 this.executor.shutdown();
216 final long begin = System.currentTimeMillis();
217 try {
218 while (true) {
219 if (this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
220 break;
221 }
222 final long age = System.currentTimeMillis() - begin;
223 if (age > this.await) {
224 break;
225 }
226 if (this.verbose) {
227 Logger.info(
228 this, "waiting %[ms]s for threads termination", age
229 );
230 }
231 }
232 for (int attempt = 0; (long) attempt < this.attempts; ++attempt) {
233 this.executor.shutdownNow();
234 this.executor.awaitTermination(1L, TimeUnit.SECONDS);
235 }
236 if (!this.executor.isTerminated()) {
237 throw new IllegalStateException(
238 Logger.format(
239 "failed to shutdown %[type]s of %[type]s",
240 this.executor,
241 this.object
242 )
243 );
244 }
245 } catch (final InterruptedException ex) {
246 Thread.currentThread().interrupt();
247 throw new IllegalStateException(ex);
248 }
249 if (this.verbose && Logger.isInfoEnabled(this.object)) {
250 Logger.info(
251 this.object,
252 "execution stopped after %[ms]s and %d tick(s)",
253 System.currentTimeMillis() - this.start,
254 this.counter.get()
255 );
256 }
257 }
258 }
259
260 }