001package com.identityworksllc.iiq.common.threads;
002
003import com.identityworksllc.iiq.common.TaskCallback;
004import com.identityworksllc.iiq.common.Utilities;
005import com.identityworksllc.iiq.common.vo.Outcome;
006import org.apache.commons.logging.Log;
007import org.apache.commons.logging.LogFactory;
008import sailpoint.api.SailPointContext;
009import sailpoint.object.Request;
010import sailpoint.object.RequestDefinition;
011import sailpoint.task.TaskMonitor;
012import sailpoint.tools.Compressor;
013import sailpoint.tools.GeneralException;
014import sailpoint.tools.Util;
015
016import java.io.IOException;
017import java.io.Serializable;
018import java.util.ArrayList;
019import java.util.Base64;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.StringJoiner;
025import java.util.UUID;
026import java.util.concurrent.Callable;
027import java.util.concurrent.CancellationException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Future;
030import java.util.concurrent.FutureTask;
031import java.util.concurrent.RecursiveTask;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036
037/**
038 * A worker thread for multi-threaded actions. This class can be used with
039 * AbstractThreadedObjectIteratorTask to implement a multi-threaded task
040 * or with SailPointWorkerExecutor to distribute the workers across
041 * the cluster.
042 *
043 * If intended for use with a Request, all parts of the subclass, including
044 * objects stored in lists, maps, and other structures, must be either Serializable
045 * or Externalizable.
046 */
047public abstract class SailPointWorker implements Runnable, Serializable {
048
        /**
         * An interface used as an error callback, specifically for use via the SailPointWorkerExecutor
         * but potentially usable by testing code as well.
         *
         * Within this class, a registered handler is invoked with the exception caught while
         * running the worker, before any task callbacks are notified of the failure.
         */
        @FunctionalInterface
        public interface ExceptionHandler {
                /**
                 * Handles an error raised during worker execution.
                 *
                 * @param t The error that was caught
                 */
                void handleError(Throwable t);
        }
057
058        /**
059         * The attribute used to pass this object in serialized form to {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}.
060         */
061        public static final String MULTI_SERIALIZED_WORKERS_ATTR = "serializedWorkers";
062
063        /**
064         * Generated serialVersionUID
065         */
066        private static final long serialVersionUID = 3L;
067
068        /**
069         * Submits the task to the given executor and returns its future. If any listeners
070         * are passed, the given task's Future will be registered with the listeners as a
071         * dependency. The listeners will not be able to run until the Future resolves.
072         *
073         * @param executor The executor to submit the task to
074         * @param self The task to submit
075         * @param listeners Any other workers that care about the output of this one
076         * @return The Future for this task
077         */
078        public static Future<?> submitWithListeners(ExecutorService executor, SailPointWorker self, SailPointWorker... listeners) {
079                Future<?> future = executor.submit(self.toCallable());
080
081                for(SailPointWorker listener : listeners) {
082                        listener.addDependency(self, future);
083                }
084
085                return future;
086        }
087
088        /**
089         * Serializes a list of SailPointWorker object into a Request suitable for use with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}. That executor must be associated with the provided RequestDefinition.
090         * @param requestDefinition The request definition associated with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}
091         * @param workers A list of SailPointWorkers to pass to the request handler
092         * @return The Request containing a serialized object
093         * @throws GeneralException if any failures occur compressing the object
094         * @throws IOException if any failures occur serializing this {@link SailPointWorker}
095         */
096        public static Request toRequest(RequestDefinition requestDefinition, List<SailPointWorker> workers) throws GeneralException, IOException {
097                Request request = new Request();
098                request.setName("Batch of " + workers.size() + " workers");
099                request.setDefinition(requestDefinition);
100                List<String> workerStrings = new ArrayList<>();
101                int phase = 0;
102                int dependentPhase = 0;
103                for(SailPointWorker worker : workers) {
104                        byte[] contents = Util.serializableObjectToBytes(worker);
105                        String compressed = Compressor.compress(Base64.getEncoder().encodeToString(contents));
106                        workerStrings.add(compressed);
107                        if (phase == 0 && worker.phase > 0) {
108                                phase = worker.phase;
109                        }
110                        if (dependentPhase == 0 && worker.dependentPhase > 0) {
111                                dependentPhase = worker.dependentPhase;
112                        }
113                }
114                request.setAttribute(MULTI_SERIALIZED_WORKERS_ATTR, workerStrings);
115                if (phase > 0) {
116                        request.setPhase(phase);
117                }
118                if (dependentPhase > 0) {
119                        request.setDependentPhase(dependentPhase);
120                }
121                return request;
122        }
123        /**
         * The child workers of this one. When child execution is enabled, they are run before
         * this worker's own logic and their outputs are available via getDependencyOutput()
126         */
127        /*package*/ List<SailPointWorker> children;
128        /**
129         * An optional counter object to increment on a successful execution
130         */
131        private transient AtomicInteger completedCounter;
132        /**
133         * The default name of this task
134         */
135        private final String defaultName;
136        /**
137         * A list of registered dependencies by name
138         */
139        /*package*/ transient Map<String, Future<?>> dependencies;
140
141        /**
         * The outputs of resolved dependencies and executed child workers, keyed by
         * worker name. Individual values are available via getDependencyOutput()
144         */
145        /*package*/ transient Map<String, Object> dependencyOutput;
146
147        /**
         * The dependent phase for this worker, if this is used as a partition
149         */
150        private int dependentPhase;
151        /**
152         * The optional exception handler for this task
153         */
154        private transient ExceptionHandler exceptionHandler;
155
156        /**
157         * A flag indicating that we ought to execute the children prior to this one.
158         * This defaults to true, but will be altered by {@link RecursiveWorkerChildTask}.
159         */
160        /*package*/ boolean executeChildren;
161
162        /**
163         * An optional counter object to increment on failures
164         */
165        private transient AtomicInteger failedCounter;
166        /**
167         * An instance of the TaskMonitor interface
168         */
169        protected transient TaskMonitor monitor;
170
171        /**
172         * An Outcome object allowing subclasses to provide output
173         */
174        protected transient Outcome outcome;
175
176        /**
177         * The parent task of this one, used to propagate events
178         */
179        private SailPointWorker parent;
180
181        /**
182         * The phase this worker is in, if used as a partition
183         */
184        private int phase;
185
186        /**
187         * Task callback object to be invoked
188         */
189        private List<TaskCallback<SailPointWorker, Object>> taskCallback;
190
191        /**
192         * A flag accessible to subclasses indicating that this task has been terminated
193         */
194        private transient AtomicBoolean terminated;
195
196        /**
197         * The amount of time to be spent before timing out
198         */
199        private long timeoutMillis;
200
201        /**
202         * The timestamp at which to mark this task as timed out
203         */
204        private long timeoutTimestamp;
205
206        /**
207         * Constructs a new Worker with the default name (the class + UUID)
208         */
209        protected SailPointWorker() {
210                this(null);
211        }
212
213        /**
214         * A worker constructor that takes a name
215         * @param name The worker name
216         */
217        protected SailPointWorker(String name) {
218                outcome = new Outcome();
219                terminated = new AtomicBoolean();
220                if (Util.isNotNullOrEmpty(name)) {
221                        this.defaultName = name;
222                } else {
223                        this.defaultName = this.getClass().getSimpleName() + " " + UUID.randomUUID();
224                }
225                children = new ArrayList<>();
226                dependencyOutput = new HashMap<>();
227                dependencies = new HashMap<>();
228                taskCallback = new ArrayList<>();
229                this.executeChildren = true;
230        }
231
        /**
         * A worker constructor that takes a name and a phase
         * @param name The worker name; if null or empty, a default name is generated
         * @param phase The worker phase
         */
        protected SailPointWorker(String name, int phase) {
                this(name);
                this.phase = phase;
        }
241
242        /**
243         * A worker constructor that takes a name
244         * @param name The worker name
245         * @param phase The worker phase
246         */
247        protected SailPointWorker(String name, int phase, int dependentPhase) {
248                this(name, phase);
249                this.dependentPhase = dependentPhase;
250        }
251
252        /**
253         * Adds a child of this task. All child tasks will be resolved before this task runs
254         * and their output will be available to this one. This could be used to implement
255         * phased execution, for example.
256         *
257         * This is NOT the same as a dependency, which is an asynchronous Future that will
258         * block until completion.
259         *
260         * @param childWorker The worker to add as a child
261         */
262        public void addChild(SailPointWorker childWorker) {
263                childWorker.setParent(this);
264                this.children.add(childWorker);
265        }
266
267        /**
268         * Adds a dependency Future which will be invoked prior to any child tasks
269         * and also this task.
270         *
271         * @param dependency The dependency task (used to extract the name)
272         * @param workerFuture The future for that dependency
273         */
274        public void addDependency(SailPointWorker dependency, Future<?> workerFuture) {
275                this.dependencies.put(dependency.getWorkerName(), workerFuture);
276        }
277
278        /**
279         * Adds a task callback, which can be used to receive events
280         * @param taskCallback The task callback
281         */
282        public void addTaskCallback(TaskCallback<SailPointWorker, Object> taskCallback) {
283                this.taskCallback.add(taskCallback);
284        }
285
286        /**
287         * Checks for any of the abnormal termination states, throwing an
288         * {@link CancellationException} if one is encountered. Subclasses that
289         * are working in a loop should invoke this method routinely and allow
290         * the exception to abort whatever loop is being run.
291         *
292         * @throws CancellationException if this thread should abnormally terminate
293         */
294        protected void checkCancel() throws CancellationException {
295                if (isTerminated() || Thread.currentThread().isInterrupted()) {
296                        throw new CancellationException("Worker has been terminated");
297                }
298        }
299
        /**
         * Executes this task in a SailPoint context that will be dynamically constructed for
         * it. A private context and {@link Log} will be passed to this method.
         *
         * When this worker is run as a child of another worker, the parent's context and
         * logger are passed instead.
         *
         * @param context The private context to use for this thread worker
         * @param logger The log attached to this Worker
         * @return any object; it is passed to the success callbacks, returned from the Callable
         *         form of this worker, and (for child workers) stored in the parent's dependency output
         * @throws Exception any exception, which is treated as a failure of this worker
         */
        public abstract Object execute(SailPointContext context, Log logger) throws Exception;
309
310        /**
311         * Retrieve the output of a dependency (listened-to) or child task of this task.
312         *
313         * @param key The name of the child worker
314         * @return The output
315         */
316        protected Object getDependencyOutput(String key) {
317                return this.dependencyOutput.get(key);
318        }
319
320        /**
321         * Returns the dependent phase of this worker
322         * @return The dependent phase
323         */
324        public int getDependentPhase() {
325                return dependentPhase;
326        }
327
328        /**
329         * Gets the exception handler for this worker
330         * @return The exception handler for this worker
331         */
332        public ExceptionHandler getExceptionHandler() {
333                return exceptionHandler;
334        }
335
336        /**
337         * Gets the monitor associated with this worker (if any)
338         * @return The monitor, which may be null
339         */
340        public TaskMonitor getMonitor() {
341                return monitor;
342        }
343
344        /**
345         * Retrieves the parent task of this one
346         * @return The parent task
347         */
348        public SailPointWorker getParent() {
349                return parent;
350        }
351
352        /**
353         * Returns the phase of this worker
354         * @return The phase of this worker
355         */
356        public int getPhase() {
357                return phase;
358        }
359
360        /**
361         * Returns the name of this worker, which can be used in log messages or as the
362         * name of a partitioned request object. By default, this is generated from the
363         * name of this concrete class (your subclass) and a random UUID.
364         *
365         * Override this method to provide your own more sensible naming scheme.
366         *
367         * @return The worker name
368         */
369        public String getWorkerName() {
370                return defaultName;
371        }
372
373        /**
374         * The core flow of a SailPointWorker. Renames the thread for logging
375         * clarity, resolves any dependencies, and handles success and failure
376         * outcomes by invoking the appropriate callbacks.
377         *
378         * @return The implementation
379         */
380        private Object implementation(boolean rethrow) throws Exception {
381                // The non-exceptional output of this task
382                AtomicReference<Object> result = new AtomicReference<>();
383
384                // The exceptional output of this task
385                AtomicReference<Exception> exceptionResult = new AtomicReference<>();
386
387                Log logger = LogFactory.getLog(this.getClass());
388                final String originalThreadName = Thread.currentThread().getName();
389                this.timeoutTimestamp = System.currentTimeMillis() + timeoutMillis;
390                try {
391                        if (taskCallback != null) {
392                                // Can't do this in a lambda forEach because this one can throw an exception
393                                for(TaskCallback<SailPointWorker, Object> tc : taskCallback) {
394                                        tc.beforeStart(this);
395                                }
396                        }
397                        Thread.currentThread().setName("SailPointWorker: " + getWorkerName());
398                        try {
399                                Utilities.withPrivateContext((threadContext) -> {
400                                        try {
401                                                waitForDependencies(logger);
402                                                if (this.executeChildren) {
403                                                        // Executes the child tasks, storing their outputs
404                                                        runChildren(this, logger, threadContext);
405                                                }
406                                                result.set(execute(threadContext, logger));
407                                                threadContext.commitTransaction();
408                                                if (completedCounter != null) {
409                                                        completedCounter.incrementAndGet();
410                                                }
411                                                if (taskCallback != null) {
412                                                        taskCallback.forEach((tc) -> tc.onSuccess(this, result.get()));
413                                                }
414                                        } catch (Exception e) {
415                                                if (failedCounter != null) {
416                                                        failedCounter.incrementAndGet();
417                                                }
418                                                if (exceptionHandler != null) {
419                                                        exceptionHandler.handleError(e);
420                                                }
421                                                if (taskCallback != null) {
422                                                        taskCallback.forEach((tc) -> tc.onFailure(this, e.getMessage(), e));
423                                                }
424                                                exceptionResult.set(e);
425                                        }
426                                });
427                        } finally {
428                                Thread.currentThread().setName(originalThreadName);
429                        }
430                } finally {
431                        if (taskCallback != null) {
432                                taskCallback.forEach((tc) -> tc.afterFinish(this));
433                        }
434                        if (this.parent != null && this.parent.dependencyOutput != null && this.dependencyOutput != null) {
435                                this.parent.dependencyOutput.putAll(this.dependencyOutput);
436                        }
437                }
438
439                if (rethrow && exceptionResult.get() != null) {
440                        throw exceptionResult.get();
441                }
442
443                return result.get();
444        }
445
446        /**
447         * Returns true if this worker has been terminated or if it has timed out.
448         */
449        public boolean isTerminated() {
450                return isTimedOut() || terminated.get();
451        }
452
453        /**
454         * Returns true if this task has timed out.
455         *
456         * @return True if this task has timed out.
457         */
458        public boolean isTimedOut() {
459                return (this.timeoutMillis > 0 && System.currentTimeMillis() < timeoutTimestamp);
460        }
461
462        /**
463         * Java deserialization hook to instantiate the transient fields
464         *
465         * @see Serializable
466         *
467         * @param in The input stream
468         * @throws IOException if anything fails
469         * @throws ClassNotFoundException if this is the wrong clas
470         */
471        private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
472                this.terminated = new AtomicBoolean();
473                this.outcome = new Outcome();
474                this.dependencyOutput = new HashMap<>();
475                this.dependencies = new HashMap<>();
476                in.defaultReadObject();
477        }
478
479        /**
480         * Invokes this SailPointWorker by constructing a new private SailPointContext,
481         * then invoking the subclass's {@link #execute(SailPointContext, Log)} method,
482         * then handling success or failure according to the optional objects.
483         *
484         * The subclass is responsible for logging and rethrowing any exceptions if
485         * a failure is to be counted.
486         */
487        @Override
488        public void run() {
489                try {
490                        implementation(false);
491                } catch(Exception ignored) {
492                        // Already handled due to 'false' above
493                }
494        }
495
496        /**
497         * Directly runs the child tasks for this task. If any child task throws an exception,
498         * or turns out to have timed out upon completion, an exception will be thrown from
499         * this method and no further child tasks will be executed.
500         *
501         * Each child's {@link #execute(SailPointContext, Log)} method is invoked directly.
502         * Children are run in the same SailPointContext as the parent.
503         *
504         * A tree of nested child tasks will be invoked in depth-first order.
505         *
506         * @param logger The logger
507         * @param context The private context created in {@link #implementation(boolean)}
508         * @throws Exception If a child throws an exception
509         */
510        private void runChildren(SailPointWorker parent, Log logger, SailPointContext context) throws Exception {
511                for(SailPointWorker child : Util.safeIterable(parent.children)) {
512                        child.timeoutTimestamp = System.currentTimeMillis() + child.timeoutMillis;
513
514                        if (!Util.isEmpty(child.children)) {
515                                runChildren(child, logger, context);
516                        }
517                        Object output = child.execute(context, logger);
518                        if (child.isTimedOut()) {
519                                throw new CancellationException("Child task " + child.getWorkerName() + " timed out");
520                        }
521                        parent.dependencyOutput.put(child.getWorkerName(), output);
522                }
523        }
524
525        /**
526         * Gets this object as a Runnable, mainly for use with an ExecutorService
527         * @return This object as a Runnable
528         */
529        public final Runnable runnable() {
530                return this;
531        }
532
533        /**
534         * Set the completed counter on this task and any of its children
535         * @param completedCounter The completed counter
536         */
537        public void setCompletedCounter(AtomicInteger completedCounter) {
538                this.completedCounter = completedCounter;
539
540                if (this.children != null) {
541                        for(SailPointWorker child : this.children) {
542                                child.setCompletedCounter(completedCounter);
543                        }
544                }
545        }
546
        /**
         * Set the dependent phase for this worker. This is only useful if running
         * the worker as a partitioned Request.
         *
         * @param dependentPhase The dependent phase number for this worker
         */
        public void setDependentPhase(int dependentPhase) {
                this.dependentPhase = dependentPhase;

                // NOTE: Children don't need to copy the parent's dependent phase because they run
                // in the same thread as this one and thus will be in the same phase by default.
        }
559
560        /**
561         * Sets the exception handler for this class.
562         * @param exceptionHandler The exception handler
563         */
564        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
565                this.exceptionHandler = exceptionHandler;
566
567                if (this.children != null) {
568                        for(SailPointWorker child : this.children) {
569                                child.setExceptionHandler(exceptionHandler);
570                        }
571                }
572        }
573
574        /**
575         * Sets the failure counter that will be incremented on any worker failure.
576         *
577         * @param failedCounter The failed counter
578         */
579        public void setFailedCounter(AtomicInteger failedCounter) {
580                this.failedCounter = failedCounter;
581
582                if (this.children != null) {
583                        for(SailPointWorker child : this.children) {
584                                child.setFailedCounter(failedCounter);
585                        }
586                }
587        }
588
589        /**
590         * Sets the TaskMonitor for this worker and its children. This will be set by
591         * the SPW request executor, among other places.
592         * @param monitor The task monitor
593         */
594        public void setMonitor(TaskMonitor monitor) {
595                this.monitor = monitor;
596
597                if (this.children != null) {
598                        for(SailPointWorker child : this.children) {
599                                child.setMonitor(monitor);
600                        }
601                }
602        }
603
        /**
         * Sets the parent task of this one to the given value, which may be null.
         * This is invoked by {@link #addChild(SailPointWorker)} on the child being added.
         * @param parent The parent task
         */
        protected void setParent(SailPointWorker parent) {
                this.parent = parent;
        }
611
612        /**
613         * Sets the timeout duration in the specified unit.
614         */
615        public void setTimeout(int duration, TimeUnit unit) {
616                this.timeoutMillis = TimeUnit.MILLISECONDS.convert(duration, unit);
617        }
618
        /**
         * Sets the phase for this worker. This is only useful if running as a {@link Request}.
         * When workers are batched into a Request, the first positive phase found among them
         * is the one copied onto the Request.
         * @param phase The phase number for this worker
         */
        public void setPhase(int phase) {
                this.phase = phase;

                // NOTE: Children don't need to copy the parent's phase because they run in the
                // same thread as this one and thus will be in the same phase by default.
        }
629
630        /**
631         * Attempts to terminate the worker
632         *
633         * @return True, only to satisfy the Terminable interface
634         */
635        public boolean terminate() {
636                this.terminated.set(true);
637
638                return true;
639        }
640
641        /**
642         * Returns a Callable object that will implement the logic of this SailPointWorker,
643         * properly returning a value or an exception for {@link java.util.concurrent.Future} purposes.
644         *
645         * This is used mainly because {@link java.util.concurrent.ExecutorService#submit(Runnable)}
646         * gets messy if the same object implements both Runnable and Callable. This must also be
647         * used if you want to chain workers using {@link java.util.concurrent.Future} and the
648         * dependency function.
649         */
650        public Callable<Object> toCallable() {
651                return () -> this.implementation(true);
652        }
653
654        /**
655         * Creates a new recursive task from this worker, for use with a ForkJoinPool
656         * and a tree of child worker tasks.
657         * @return The recursive task
658         */
659        public RecursiveTask<List<Object>> toForkJoinTask() {
660                return new RecursiveWorkerContainer(Collections.singletonList(this));
661        }
662
663        /**
664         * Creates a FutureTask wrapping this object as a Callable
665         * @return The FutureTask
666         */
667        public FutureTask<Object> toFutureTask() {
668                return new FutureTask<>(toCallable());
669        }
670
671        /**
672         * Serializes this SailPointWorker object into a Request suitable for use with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}. That executor must be associated with the provided RequestDefinition.
673         * @param requestDefinition The request definition associated with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}
674         * @return The Request containing a serialized object
675         * @throws GeneralException if any failures occur compressing the object
676         * @throws IOException if any failures occur serializing this {@link SailPointWorker}
677         */
678        public Request toRequest(RequestDefinition requestDefinition) throws GeneralException, IOException {
679                List<SailPointWorker> singletonList = new ArrayList<>();
680                singletonList.add(this);
681
682                return SailPointWorker.toRequest(requestDefinition, singletonList);
683        }
684
685        /**
686         * @see Object#toString()
687         */
688        @Override
689        public String toString() {
690                return new StringJoiner(", ", SailPointWorker.class.getSimpleName() + "[", "]")
691                                .add("name='" + getWorkerName() + "'")
692                                .add("dependencies='" + this.dependencies.keySet() + "'")
693                                .add("children='" + this.children + "'")
694                                .toString();
695        }
696
697        /**
698         * Resolves any dependencies of this task, noting errors
699         * @param logger The logger used to log dependency errors
700         * @throws GeneralException if any dependency fails
701         */
702        private void waitForDependencies(Log logger) throws GeneralException, InterruptedException {
703                boolean dependencyFailure = false;
704                if (dependencies != null) {
705                        for (String key : dependencies.keySet()) {
706                                long remainingTimeout = this.timeoutTimestamp - System.currentTimeMillis();
707                                if (isTimedOut()) {
708                                        throw new InterruptedException();
709                                }
710
711                                // Note that this will block until the Future resolves. It will consume
712                                // a thread in your thread pool, so you should probably use a cachedThreadPool.
713                                Future<?> future = dependencies.get(key);
714                                if (future != null) {
715                                        try {
716                                                if (this.timeoutMillis > 0 && remainingTimeout > 0) {
717                                                        // Don't hang forever if we've specified a timeout on this worker
718                                                        this.dependencyOutput.put(key, future.get(remainingTimeout, TimeUnit.MILLISECONDS));
719                                                } else {
720                                                        // Hangs forever (or until interrupted)
721                                                        this.dependencyOutput.put(key, future.get());
722                                                }
723                                        } catch(InterruptedException e) {
724                                                throw e;
725                                        } catch(Exception e) {
726                                                // This will almost certainly be an execution exception
727                                                logger.error("Observed dependency task " + key + " had an exception", e);
728                                                dependencyFailure = true;
729                                        }
730                                }
731                        }
732                }
733                if (dependencyFailure) {
734                        throw new GeneralException("One or more dependencies of worker " + getWorkerName() + " failed. See the logs for details.");
735                }
736        }
737
738}