Class AbstractThreadedTask<T>

  • Type Parameters:
    T - The type of input object that will be passed to threadExecute.
    All Implemented Interfaces:
    PrivateContextObjectConsumer<T>, sailpoint.object.BaseExecutor, sailpoint.object.TaskExecutor
    Direct Known Subclasses:
    AbstractThreadedObjectIteratorTask

    public abstract class AbstractThreadedTask<T>
    extends sailpoint.task.AbstractTaskExecutor
    implements PrivateContextObjectConsumer<T>
    An abstract superclass for nearly all custom multi-threaded SailPoint tasks.

    This task will retrieve a list of objects and then pass each of them to a processor method in the subclass in parallel.

    The overall flow is:

    1. Invoke parseArgs(Attributes) to extract task arguments. Subclasses should override this to retrieve their own parameters.
    2. Invoke getObjectIterator(SailPointContext, Attributes) to retrieve a list of items to iterate over. This iterator can be “streaming” or not.
    3. Invoke submitAndWait(SailPointContext, TaskResult, Iterator) to begin processing of the list of items. This can be replaced by a subclass, but the default flow is described below. It is unlikely that you will need to change it.
    4. Clean up the thread pool and update the TaskResult with outcomes.

    Submit-and-wait proceeds as follows:

    1. Retrieve the batch size from getBatchSize()
    2. Create a thread pool with the specified number of threads in it.
    3. For each item, invoke the subclass’s threadExecute(SailPointContext, Map, T) method, passing the current item as the third parameter. If a batch size is set, each worker thread receives more than one item, which avoids building and destroying a private context for every item. This will likely be more efficient for large operations.

    Via the SailPointWorker class, the threadExecute(SailPointContext, Map, T) method also receives a thread-specific SailPointContext object that can be used without worrying about transaction length or overlap.

    Subclasses can override most parts of this process by extending the various protected methods.

    Subclasses cannot directly receive a termination notification, but can register any number of termination handlers by invoking addTerminationHandler(Functions.GenericCallback). This class makes a best effort to invoke all termination handlers.
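The flow above can be modelled with plain JDK classes. The sketch below is not this class's implementation: FlowSketch, its run method, and the processor callback are hypothetical stand-ins, and no SailPointContext or SailPointWorker is involved. It only illustrates grouping items into batches, handing each batch to a fixed-size thread pool, and counting outcomes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical JDK-only model of the documented flow; not the real class.
final class FlowSketch {
    final AtomicInteger successCounter = new AtomicInteger();
    final AtomicInteger failureCounter = new AtomicInteger();

    // Processes every item from the iterator using threadCount threads,
    // grouping up to batchSize items into each submitted unit of work.
    <T> void run(Iterator<? extends T> items, int threadCount, int batchSize,
                 Consumer<T> processor) {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            while (items.hasNext()) {
                // Build the next batch on demand, so the source is not pre-read.
                List<T> batch = new ArrayList<>();
                while (items.hasNext() && batch.size() < batchSize) {
                    batch.add(items.next());
                }
                executor.submit(() -> {
                    for (T item : batch) {
                        try {
                            processor.accept(item);
                            successCounter.incrementAndGet();
                        } catch (RuntimeException e) {
                            // One failed item does not abort the rest of the batch.
                            failureCounter.incrementAndGet();
                        }
                    }
                });
            }
        } finally {
            // Stop accepting work, then wait for submitted batches to finish.
            executor.shutdown();
            try {
                executor.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

In the real class, the processor role is played by threadExecute, and each batch additionally receives its own private context.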

    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected BiConsumer<Thread,​Object> beforeExecutionHook
      If present, this BiConsumer can be invoked before execution of each object.
      protected sailpoint.api.SailPointContext context
      The parent SailPoint context
      protected ExecutorService executor
      The thread pool
      protected AtomicInteger failureCounter
      The counter of how many threads have indicated failures
      protected org.apache.commons.logging.Log log
      The log object
      protected AtomicInteger successCounter
      The counter of how many threads have indicated success
      protected sailpoint.object.TaskResult taskResult
      The TaskResult to keep updated with changes
      protected sailpoint.object.TaskSchedule taskSchedule
      The TaskSchedule, which can be used in querying
      protected AtomicBoolean terminated
      The boolean flag indicating that this task has been terminated
      protected int threadCount
      How many threads are to be created
      • Fields inherited from class sailpoint.task.AbstractTaskExecutor

        ARG_FILTER, ARG_LOCK_MODE, ARG_LOCK_TIMEOUT, ARG_PROFILE, ARG_RESTARTABLE, ARG_TRACE, DEFAULT_LOCK_TIMEOUT, RET_TOTAL
    • Method Summary

      All Methods Instance Methods Abstract Methods Concrete Methods 
      Modifier and Type Method Description
      protected void addTerminationHandler​(Functions.GenericCallback handler)
      Adds a termination handler to this execution of the task
      void afterBatch​(sailpoint.api.SailPointContext context)
      Invoked by the default worker thread after each batch is completed.
      void beforeBatch​(sailpoint.api.SailPointContext context)
      Invoked by the default worker thread before each batch is begun.
      protected Iterator<List<T>> createBatchIterator​(Iterator<? extends T> items, int batchSize)
      Retrieves an iterator over batches of items, with the size suggested by the second parameter.
      void execute​(sailpoint.api.SailPointContext ctx, sailpoint.object.TaskSchedule ts, sailpoint.object.TaskResult tr, sailpoint.object.Attributes<String,​Object> args)
      The main method of this task executor, which invokes the appropriate hook methods.
      int getBatchSize()
      Gets the batch size for this task.
      ExecutorService getExecutor()
      Gets the running executor for this task
      protected abstract Iterator<? extends T> getObjectIterator​(sailpoint.api.SailPointContext context, sailpoint.object.Attributes<String,​Object> args)
      Retrieves an Iterator that will produce the stream of objects to be processed in parallel.
      protected void markFailure​(T item)
      Marks this item as a failure by incrementing the failure counter.
      protected void markSuccess​(T item)
      Marks this item as a success by incrementing the success counter.
      protected void parseArgs​(sailpoint.object.Attributes<String,​Object> args)
      Extracts the thread count from the task arguments.
      protected void prepareExecutor()
      Prepares the thread pool executor.
      void setBeforeExecutionHook​(BiConsumer<Thread,​Object> beforeExecutionHook)
      Sets the “before execution hook”, an optional pluggable callback that will be invoked prior to the execution of each thread.
      void setFailureMarker​(Consumer<T> failureMarker)
      Sets the failure marking callback
      void setSuccessMarker​(Consumer<T> successMarker)
      Sets the success marking callback
      void setWorkerCreator​(ThreadWorkerCreator<T> workerCreator)
      Sets the worker creator function.
      protected void submitAndWait​(sailpoint.api.SailPointContext context, sailpoint.object.TaskResult taskResult, Iterator<? extends T> items)
      Submits the iterator of items to the thread pool, calling threadExecute for each one, then waits for all of the threads to complete or the task to be terminated.
      boolean terminate()
      Terminates the task by setting the terminated flag, interrupting the executor, waiting five seconds for it to exit, then invoking any shutdown hooks
      abstract Object threadExecute​(sailpoint.api.SailPointContext threadContext, Map<String,​Object> parameters, T obj)
      This method will be called in parallel for each item produced by getObjectIterator(SailPointContext, Attributes).
      • Methods inherited from class sailpoint.task.AbstractTaskExecutor

        getMonitor, getSuggestedPartitionCount, getSuggestedPartitionCount, launchPartitions, processCommand, saveRequest, setMonitor, updateProgress, updateProgress
    • Field Detail

      • beforeExecutionHook

        protected BiConsumer<Thread,​Object> beforeExecutionHook
        If present, this BiConsumer can be invoked before execution of each object.

        The subclass is responsible for making this call. This is mainly useful as a testing hook.

      • context

        protected sailpoint.api.SailPointContext context
        The parent SailPoint context
      • log

        protected org.apache.commons.logging.Log log
        The log object
      • taskResult

        protected sailpoint.object.TaskResult taskResult
        The TaskResult to keep updated with changes
      • taskSchedule

        protected sailpoint.object.TaskSchedule taskSchedule
        The TaskSchedule, which can be used in querying
      • terminated

        protected final AtomicBoolean terminated
        The boolean flag indicating that this task has been terminated
      • threadCount

        protected int threadCount
        How many threads are to be created
    • Method Detail

      • addTerminationHandler

        protected final void addTerminationHandler​(Functions.GenericCallback handler)
        Adds a termination handler to this execution of the task
        Parameters:
        handler - The termination handler to run on completion
      • afterBatch

        public void afterBatch​(sailpoint.api.SailPointContext context)
                        throws sailpoint.tools.GeneralException
        Invoked by the default worker thread after each batch is completed.

        This can be overridden by a subclass to do arbitrary cleanup.

        Parameters:
        context - The context for this thread
        Throws:
        sailpoint.tools.GeneralException - if anything goes wrong
      • beforeBatch

        public void beforeBatch​(sailpoint.api.SailPointContext context)
                         throws sailpoint.tools.GeneralException
        Invoked by the default worker thread before each batch is begun.

        If this method throws an exception, the batch worker ought to prevent the batch from being executed.

        Parameters:
        context - The context for this thread
        Throws:
        sailpoint.tools.GeneralException - if any failures occur
      • execute

        public final void execute​(sailpoint.api.SailPointContext ctx,
                                  sailpoint.object.TaskSchedule ts,
                                  sailpoint.object.TaskResult tr,
                                  sailpoint.object.Attributes<String,​Object> args)
                           throws Exception
        The main method of this task executor, which invokes the appropriate hook methods.
        Specified by:
        execute in interface sailpoint.object.TaskExecutor
        Throws:
        Exception
      • createBatchIterator

        protected Iterator<List<T>> createBatchIterator​(Iterator<? extends T> items,
                                                        int batchSize)
        Retrieves an iterator over batches of items, with the size suggested by the second parameter.

        If left unmodified, returns either a BatchingIterator when the batch size is greater than 1, or a TransformingIterator that constructs a singleton list for each item when batch size is 1.

        If possible, the returned Iterator should simply wrap the input, rather than consuming it. This allows for “live” iterators that read from a data source directly rather than pre-reading. However, be careful with Hibernate iterators here, because a commit can invalidate them in the middle of iteration.

        Parameters:
        items - The input iterator of items
        batchSize - The batch size
        Returns:
        The iterator over a list of items
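To illustrate the advice about wrapping rather than consuming the input, here is a minimal batching iterator written against the JDK only. LazyBatchIterator is a hypothetical name; it is not the BatchingIterator mentioned above, whose behavior may differ. It reads from the source only when a batch is requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in; the library's own BatchingIterator may differ.
final class LazyBatchIterator<T> implements Iterator<List<T>> {
    private final Iterator<? extends T> source;
    private final int batchSize;

    LazyBatchIterator(Iterator<? extends T> source, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        // Delegates to the source, so nothing is read ahead of time.
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        // Pull at most batchSize items from the source for this batch.
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }
}
```

The final batch may be shorter than batchSize when the source runs out, which is consistent with the size being described as a suggestion.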
      • getBatchSize

        public int getBatchSize()
        Gets the batch size for this task.

        By default, this is the batch size passed as an input to the task, but this may be overridden by subclasses.

        Returns:
        The batch size for each thread
      • getObjectIterator

        protected abstract Iterator<? extends T> getObjectIterator(sailpoint.api.SailPointContext context,
                                                                   sailpoint.object.Attributes<String,​Object> args)
                                                            throws sailpoint.tools.GeneralException
        Retrieves an Iterator that will produce the stream of objects to be processed in parallel.

        Each object produced by this Iterator will be passed in its turn to threadExecute(SailPointContext, Map, Object) as the third parameter.

        IMPORTANT NOTES RE: HIBERNATE:

        It may be unwise to return a “live” Hibernate iterator of the sort provided by context.search here. The next read of the iterator will fail with a “Result Set Closed” exception if anything commits this context while the iterator is still being consumed. It is likely that the first worker threads will execute before the iterator is fully read.

        If you return a SailPointObject or any other object dependent on a Hibernate context, you will likely receive context-related errors in your worker thread unless you make an effort to re-attach the object to the thread context.

        TODO One option may be to pass in a private context here, but it couldn’t be closed until after iteration is complete.

        Parameters:
        context - The top-level task SailPoint context
        args - The task arguments
        Returns:
        An iterator containing the objects to be iterated over
        Throws:
        sailpoint.tools.GeneralException - if any failures occur
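One possible way to sidestep the live-iterator problem described in the Hibernate notes is to copy lightweight values (for example, object IDs as strings) into memory before returning. The sketch below uses only the JDK; SnapshotSketch and snapshot are hypothetical names, and the approach trades memory for safety, so it suits small values rather than full objects.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper; not part of the library.
final class SnapshotSketch {
    // Reads the live iterator to the end and returns an iterator over the copy,
    // so later changes to the underlying source cannot break iteration.
    static <T> Iterator<T> snapshot(Iterator<? extends T> live) {
        List<T> copy = new ArrayList<>();
        while (live.hasNext()) {
            copy.add(live.next());
        }
        return copy.iterator();
    }
}
```

A worker that receives an ID in this way would then load the object through its own thread context rather than reusing an object tied to the parent context.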
      • markFailure

        protected void markFailure​(T item)
        Marks this item as a failure by incrementing the failure counter.

        Subclasses may override this method to add additional behavior.

      • markSuccess

        protected void markSuccess​(T item)
        Marks this item as a success by incrementing the success counter.

        Subclasses may override this method to add additional behavior.

      • parseArgs

        protected void parseArgs​(sailpoint.object.Attributes<String,​Object> args)
                          throws Exception
        Extracts the thread count from the task arguments.

        Subclasses should override this method to extract their own arguments. You must either call super.parseArgs() in any subclass implementation of this method or set threadCount yourself.

        Parameters:
        args - The task arguments
        Throws:
        Exception - if any failures occur parsing arguments
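The override pattern described above can be sketched with stand-in classes. Everything here is hypothetical: BaseTaskSketch models only the threadCount field, a plain Map replaces Attributes, and the argument names "threads" and "application" are assumptions made for illustration, not names taken from the library.

```java
import java.util.Map;

// Hypothetical stand-in for the base class; models only the threadCount field.
class BaseTaskSketch {
    protected int threadCount;

    protected void parseArgs(Map<String, Object> args) {
        // "threads" is an assumed argument name, not taken from the library.
        Object value = args.get("threads");
        threadCount = (value instanceof Number) ? ((Number) value).intValue() : 1;
    }
}

// Hypothetical subclass that reads one extra argument of its own.
class MyTaskSketch extends BaseTaskSketch {
    String targetApplication;

    @Override
    protected void parseArgs(Map<String, Object> args) {
        // Let the parent set threadCount first, then read our own arguments.
        super.parseArgs(args);
        targetApplication = (String) args.get("application");
    }
}
```

Omitting the super call in this model would leave threadCount at zero, which is the situation the note above warns about.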
      • prepareExecutor

        protected void prepareExecutor()
                                throws sailpoint.tools.GeneralException
        Prepares the thread pool executor.

        The default implementation simply constructs a fixed-size executor service, but subclasses may override this behavior with their own implementations.

        After this method is finished, the executor attribute should be set to an ExecutorService that can accept new inputs.

        Throws:
        sailpoint.tools.GeneralException - if any failures occur
      • setBeforeExecutionHook

        public final void setBeforeExecutionHook​(BiConsumer<Thread,​Object> beforeExecutionHook)
        Sets the “before execution hook”, an optional pluggable callback that will be invoked prior to the execution of each thread.

        This BiConsumer’s accept() method must be thread-safe as it will be invoked in parallel.

        Parameters:
        beforeExecutionHook - An optional BiConsumer callback hook
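As a sketch of what a thread-safe hook might look like, the example below records every object it is given in a concurrent set. HookSketch is a hypothetical test helper, not part of the library; only the BiConsumer<Thread, Object> shape comes from the signature above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical test helper that records which objects were about to be processed.
final class HookSketch {
    // A concurrent set, so parallel calls to the hook do not corrupt it.
    final Set<Object> seen = ConcurrentHashMap.newKeySet();

    // Safe to invoke from many threads at once because it only touches 'seen'.
    final BiConsumer<Thread, Object> hook = (thread, item) -> seen.add(item);
}
```

A test could pass the hook field to setBeforeExecutionHook and inspect the seen set once the task completes.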
      • setFailureMarker

        public final void setFailureMarker​(Consumer<T> failureMarker)
        Sets the failure marking callback
        Parameters:
        failureMarker - The callback invoked on item failure
      • setSuccessMarker

        public final void setSuccessMarker​(Consumer<T> successMarker)
        Sets the success marking callback
        Parameters:
        successMarker - The callback invoked on item success
      • setWorkerCreator

        public final void setWorkerCreator​(ThreadWorkerCreator<T> workerCreator)
        Sets the worker creator function.

        This function should return a SailPointWorker extension that will take the given List of typed items and process them when its thread is invoked.

        Parameters:
        workerCreator - The worker creator function
      • submitAndWait

        protected void submitAndWait​(sailpoint.api.SailPointContext context,
                                     sailpoint.object.TaskResult taskResult,
                                     Iterator<? extends T> items)
                              throws sailpoint.tools.GeneralException
        Submits the iterator of items to the thread pool, calling threadExecute for each one, then waits for all of the threads to complete or the task to be terminated.
        Parameters:
        context - The SailPoint context
        taskResult - The taskResult to update (for monitoring)
        items - The iterator over items being processed
        Throws:
        sailpoint.tools.GeneralException - if any failures occur
      • terminate

        public final boolean terminate()
        Terminates the task by setting the terminated flag, interrupting the executor, waiting five seconds for it to exit, then invoking any shutdown hooks.
        Specified by:
        terminate in interface sailpoint.object.BaseExecutor
        Returns:
        Always true
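The termination sequence described above can be modelled with the JDK alone. This is a sketch under stated assumptions, not the actual implementation: TerminationSketch is a hypothetical class, Runnable stands in for Functions.GenericCallback, and the pool size of two is arbitrary.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical JDK-only model of the documented terminate() sequence.
final class TerminationSketch {
    final AtomicBoolean terminated = new AtomicBoolean();
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final List<Runnable> handlers = new CopyOnWriteArrayList<>();

    void addTerminationHandler(Runnable handler) {
        handlers.add(handler);
    }

    boolean terminate() {
        // Set the flag first so workers that poll it can stop early.
        terminated.set(true);
        // Interrupt running workers and discard queued ones.
        executor.shutdownNow();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Best effort: one failing handler must not block the others.
        for (Runnable handler : handlers) {
            try {
                handler.run();
            } catch (RuntimeException e) {
                // Swallowed deliberately in this sketch.
            }
        }
        return true;
    }
}
```

Catching exceptions per handler is what makes the invocation best effort: every registered handler gets a chance to run even if an earlier one throws.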
      • threadExecute

        public abstract Object threadExecute​(sailpoint.api.SailPointContext threadContext,
                                             Map<String,​Object> parameters,
                                             T obj)
                                      throws sailpoint.tools.GeneralException
        This method will be called in parallel for each item produced by getObjectIterator(SailPointContext, Attributes).

        DO NOT use the parent context in this method. You will encounter Weird Database Issues.

        Specified by:
        threadExecute in interface PrivateContextObjectConsumer<T>
        Parameters:
        threadContext - A private IIQ context for the current JVM thread
        parameters - A set of default parameters suitable for a Rule or Script. In the default implementation, the object will be in this map as ‘object’.
        obj - The object to process
        Returns:
        An arbitrary value (ignored by default)
        Throws:
        sailpoint.tools.GeneralException - if any failures occur