View Javadoc
1   /*
2    * Copyright (c) 2012-2024, jcabi.com
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met: 1) Redistributions of source code must retain the above
8    * copyright notice, this list of conditions and the following
9    * disclaimer. 2) Redistributions in binary form must reproduce the above
10   * copyright notice, this list of conditions and the following
11   * disclaimer in the documentation and/or other materials provided
12   * with the distribution. 3) Neither the name of the jcabi.com nor
13   * the names of its contributors may be used to endorse or promote
14   * products derived from this software without specific prior written
15   * permission.
16   *
17   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18   * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
19   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
20   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21   * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28   * OF THE POSSIBILITY OF SUCH DAMAGE.
29   */
30  package com.jcabi.aspects.aj;
31  
32  import com.jcabi.aspects.Immutable;
33  import com.jcabi.aspects.Parallel;
34  import com.jcabi.log.VerboseThreads;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.LinkedList;
38  import java.util.concurrent.Callable;
39  import java.util.concurrent.CountDownLatch;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.ExecutorService;
42  import java.util.concurrent.Executors;
43  import java.util.concurrent.Future;
44  import org.aspectj.lang.ProceedingJoinPoint;
45  import org.aspectj.lang.annotation.Around;
46  import org.aspectj.lang.annotation.Aspect;
47  import org.aspectj.lang.reflect.MethodSignature;
48  
49  /**
50   * Execute method in multiple threads.
51   *
52   * @since 0.10
53   * @see Parallel
54   * @checkstyle NonStaticMethodCheck (100 lines)
55   */
56  @Aspect
57  @Immutable
58  public final class Parallelizer {
59  
60      /**
61       * Execute method in multiple threads.
62       *
63       * <p>This aspect should be used only on void returning methods.
64       *
65       * <p>Try NOT to change the signature of this method, in order to keep
66       * it backward compatible.
67       *
68       * @param point Joint point
69       * @return The result of call
70       * @throws Parallelizer.ParallelException If something goes wrong inside
71       * @checkstyle IllegalThrowsCheck (4 lines)
72       */
73      @Around("execution(@com.jcabi.aspects.Parallel * * (..))")
74      public Object wrap(final ProceedingJoinPoint point)
75          throws Parallelizer.ParallelException {
76          final int total = ((MethodSignature) point.getSignature())
77              .getMethod().getAnnotation(Parallel.class).threads();
78          final Collection<Callable<Throwable>> callables =
79              new ArrayList<>(total);
80          final CountDownLatch start = new CountDownLatch(1);
81          for (int thread = 0; thread < total; ++thread) {
82              callables.add(Parallelizer.callable(point, start));
83          }
84          final ExecutorService executor = Executors
85              .newFixedThreadPool(total, new VerboseThreads());
86          final Collection<Future<Throwable>> futures =
87              new ArrayList<>(total);
88          for (final Callable<Throwable> callable : callables) {
89              futures.add(executor.submit(callable));
90          }
91          start.countDown();
92          final Collection<Throwable> failures = new LinkedList<>();
93          for (final Future<Throwable> future : futures) {
94              Parallelizer.process(failures, future);
95          }
96          executor.shutdown();
97          if (!failures.isEmpty()) {
98              throw Parallelizer.exceptions(failures);
99          }
100         return null;
101     }
102 
103     /**
104      * Process futures.
105      * @param failures Collection of failures.
106      * @param future Future tu process.
107      */
108     private static void process(final Collection<Throwable> failures,
109         final Future<Throwable> future) {
110         final Throwable exception;
111         try {
112             exception = future.get();
113             if (exception != null) {
114                 failures.add(exception);
115             }
116         } catch (final InterruptedException ex) {
117             Thread.currentThread().interrupt();
118             failures.add(ex);
119         } catch (final ExecutionException ex) {
120             failures.add(ex);
121         }
122     }
123 
124     /**
125      * Create parallel exception.
126      * @param failures List of exceptions from threads.
127      * @return Aggregated exceptions.
128      */
129     @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
130     private static Parallelizer.ParallelException exceptions(
131         final Iterable<Throwable> failures) {
132         Parallelizer.ParallelException current = null;
133         for (final Throwable failure : failures) {
134             current = new Parallelizer.ParallelException(failure, current);
135         }
136         return current;
137     }
138 
139     /**
140      * Create callable that executes join point.
141      * @param point Join point to use.
142      * @param start Latch to use.
143      * @return Created callable.
144      */
145     @SuppressWarnings("PMD.AvoidCatchingThrowable")
146     private static Callable<Throwable> callable(final ProceedingJoinPoint point,
147         final CountDownLatch start) {
148         return () -> {
149             Throwable result = null;
150             try {
151                 start.await();
152                 point.proceed();
153                 // @checkstyle IllegalCatchCheck (1 line)
154             } catch (final Throwable ex) {
155                 result = ex;
156             }
157             return result;
158         };
159     }
160 
161     /**
162      * Exception that encapsulates all exceptions thrown from threads.
163      * @since 0.0.0
164      */
165     private static final class ParallelException extends Exception {
166 
167         /**
168          * Serialization marker.
169          */
170         private static final long serialVersionUID = 0x8743ef363febc422L;
171 
172         /**
173          * Next parallel exception.
174          */
175         private final transient Parallelizer.ParallelException next;
176 
177         /**
178          * Constructor.
179          * @param cause Cause of the current exception.
180          * @param nxt Following exception.
181          */
182         protected ParallelException(final Throwable cause,
183             final Parallelizer.ParallelException nxt) {
184             super(cause);
185             this.next = nxt;
186         }
187 
188         /**
189          * Get next parallel exception.
190          * @return Next exception.
191          */
192         public Parallelizer.ParallelException getNext() {
193             return this.next;
194         }
195     }
196 
197 }