Class AbstractThreadedTask<T>
- java.lang.Object
-
- sailpoint.task.AbstractTaskExecutor
-
- com.identityworksllc.iiq.common.task.AbstractThreadedTask<T>
-
- Type Parameters:
T
- The type of input object that will be passed to threadExecute.
- All Implemented Interfaces:
PrivateContextObjectConsumer<T>, sailpoint.object.BaseExecutor, sailpoint.object.TaskExecutor
- Direct Known Subclasses:
AbstractThreadedObjectIteratorTask, RoleTargetCleanupService
public abstract class AbstractThreadedTask<T> extends sailpoint.task.AbstractTaskExecutor implements PrivateContextObjectConsumer<T>
An abstract superclass for nearly all custom multi-threaded SailPoint tasks. This task will retrieve a list of objects and then pass each of them to a processor method in the subclass in parallel.
The overall flow is:
- Invoke parseArgs(Attributes) to extract task arguments. Subclasses should override this to retrieve their own parameters.
- Invoke getObjectIterator(SailPointContext, Attributes) to retrieve a list of items to iterate over. This iterator can be “streaming” or not.
- Invoke submitAndWait(SailPointContext, TaskResult, Iterator) to begin processing of the list of items. This can be replaced by a subclass, but the default flow is described below. It is unlikely that you will need to change it.
- Clean up the thread pool and update the TaskResult with outcomes.
Submit-and-wait proceeds as follows:
- Retrieve the batch size from getBatchSize().
- Create a thread pool with the specified number of threads in it.
- For each item, invoke the subclass’s threadExecute(SailPointContext, Map, T) method, passing the current item in the third parameter. If a batch size is set, more than one item will be passed in a single worker thread, eliminating the need to build and destroy lots of private contexts. This will likely be more efficient for large operations.
Via the SailPointWorker class, the threadExecute(SailPointContext, Map, T) method will also receive an appropriately thread-specific SailPointContext object that can be used without worrying about transaction length or overlap.
Subclasses can override most parts of this process by extending the various protected methods.
Subclasses cannot directly receive a termination notification, but can register any number of termination handlers by invoking addTerminationHandler(Functions.GenericCallback). This class makes a best effort to invoke all termination handlers.
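The flow described above can be sketched with plain JDK concurrency classes. This is only an illustration of the pattern, not the library's implementation: ThreadedFlowSketch, its String item type, and the simplified threadExecute and submitAndWait signatures are hypothetical stand-ins, and no SailPoint classes or private contexts are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the pattern described above; not the SailPoint API.
class ThreadedFlowSketch {
    final AtomicInteger successCounter = new AtomicInteger();
    final AtomicInteger failureCounter = new AtomicInteger();

    // Stand-in for threadExecute: processes one item and throws to signal failure.
    Object threadExecute(String item) throws Exception {
        if (item.isEmpty()) {
            throw new IllegalArgumentException("empty item");
        }
        return item.toUpperCase();
    }

    // Stand-in for submitAndWait: groups items into batches, runs each batch
    // on a pool thread, then waits for every batch to finish.
    void submitAndWait(Iterator<String> items, int threadCount, int batchSize) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<Future<?>> futures = new ArrayList<>();
            while (items.hasNext()) {
                List<String> batch = new ArrayList<>();
                while (items.hasNext() && batch.size() < Math.max(1, batchSize)) {
                    batch.add(items.next());
                }
                futures.add(executor.submit(() -> {
                    for (String item : batch) {
                        try {
                            threadExecute(item);
                            successCounter.incrementAndGet();
                        } catch (Exception e) {
                            failureCounter.incrementAndGet();
                        }
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            // Clean up the thread pool once all batches are done.
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadedFlowSketch task = new ThreadedFlowSketch();
        task.submitAndWait(Arrays.asList("a", "b", "", "d", "e").iterator(), 2, 2);
        // prints "4 succeeded, 1 failed"
        System.out.println(task.successCounter.get() + " succeeded, "
                + task.failureCounter.get() + " failed");
    }
}
```

Processing a whole batch inside one submitted job mirrors the stated reason for batching: per-thread setup cost (a private context, in the real class) is paid once per batch rather than once per item.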
-
-
Field Summary
Fields

| Modifier and Type | Field | Description |
| --- | --- | --- |
| protected BiConsumer<Thread,Object> | beforeExecutionHook | If present, this BiConsumer can be invoked before execution of each object. |
| protected sailpoint.api.SailPointContext | context | The parent SailPoint context |
| protected ExecutorService | executor | The thread pool |
| protected AtomicInteger | failureCounter | The counter of how many threads have indicated failures |
| protected org.apache.commons.logging.Log | log | The log object |
| protected AtomicInteger | successCounter | The counter of how many threads have indicated success |
| protected sailpoint.object.TaskResult | taskResult | The TaskResult to keep updated with changes |
| protected sailpoint.object.TaskSchedule | taskSchedule | The TaskSchedule, which can be used in querying |
| protected AtomicBoolean | terminated | The boolean flag indicating that this task has been terminated |
| protected int | threadCount | How many threads are to be created |
-
Constructor Summary
Constructors

| Constructor | Description |
| --- | --- |
| AbstractThreadedTask() | |
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| protected void | addTerminationHandler(Functions.GenericCallback handler) | Adds a termination handler to this execution of the task |
| void | afterBatch(sailpoint.api.SailPointContext context) | Invoked by the default worker thread after each batch is completed. |
| void | beforeBatch(sailpoint.api.SailPointContext context) | Invoked by the default worker thread before each batch is begun. |
| protected Iterator<List<T>> | createBatchIterator(Iterator<? extends T> items, int batchSize) | Retrieves an iterator over batches of items, with the size suggested by the second parameter. |
| void | execute(sailpoint.api.SailPointContext ctx, sailpoint.object.TaskSchedule ts, sailpoint.object.TaskResult tr, sailpoint.object.Attributes<String,Object> args) | The main method of this task executor, which invokes the appropriate hook methods. |
| int | getBatchSize() | Gets the batch size for this task. |
| ExecutorService | getExecutor() | Gets the running executor for this task |
| protected abstract Iterator<? extends T> | getObjectIterator(sailpoint.api.SailPointContext context, sailpoint.object.Attributes<String,Object> args) | Retrieves an Iterator that will produce the stream of objects to be processed in parallel. |
| protected void | markFailure(T item) | Marks this item as a failure by incrementing the failure counter. |
| protected void | markSuccess(T item) | Marks this item as a success by incrementing the success counter. |
| protected void | parseArgs(sailpoint.object.Attributes<String,Object> args) | Extracts the thread count from the task arguments. |
| protected void | prepareExecutor() | Prepares the thread pool executor. |
| void | setBeforeExecutionHook(BiConsumer<Thread,Object> beforeExecutionHook) | Sets the “before execution hook”, an optional pluggable callback that will be invoked prior to the execution of each thread. |
| void | setFailureMarker(Consumer<T> failureMarker) | Sets the failure marking callback |
| void | setSuccessMarker(Consumer<T> successMarker) | Sets the success marking callback |
| void | setWorkerCreator(ThreadWorkerCreator<T> workerCreator) | Sets the worker creator function. |
| protected void | submitAndWait(sailpoint.api.SailPointContext context, sailpoint.object.TaskResult taskResult, Iterator<? extends T> items) | Submits the iterator of items to the thread pool, calling threadExecute for each one, then waits for all of the threads to complete or the task to be terminated. |
| boolean | terminate() | Terminates the task by setting the terminated flag, interrupting the executor, waiting five seconds for it to exit, then invoking any shutdown hooks |
| abstract Object | threadExecute(sailpoint.api.SailPointContext threadContext, Map<String,Object> parameters, T obj) | This method will be called in parallel for each item produced by getObjectIterator(SailPointContext, Attributes). |
-
-
-
Field Detail
-
beforeExecutionHook
protected BiConsumer<Thread,Object> beforeExecutionHook
If present, this BiConsumer can be invoked before execution of each object. The subclass is responsible for making this call. This is mainly useful as a testing hook.
-
context
protected sailpoint.api.SailPointContext context
The parent SailPoint context
-
executor
protected ExecutorService executor
The thread pool
-
failureCounter
protected AtomicInteger failureCounter
The counter of how many threads have indicated failures
-
log
protected org.apache.commons.logging.Log log
The log object
-
successCounter
protected AtomicInteger successCounter
The counter of how many threads have indicated success
-
taskResult
protected sailpoint.object.TaskResult taskResult
The TaskResult to keep updated with changes
-
taskSchedule
protected sailpoint.object.TaskSchedule taskSchedule
The TaskSchedule, which can be used in querying
-
terminated
protected final AtomicBoolean terminated
The boolean flag indicating that this task has been terminated
-
threadCount
protected int threadCount
How many threads are to be created
-
-
Constructor Detail
-
AbstractThreadedTask
public AbstractThreadedTask()
-
-
Method Detail
-
addTerminationHandler
protected final void addTerminationHandler(Functions.GenericCallback handler)
Adds a termination handler to this execution of the task.
- Parameters:
handler - The termination handler to run on completion
-
afterBatch
public void afterBatch(sailpoint.api.SailPointContext context) throws sailpoint.tools.GeneralException
Invoked by the default worker thread after each batch is completed. This can be overridden by a subclass to do arbitrary cleanup.
- Parameters:
context - The context for this thread
- Throws:
sailpoint.tools.GeneralException - if anything goes wrong
-
beforeBatch
public void beforeBatch(sailpoint.api.SailPointContext context) throws sailpoint.tools.GeneralException
Invoked by the default worker thread before each batch is begun. If this method throws an exception, the batch worker ought to prevent the batch from being executed.
- Parameters:
context - The context for this thread
- Throws:
sailpoint.tools.GeneralException - if any failures occur
-
execute
public final void execute(sailpoint.api.SailPointContext ctx, sailpoint.object.TaskSchedule ts, sailpoint.object.TaskResult tr, sailpoint.object.Attributes<String,Object> args) throws Exception
The main method of this task executor, which invokes the appropriate hook methods.
- Specified by:
execute in interface sailpoint.object.TaskExecutor
- Throws:
Exception
-
createBatchIterator
protected Iterator<List<T>> createBatchIterator(Iterator<? extends T> items, int batchSize)
Retrieves an iterator over batches of items, with the size suggested by the second parameter. If left unmodified, returns either a BatchingIterator when the batch size is greater than 1, or a TransformingIterator that constructs a singleton list for each item when batch size is 1.
If possible, the returned Iterator should simply wrap the input, rather than consuming it. This allows for “live” iterators that read from a data source directly rather than pre-reading. However, beware Hibernate iterators here because a ‘commit’ can kill those mid-iterate.
- Parameters:
items - The input iterator of items
batchSize - The batch size
- Returns:
- The iterator over a list of items
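As a rough illustration of "wrap the input, rather than consuming it", the sketch below pulls items from the source only when the next batch is requested. LazyBatchIterator is a hypothetical class written for this example; the library's own BatchingIterator may be implemented differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical lazy batching wrapper; the library's BatchingIterator may differ.
class LazyBatchIterator<T> implements Iterator<List<T>> {
    private final Iterator<? extends T> source;
    private final int batchSize;

    LazyBatchIterator(Iterator<? extends T> source, int batchSize) {
        this.source = source;
        // Treat non-positive sizes as 1 so every batch holds at least one item.
        this.batchSize = Math.max(1, batchSize);
    }

    @Override
    public boolean hasNext() {
        // Delegates to the source, so nothing is read ahead of time.
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = new ArrayList<>(batchSize);
        // Read at most batchSize items; the last batch may be shorter.
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        Iterator<List<Integer>> batches =
                new LazyBatchIterator<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
        while (batches.hasNext()) {
            System.out.println(batches.next()); // [1, 2] then [3, 4] then [5]
        }
    }
}
```

Because the wrapper reads lazily, it inherits the caveat above: if the source is a live database cursor, anything that invalidates the cursor between batches will surface on a later call to next().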
-
getBatchSize
public int getBatchSize()
Gets the batch size for this task. By default, this is the batch size passed as an input to the task, but this may be overridden by subclasses.
- Returns:
- The batch size for each thread
-
getExecutor
public final ExecutorService getExecutor()
Gets the running executor for this task.
- Returns:
- The executor
-
getObjectIterator
protected abstract Iterator<? extends T> getObjectIterator(sailpoint.api.SailPointContext context, sailpoint.object.Attributes<String,Object> args) throws sailpoint.tools.GeneralException
Retrieves an Iterator that will produce the stream of objects to be processed in parallel. Each object produced by this Iterator will be passed in its turn to threadExecute(SailPointContext, Map, Object) as the third parameter.
IMPORTANT NOTES RE: HIBERNATE:
It may be unwise to return a “live” Hibernate iterator of the sort provided by context.search here. The next read of the iterator will fail with a “Result Set Closed” exception if anything commits this context while the iterator is still being consumed. It is likely that the first worker threads will execute before the iterator is fully read.
If you return a SailPointObject or any other object dependent on a Hibernate context, you will likely receive context-related errors in your worker thread unless you make an effort to re-attach the object to the thread context.
TODO One option may be to pass in a private context here, but it couldn’t be closed until after iteration is complete.
- Parameters:
context - The top-level task SailPoint context
args - The task arguments
- Returns:
- An iterator containing the objects to be iterated over
- Throws:
sailpoint.tools.GeneralException - if any failures occur
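One way to sidestep the live-iterator problem noted above is to drain the iterator into memory before any worker starts, ideally collecting plain identifiers rather than attached objects. The sketch below shows only that draining step with ordinary Java collections; SnapshotSketch and snapshot are hypothetical names, and the string IDs stand in for whatever a real query would return. It trades memory for safety, so it may not suit very large result sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration; not part of the library.
class SnapshotSketch {
    // Drains the live iterator fully into a list, then iterates over the copy,
    // so later commits cannot invalidate the iteration.
    static <T> Iterator<T> snapshot(Iterator<? extends T> live) {
        List<T> copy = new ArrayList<>();
        while (live.hasNext()) {
            copy.add(live.next());
        }
        return copy.iterator();
    }

    public static void main(String[] args) {
        // The list-backed iterator stands in for a live query result.
        Iterator<String> ids = snapshot(Arrays.asList("id-1", "id-2", "id-3").iterator());
        while (ids.hasNext()) {
            System.out.println(ids.next());
        }
    }
}
```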
-
markFailure
protected void markFailure(T item)
Marks this item as a failure by incrementing the failure counter. Subclasses may override this method to add additional behavior.
-
markSuccess
protected void markSuccess(T item)
Marks this item as a success by incrementing the success counter. Subclasses may override this method to add additional behavior.
-
parseArgs
protected void parseArgs(sailpoint.object.Attributes<String,Object> args) throws Exception
Extracts the thread count from the task arguments. Subclasses should override this method to extract their own arguments. You must either call super.parseArgs() in any subclass implementation of this method or set threadCount yourself.
- Parameters:
args - The task arguments
- Throws:
Exception - if any failures occur parsing arguments
-
prepareExecutor
protected void prepareExecutor() throws sailpoint.tools.GeneralException
Prepares the thread pool executor. The default implementation simply constructs a fixed-size executor service, but subclasses may override this behavior with their own implementations.
After this method is finished, the executor attribute should be set to an ExecutorService that can accept new inputs.
- Throws:
sailpoint.tools.GeneralException - if any failures occur
-
setBeforeExecutionHook
public final void setBeforeExecutionHook(BiConsumer<Thread,Object> beforeExecutionHook)
Sets the “before execution hook”, an optional pluggable callback that will be invoked prior to the execution of each thread. This BiConsumer’s accept() method must be thread-safe as it will be invoked in parallel.
- Parameters:
beforeExecutionHook
- An optional BiConsumer callback hook
-
setFailureMarker
public final void setFailureMarker(Consumer<T> failureMarker)
Sets the failure marking callback.
- Parameters:
failureMarker - The callback invoked on item failure
-
setSuccessMarker
public final void setSuccessMarker(Consumer<T> successMarker)
Sets the success marking callback.
- Parameters:
successMarker - The callback invoked on item success
-
setWorkerCreator
public final void setWorkerCreator(ThreadWorkerCreator<T> workerCreator)
Sets the worker creator function. This function should return a SailPointWorker extension that will take the given List of typed items and process them when its thread is invoked.
- Parameters:
workerCreator
- The worker creator function
-
submitAndWait
protected void submitAndWait(sailpoint.api.SailPointContext context, sailpoint.object.TaskResult taskResult, Iterator<? extends T> items) throws sailpoint.tools.GeneralException
Submits the iterator of items to the thread pool, calling threadExecute for each one, then waits for all of the threads to complete or the task to be terminated.
- Parameters:
context - The SailPoint context
taskResult - The taskResult to update (for monitoring)
items - The iterator over items being processed
- Throws:
sailpoint.tools.GeneralException - if any failures occur
-
terminate
public final boolean terminate()
Terminates the task by setting the terminated flag, interrupting the executor, waiting five seconds for it to exit, then invoking any shutdown hooks.
- Specified by:
terminate in interface sailpoint.object.BaseExecutor
- Returns:
- Always true
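The termination sequence described above (set the flag, interrupt the pool, wait up to five seconds, run handlers on a best-effort basis) might look roughly like the following with JDK classes. TerminationSketch is hypothetical, and Runnable is used as a stand-in for Functions.GenericCallback, whose actual signature is not shown in this page.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the termination sequence; not the library's code.
class TerminationSketch {
    final AtomicBoolean terminated = new AtomicBoolean();
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    // Runnable is an assumption standing in for Functions.GenericCallback.
    final List<Runnable> terminationHandlers = new CopyOnWriteArrayList<>();

    void addTerminationHandler(Runnable handler) {
        terminationHandlers.add(handler);
    }

    boolean terminate() {
        // 1. Set the flag so cooperative workers can stop on their own.
        terminated.set(true);
        // 2. Interrupt running workers and discard queued ones.
        executor.shutdownNow();
        try {
            // 3. Wait up to five seconds for workers to exit.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 4. Best effort: a failing handler must not prevent the others from running.
        for (Runnable handler : terminationHandlers) {
            try {
                handler.run();
            } catch (RuntimeException e) {
                System.err.println("Termination handler failed: " + e.getMessage());
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TerminationSketch task = new TerminationSketch();
        task.addTerminationHandler(() -> { throw new IllegalStateException("boom"); });
        task.addTerminationHandler(() -> System.out.println("cleanup ran"));
        // A worker that sleeps until it is interrupted by shutdownNow().
        task.executor.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("terminate() returned " + task.terminate());
    }
}
```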
-
threadExecute
public abstract Object threadExecute(sailpoint.api.SailPointContext threadContext, Map<String,Object> parameters, T obj) throws sailpoint.tools.GeneralException
This method will be called in parallel for each item produced by getObjectIterator(SailPointContext, Attributes).
DO NOT use the parent context in this method. You will encounter Weird Database Issues.
- Specified by:
threadExecute in interface PrivateContextObjectConsumer<T>
- Parameters:
threadContext - A private IIQ context for the current JVM thread
parameters - A set of default parameters suitable for a Rule or Script. In the default implementation, the object will be in this map as ‘object’.
obj - The object to process
- Returns:
- An arbitrary value (ignored by default)
- Throws:
sailpoint.tools.GeneralException - if any failures occur
-
-