View Javadoc
1   /*
2    * Copyright (c) 2012-2024, jcabi.com
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met: 1) Redistributions of source code must retain the above
8    * copyright notice, this list of conditions and the following
9    * disclaimer. 2) Redistributions in binary form must reproduce the above
10   * copyright notice, this list of conditions and the following
11   * disclaimer in the documentation and/or other materials provided
12   * with the distribution. 3) Neither the name of the jcabi.com nor
13   * the names of its contributors may be used to endorse or promote
14   * products derived from this software without specific prior written
15   * permission.
16   *
17   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18   * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
19   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
20   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21   * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28   * OF THE POSSIBILITY OF SUCH DAMAGE.
29   */
30  package com.jcabi.aspects.aj;
31  
32  import com.jcabi.aspects.ScheduleWithFixedDelay;
33  import com.jcabi.log.Logger;
34  import com.jcabi.log.VerboseRunnable;
35  import com.jcabi.log.VerboseThreads;
36  import java.io.Closeable;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ConcurrentMap;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.ScheduledExecutorService;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicLong;
44  import org.aspectj.lang.JoinPoint;
45  import org.aspectj.lang.annotation.After;
46  import org.aspectj.lang.annotation.Aspect;
47  import org.aspectj.lang.annotation.Before;
48  
49  /**
50   * Schedules methods.
51   *
52   * @since 0.7.16
53   */
54  @Aspect
55  @SuppressWarnings("PMD.DoNotUseThreads")
56  public final class MethodScheduler {
57  
58      /**
59       * Objects and their running services.
60       */
61      private final transient
62          ConcurrentMap<Object, MethodScheduler.Service> services;
63  
64      /**
65       * Ctor.
66       */
67      public MethodScheduler() {
68          this.services =
69              new ConcurrentHashMap<>(0);
70      }
71  
72      /**
73       * Instantiate a new routine task.
74       *
75       * <p>Try NOT to change the signature of this method, in order to keep
76       * it backward compatible.
77       *
78       * @param point Joint point
79       * @checkstyle LineLength (2 lines)
80       */
81      @After("initialization((@com.jcabi.aspects.ScheduleWithFixedDelay *).new(..))")
82      public void instantiate(final JoinPoint point) {
83          final Object object = point.getTarget();
84          if (this.services.containsKey(object)) {
85              throw new IllegalStateException(
86                  Logger.format(
87                      "%[type]s was already scheduled for execution",
88                      object
89                  )
90              );
91          }
92          final Runnable runnable;
93          if (object instanceof Runnable) {
94              runnable = new VerboseRunnable((Runnable) object, true);
95          } else if (object instanceof Callable) {
96              runnable = new VerboseRunnable((Callable<?>) object, true);
97          } else {
98              throw new IllegalStateException(
99                  Logger.format(
100                     "%[type]s doesn't implement Runnable or Callable",
101                     object
102                 )
103             );
104         }
105         this.services.put(
106             object,
107             new MethodScheduler.Service(
108                 runnable,
109                 object,
110                 object.getClass().getAnnotation(ScheduleWithFixedDelay.class)
111             )
112         );
113     }
114 
115     /**
116      * Stop/close a routine task.
117      *
118      * <p>Try NOT to change the signature of this method, in order to keep
119      * it backward compatible.
120      *
121      * @param point Joint point
122      * @checkstyle LineLength (2 lines)
123      */
124     @Before("execution(* (@com.jcabi.aspects.ScheduleWithFixedDelay *).close())")
125     public void close(final JoinPoint point) {
126         final Object object = point.getTarget();
127         this.services.get(object).close();
128         this.services.remove(object);
129     }
130 
131     /**
132      * Running service.
133      * @since 0.0.0
134      */
135     private static final class Service implements Closeable {
136 
137         /**
138          * Running scheduled service.
139          */
140         private final transient ScheduledExecutorService executor;
141 
142         /**
143          * The object.
144          */
145         private final transient Object object;
146 
147         /**
148          * Execution counter.
149          */
150         private final transient AtomicLong counter;
151 
152         /**
153          * When started.
154          */
155         private final transient long start;
156 
157         /**
158          * How long to wait for the task to finish.
159          */
160         private final transient long await;
161 
162         /**
163          * Shutdown attempts count.
164          */
165         private final transient long attempts;
166 
167         /**
168          * Should more information be logged?
169          */
170         private final transient boolean verbose;
171 
172         /**
173          * Public ctor.
174          * @param runnable The runnable to schedule
175          * @param obj Object
176          * @param annt Annotation
177          */
178         @SuppressWarnings("PMD.ConstructorOnlyInitializesOrCallOtherConstructors")
179         Service(final Runnable runnable, final Object obj,
180             final ScheduleWithFixedDelay annt) {
181             this.start = System.currentTimeMillis();
182             this.counter = new AtomicLong();
183             this.object = obj;
184             this.executor = Executors.newScheduledThreadPool(
185                 annt.threads(),
186                 new VerboseThreads(this.object)
187             );
188             this.verbose = annt.verbose();
189             this.await = annt.awaitUnit().toMillis(
190                 (long) annt.await()
191             );
192             this.attempts = (long) annt.shutdownAttempts();
193             final Runnable job = () -> {
194                 runnable.run();
195                 this.counter.incrementAndGet();
196             };
197             for (int thread = 0; thread < annt.threads(); ++thread) {
198                 this.executor.scheduleWithFixedDelay(
199                     job, (long) annt.delay(), (long) annt.delay(),
200                     annt.unit()
201                 );
202             }
203             if (this.verbose) {
204                 Logger.info(
205                     this.object,
206                     "scheduled for execution with %d %s interval",
207                     annt.delay(),
208                     annt.unit()
209                 );
210             }
211         }
212 
213         @Override
214         public void close() {
215             this.executor.shutdown();
216             final long begin = System.currentTimeMillis();
217             try {
218                 while (true) {
219                     if (this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
220                         break;
221                     }
222                     final long age = System.currentTimeMillis() - begin;
223                     if (age > this.await) {
224                         break;
225                     }
226                     if (this.verbose) {
227                         Logger.info(
228                             this, "waiting %[ms]s for threads termination", age
229                         );
230                     }
231                 }
232                 for (int attempt = 0; (long) attempt < this.attempts; ++attempt) {
233                     this.executor.shutdownNow();
234                     this.executor.awaitTermination(1L, TimeUnit.SECONDS);
235                 }
236                 if (!this.executor.isTerminated()) {
237                     throw new IllegalStateException(
238                         Logger.format(
239                             "failed to shutdown %[type]s of %[type]s",
240                             this.executor,
241                             this.object
242                         )
243                     );
244                 }
245             } catch (final InterruptedException ex) {
246                 Thread.currentThread().interrupt();
247                 throw new IllegalStateException(ex);
248             }
249             if (this.verbose && Logger.isInfoEnabled(this.object)) {
250                 Logger.info(
251                     this.object,
252                     "execution stopped after %[ms]s and %d tick(s)",
253                     System.currentTimeMillis() - this.start,
254                     this.counter.get()
255                 );
256             }
257         }
258     }
259 
260 }