001package com.identityworksllc.iiq.common.threads; 002 003import com.identityworksllc.iiq.common.TaskCallback; 004import com.identityworksllc.iiq.common.Utilities; 005import com.identityworksllc.iiq.common.vo.Outcome; 006import org.apache.commons.logging.Log; 007import org.apache.commons.logging.LogFactory; 008import sailpoint.api.SailPointContext; 009import sailpoint.object.Request; 010import sailpoint.object.RequestDefinition; 011import sailpoint.task.TaskMonitor; 012import sailpoint.tools.Compressor; 013import sailpoint.tools.GeneralException; 014import sailpoint.tools.Util; 015 016import java.io.IOException; 017import java.io.Serializable; 018import java.util.ArrayList; 019import java.util.Base64; 020import java.util.Collections; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.StringJoiner; 025import java.util.UUID; 026import java.util.concurrent.Callable; 027import java.util.concurrent.CancellationException; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Future; 030import java.util.concurrent.FutureTask; 031import java.util.concurrent.RecursiveTask; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.concurrent.atomic.AtomicReference; 036 037/** 038 * A worker thread for multi-threaded actions. This class can be used with 039 * AbstractThreadedObjectIteratorTask to implement a multi-threaded task 040 * or with SailPointWorkerExecutor to distribute the workers across 041 * the cluster. 042 * 043 * If intended for use with a Request, all parts of the subclass, including 044 * objects stored in lists, maps, and other structures, must be either Serializable 045 * or Externalizable. 046 */ 047public abstract class SailPointWorker implements Runnable, Serializable { 048 049 /** 050 * An interface used as an error callback, specifically for use via the SailPointWorkerExecutor 051 * but potentially usable by testing code as well. 052 */ 053 @FunctionalInterface 054 public interface ExceptionHandler { 055 void handleError(Throwable t); 056 } 057 058 /** 059 * The attribute used to pass this object in serialized form to {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}. 060 */ 061 public static final String MULTI_SERIALIZED_WORKERS_ATTR = "serializedWorkers"; 062 063 /** 064 * Generated serialVersionUID 065 */ 066 private static final long serialVersionUID = 3L; 067 068 /** 069 * Submits the task to the given executor and returns its future. If any listeners 070 * are passed, the given task's Future will be registered with the listeners as a 071 * dependency. The listeners will not be able to run until the Future resolves. 072 * 073 * @param executor The executor to submit the task to 074 * @param self The task to submit 075 * @param listeners Any other workers that care about the output of this one 076 * @return The Future for this task 077 */ 078 public static Future<?> submitWithListeners(ExecutorService executor, SailPointWorker self, SailPointWorker... listeners) { 079 Future<?> future = executor.submit(self.toCallable()); 080 081 for(SailPointWorker listener : listeners) { 082 listener.addDependency(self, future); 083 } 084 085 return future; 086 } 087 088 /** 089 * Serializes a list of SailPointWorker object into a Request suitable for use with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}. That executor must be associated with the provided RequestDefinition. 090 * @param requestDefinition The request definition associated with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor} 091 * @param workers A list of SailPointWorkers to pass to the request handler 092 * @return The Request containing a serialized object 093 * @throws GeneralException if any failures occur compressing the object 094 * @throws IOException if any failures occur serializing this {@link SailPointWorker} 095 */ 096 public static Request toRequest(RequestDefinition requestDefinition, List<SailPointWorker> workers) throws GeneralException, IOException { 097 Request request = new Request(); 098 request.setName("Batch of " + workers.size() + " workers"); 099 request.setDefinition(requestDefinition); 100 List<String> workerStrings = new ArrayList<>(); 101 int phase = 0; 102 int dependentPhase = 0; 103 for(SailPointWorker worker : workers) { 104 byte[] contents = Util.serializableObjectToBytes(worker); 105 String compressed = Compressor.compress(Base64.getEncoder().encodeToString(contents)); 106 workerStrings.add(compressed); 107 if (phase == 0 && worker.phase > 0) { 108 phase = worker.phase; 109 } 110 if (dependentPhase == 0 && worker.dependentPhase > 0) { 111 dependentPhase = worker.dependentPhase; 112 } 113 } 114 request.setAttribute(MULTI_SERIALIZED_WORKERS_ATTR, workerStrings); 115 if (phase > 0) { 116 request.setPhase(phase); 117 } 118 if (dependentPhase > 0) { 119 request.setDependentPhase(dependentPhase); 120 } 121 return request; 122 } 123 /** 124 * An optional list of Future objects that will be checked before the task is 125 * run. The resulting objects will be available via getDependencyOutput() 126 */ 127 /*package*/ List<SailPointWorker> children; 128 /** 129 * An optional counter object to increment on a successful execution 130 */ 131 private transient AtomicInteger completedCounter; 132 /** 133 * The default name of this task 134 */ 135 private final String defaultName; 136 /** 137 * A list of registered dependencies by name 138 */ 139 /*package*/ transient Map<String, Future<?>> dependencies; 140 141 /** 142 * An optional list of Future objects that will be checked before the task is 143 * run. The resulting objects will be available via getDependencyOutput() 144 */ 145 /*package*/ transient Map<String, Object> dependencyOutput; 146 147 /** 148 * The dependent for this worker, if this is used as a partition 149 */ 150 private int dependentPhase; 151 /** 152 * The optional exception handler for this task 153 */ 154 private transient ExceptionHandler exceptionHandler; 155 156 /** 157 * A flag indicating that we ought to execute the children prior to this one. 158 * This defaults to true, but will be altered by {@link RecursiveWorkerChildTask}. 159 */ 160 /*package*/ boolean executeChildren; 161 162 /** 163 * An optional counter object to increment on failures 164 */ 165 private transient AtomicInteger failedCounter; 166 /** 167 * An instance of the TaskMonitor interface 168 */ 169 protected transient TaskMonitor monitor; 170 171 /** 172 * An Outcome object allowing subclasses to provide output 173 */ 174 protected transient Outcome outcome; 175 176 /** 177 * The parent task of this one, used to propagate events 178 */ 179 private SailPointWorker parent; 180 181 /** 182 * The phase this worker is in, if used as a partition 183 */ 184 private int phase; 185 186 /** 187 * Task callback object to be invoked 188 */ 189 private List<TaskCallback<SailPointWorker, Object>> taskCallback; 190 191 /** 192 * A flag accessible to subclasses indicating that this task has been terminated 193 */ 194 private transient AtomicBoolean terminated; 195 196 /** 197 * The amount of time to be spent before timing out 198 */ 199 private long timeoutMillis; 200 201 /** 202 * The timestamp at which to mark this task as timed out 203 */ 204 private long timeoutTimestamp; 205 206 /** 207 * Constructs a new Worker with the default name (the class + UUID) 208 */ 209 protected SailPointWorker() { 210 this(null); 211 } 212 213 /** 214 * A worker constructor that takes a name 215 * @param name The worker name 216 */ 217 protected SailPointWorker(String name) { 218 outcome = new Outcome(); 219 terminated = new AtomicBoolean(); 220 if (Util.isNotNullOrEmpty(name)) { 221 this.defaultName = name; 222 } else { 223 this.defaultName = this.getClass().getSimpleName() + " " + UUID.randomUUID(); 224 } 225 children = new ArrayList<>(); 226 dependencyOutput = new HashMap<>(); 227 dependencies = new HashMap<>(); 228 taskCallback = new ArrayList<>(); 229 this.executeChildren = true; 230 } 231 232 /** 233 * A worker constructor that takes a name 234 * @param name The worker name 235 * @param phase The worker phase 236 */ 237 protected SailPointWorker(String name, int phase) { 238 this(name); 239 this.phase = phase; 240 } 241 242 /** 243 * A worker constructor that takes a name 244 * @param name The worker name 245 * @param phase The worker phase 246 */ 247 protected SailPointWorker(String name, int phase, int dependentPhase) { 248 this(name, phase); 249 this.dependentPhase = dependentPhase; 250 } 251 252 /** 253 * Adds a child of this task. All child tasks will be resolved before this task runs 254 * and their output will be available to this one. This could be used to implement 255 * phased execution, for example. 256 * 257 * This is NOT the same as a dependency, which is an asynchronous Future that will 258 * block until completion. 259 * 260 * @param childWorker The worker to add as a child 261 */ 262 public void addChild(SailPointWorker childWorker) { 263 childWorker.setParent(this); 264 this.children.add(childWorker); 265 } 266 267 /** 268 * Adds a dependency Future which will be invoked prior to any child tasks 269 * and also this task. 270 * 271 * @param dependency The dependency task (used to extract the name) 272 * @param workerFuture The future for that dependency 273 */ 274 public void addDependency(SailPointWorker dependency, Future<?> workerFuture) { 275 this.dependencies.put(dependency.getWorkerName(), workerFuture); 276 } 277 278 /** 279 * Adds a task callback, which can be used to receive events 280 * @param taskCallback The task callback 281 */ 282 public void addTaskCallback(TaskCallback<SailPointWorker, Object> taskCallback) { 283 this.taskCallback.add(taskCallback); 284 } 285 286 /** 287 * Checks for any of the abnormal termination states, throwing an 288 * {@link CancellationException} if one is encountered. Subclasses that 289 * are working in a loop should invoke this method routinely and allow 290 * the exception to abort whatever loop is being run. 291 * 292 * @throws CancellationException if this thread should abnormally terminate 293 */ 294 protected void checkCancel() throws CancellationException { 295 if (isTerminated() || Thread.currentThread().isInterrupted()) { 296 throw new CancellationException("Worker has been terminated"); 297 } 298 } 299 300 /** 301 * Executes this task in a SailPoint context that will be dynamically constructed for 302 * it. A private context and {@link Log} will be passed to this method. 303 * @param context The private context to use for this thread worker 304 * @param logger The log attached to this Worker 305 * @return any object, which will be ignored 306 * @throws Exception any exception, which will be logged 307 */ 308 public abstract Object execute(SailPointContext context, Log logger) throws Exception; 309 310 /** 311 * Retrieve the output of a dependency (listened-to) or child task of this task. 312 * 313 * @param key The name of the child worker 314 * @return The output 315 */ 316 protected Object getDependencyOutput(String key) { 317 return this.dependencyOutput.get(key); 318 } 319 320 /** 321 * Returns the dependent phase of this worker 322 * @return The dependent phase 323 */ 324 public int getDependentPhase() { 325 return dependentPhase; 326 } 327 328 /** 329 * Gets the exception handler for this worker 330 * @return The exception handler for this worker 331 */ 332 public ExceptionHandler getExceptionHandler() { 333 return exceptionHandler; 334 } 335 336 /** 337 * Gets the monitor associated with this worker (if any) 338 * @return The monitor, which may be null 339 */ 340 public TaskMonitor getMonitor() { 341 return monitor; 342 } 343 344 /** 345 * Retrieves the parent task of this one 346 * @return The parent task 347 */ 348 public SailPointWorker getParent() { 349 return parent; 350 } 351 352 /** 353 * Returns the phase of this worker 354 * @return The phase of this worker 355 */ 356 public int getPhase() { 357 return phase; 358 } 359 360 /** 361 * Returns the name of this worker, which can be used in log messages or as the 362 * name of a partitioned request object. By default, this is generated from the 363 * name of this concrete class (your subclass) and a random UUID. 364 * 365 * Override this method to provide your own more sensible naming scheme. 366 * 367 * @return The worker name 368 */ 369 public String getWorkerName() { 370 return defaultName; 371 } 372 373 /** 374 * The core flow of a SailPointWorker. Renames the thread for logging 375 * clarity, resolves any dependencies, and handles success and failure 376 * outcomes by invoking the appropriate callbacks. 377 * 378 * @return The implementation 379 */ 380 private Object implementation(boolean rethrow) throws Exception { 381 // The non-exceptional output of this task 382 AtomicReference<Object> result = new AtomicReference<>(); 383 384 // The exceptional output of this task 385 AtomicReference<Exception> exceptionResult = new AtomicReference<>(); 386 387 Log logger = LogFactory.getLog(this.getClass()); 388 final String originalThreadName = Thread.currentThread().getName(); 389 this.timeoutTimestamp = System.currentTimeMillis() + timeoutMillis; 390 try { 391 if (taskCallback != null) { 392 // Can't do this in a lambda forEach because this one can throw an exception 393 for(TaskCallback<SailPointWorker, Object> tc : taskCallback) { 394 tc.beforeStart(this); 395 } 396 } 397 Thread.currentThread().setName("SailPointWorker: " + getWorkerName()); 398 try { 399 Utilities.withPrivateContext((threadContext) -> { 400 try { 401 waitForDependencies(logger); 402 if (this.executeChildren) { 403 // Executes the child tasks, storing their outputs 404 runChildren(this, logger, threadContext); 405 } 406 result.set(execute(threadContext, logger)); 407 threadContext.commitTransaction(); 408 if (completedCounter != null) { 409 completedCounter.incrementAndGet(); 410 } 411 if (taskCallback != null) { 412 taskCallback.forEach((tc) -> tc.onSuccess(this, result.get())); 413 } 414 } catch (Exception e) { 415 if (failedCounter != null) { 416 failedCounter.incrementAndGet(); 417 } 418 if (exceptionHandler != null) { 419 exceptionHandler.handleError(e); 420 } 421 if (taskCallback != null) { 422 taskCallback.forEach((tc) -> tc.onFailure(this, e.getMessage(), e)); 423 } 424 exceptionResult.set(e); 425 } 426 }); 427 } finally { 428 Thread.currentThread().setName(originalThreadName); 429 } 430 } finally { 431 if (taskCallback != null) { 432 taskCallback.forEach((tc) -> tc.afterFinish(this)); 433 } 434 if (this.parent != null && this.parent.dependencyOutput != null && this.dependencyOutput != null) { 435 this.parent.dependencyOutput.putAll(this.dependencyOutput); 436 } 437 } 438 439 if (rethrow && exceptionResult.get() != null) { 440 throw exceptionResult.get(); 441 } 442 443 return result.get(); 444 } 445 446 /** 447 * Returns true if this worker has been terminated or if it has timed out. 448 */ 449 public boolean isTerminated() { 450 return isTimedOut() || terminated.get(); 451 } 452 453 /** 454 * Returns true if this task has timed out. 455 * 456 * @return True if this task has timed out. 457 */ 458 public boolean isTimedOut() { 459 return (this.timeoutMillis > 0 && System.currentTimeMillis() < timeoutTimestamp); 460 } 461 462 /** 463 * Java deserialization hook to instantiate the transient fields 464 * 465 * @see Serializable 466 * 467 * @param in The input stream 468 * @throws IOException if anything fails 469 * @throws ClassNotFoundException if this is the wrong clas 470 */ 471 private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { 472 this.terminated = new AtomicBoolean(); 473 this.outcome = new Outcome(); 474 this.dependencyOutput = new HashMap<>(); 475 this.dependencies = new HashMap<>(); 476 in.defaultReadObject(); 477 } 478 479 /** 480 * Invokes this SailPointWorker by constructing a new private SailPointContext, 481 * then invoking the subclass's {@link #execute(SailPointContext, Log)} method, 482 * then handling success or failure according to the optional objects. 483 * 484 * The subclass is responsible for logging and rethrowing any exceptions if 485 * a failure is to be counted. 486 */ 487 @Override 488 public void run() { 489 try { 490 implementation(false); 491 } catch(Exception ignored) { 492 // Already handled due to 'false' above 493 } 494 } 495 496 /** 497 * Directly runs the child tasks for this task. If any child task throws an exception, 498 * or turns out to have timed out upon completion, an exception will be thrown from 499 * this method and no further child tasks will be executed. 500 * 501 * Each child's {@link #execute(SailPointContext, Log)} method is invoked directly. 502 * Children are run in the same SailPointContext as the parent. 503 * 504 * A tree of nested child tasks will be invoked in depth-first order. 505 * 506 * @param logger The logger 507 * @param context The private context created in {@link #implementation(boolean)} 508 * @throws Exception If a child throws an exception 509 */ 510 private void runChildren(SailPointWorker parent, Log logger, SailPointContext context) throws Exception { 511 for(SailPointWorker child : Util.safeIterable(parent.children)) { 512 child.timeoutTimestamp = System.currentTimeMillis() + child.timeoutMillis; 513 514 if (!Util.isEmpty(child.children)) { 515 runChildren(child, logger, context); 516 } 517 Object output = child.execute(context, logger); 518 if (child.isTimedOut()) { 519 throw new CancellationException("Child task " + child.getWorkerName() + " timed out"); 520 } 521 parent.dependencyOutput.put(child.getWorkerName(), output); 522 } 523 } 524 525 /** 526 * Gets this object as a Runnable, mainly for use with an ExecutorService 527 * @return This object as a Runnable 528 */ 529 public final Runnable runnable() { 530 return this; 531 } 532 533 /** 534 * Set the completed counter on this task and any of its children 535 * @param completedCounter The completed counter 536 */ 537 public void setCompletedCounter(AtomicInteger completedCounter) { 538 this.completedCounter = completedCounter; 539 540 if (this.children != null) { 541 for(SailPointWorker child : this.children) { 542 child.setCompletedCounter(completedCounter); 543 } 544 } 545 } 546 547 /** 548 * Set the dependent phase for this worker. This is only useful if running 549 * the worker as a partitioned Request. 550 * 551 * @param dependentPhase The completed counter 552 */ 553 public void setDependentPhase(int dependentPhase) { 554 this.dependentPhase = dependentPhase; 555 556 // NOTE: Children don't need to copy the parent's phase because they run in the 557 // same thread as this one and thus will be in the same phase by default. 558 } 559 560 /** 561 * Sets the exception handler for this class. 562 * @param exceptionHandler The exception handler 563 */ 564 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 565 this.exceptionHandler = exceptionHandler; 566 567 if (this.children != null) { 568 for(SailPointWorker child : this.children) { 569 child.setExceptionHandler(exceptionHandler); 570 } 571 } 572 } 573 574 /** 575 * Sets the failure counter that will be incremented on any worker failure. 576 * 577 * @param failedCounter The failed counter 578 */ 579 public void setFailedCounter(AtomicInteger failedCounter) { 580 this.failedCounter = failedCounter; 581 582 if (this.children != null) { 583 for(SailPointWorker child : this.children) { 584 child.setFailedCounter(failedCounter); 585 } 586 } 587 } 588 589 /** 590 * Sets the TaskMonitor for this worker and its children. This will be set by 591 * the SPW request executor, among other places. 592 * @param monitor The task monitor 593 */ 594 public void setMonitor(TaskMonitor monitor) { 595 this.monitor = monitor; 596 597 if (this.children != null) { 598 for(SailPointWorker child : this.children) { 599 child.setMonitor(monitor); 600 } 601 } 602 } 603 604 /** 605 * Sets the parent task of this one to the given value. This can be null 606 * @param parent The parent task 607 */ 608 protected void setParent(SailPointWorker parent) { 609 this.parent = parent; 610 } 611 612 /** 613 * Sets the timeout duration in the specified unit. 614 */ 615 public void setTimeout(int duration, TimeUnit unit) { 616 this.timeoutMillis = TimeUnit.MILLISECONDS.convert(duration, unit); 617 } 618 619 /** 620 * Sets the phase for this worker. This is only useful if running as a {@link Request}. 621 * @param phase The phase number for this worker 622 */ 623 public void setPhase(int phase) { 624 this.phase = phase; 625 626 // NOTE: Children don't need to copy the parent's phase because they run in the 627 // same thread as this one and thus will be in the same phase by default. 628 } 629 630 /** 631 * Attempts to terminate the worker 632 * 633 * @return True, only to satisfy the Terminable interface 634 */ 635 public boolean terminate() { 636 this.terminated.set(true); 637 638 return true; 639 } 640 641 /** 642 * Returns a Callable object that will implement the logic of this SailPointWorker, 643 * properly returning a value or an exception for {@link java.util.concurrent.Future} purposes. 644 * 645 * This is used mainly because {@link java.util.concurrent.ExecutorService#submit(Runnable)} 646 * gets messy if the same object implements both Runnable and Callable. This must also be 647 * used if you want to chain workers using {@link java.util.concurrent.Future} and the 648 * dependency function. 649 */ 650 public Callable<Object> toCallable() { 651 return () -> this.implementation(true); 652 } 653 654 /** 655 * Creates a new recursive task from this worker, for use with a ForkJoinPool 656 * and a tree of child worker tasks. 657 * @return The recursive task 658 */ 659 public RecursiveTask<List<Object>> toForkJoinTask() { 660 return new RecursiveWorkerContainer(Collections.singletonList(this)); 661 } 662 663 /** 664 * Creates a FutureTask wrapping this object as a Callable 665 * @return The FutureTask 666 */ 667 public FutureTask<Object> toFutureTask() { 668 return new FutureTask<>(toCallable()); 669 } 670 671 /** 672 * Serializes this SailPointWorker object into a Request suitable for use with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor}. That executor must be associated with the provided RequestDefinition. 673 * @param requestDefinition The request definition associated with {@link com.identityworksllc.iiq.common.request.SailPointWorkerExecutor} 674 * @return The Request containing a serialized object 675 * @throws GeneralException if any failures occur compressing the object 676 * @throws IOException if any failures occur serializing this {@link SailPointWorker} 677 */ 678 public Request toRequest(RequestDefinition requestDefinition) throws GeneralException, IOException { 679 List<SailPointWorker> singletonList = new ArrayList<>(); 680 singletonList.add(this); 681 682 return SailPointWorker.toRequest(requestDefinition, singletonList); 683 } 684 685 /** 686 * @see Object#toString() 687 */ 688 @Override 689 public String toString() { 690 return new StringJoiner(", ", SailPointWorker.class.getSimpleName() + "[", "]") 691 .add("name='" + getWorkerName() + "'") 692 .add("dependencies='" + this.dependencies.keySet() + "'") 693 .add("children='" + this.children + "'") 694 .toString(); 695 } 696 697 /** 698 * Resolves any dependencies of this task, noting errors 699 * @param logger The logger used to log dependency errors 700 * @throws GeneralException if any dependency fails 701 */ 702 private void waitForDependencies(Log logger) throws GeneralException, InterruptedException { 703 boolean dependencyFailure = false; 704 if (dependencies != null) { 705 for (String key : dependencies.keySet()) { 706 long remainingTimeout = this.timeoutTimestamp - System.currentTimeMillis(); 707 if (isTimedOut()) { 708 throw new InterruptedException(); 709 } 710 711 // Note that this will block until the Future resolves. It will consume 712 // a thread in your thread pool, so you should probably use a cachedThreadPool. 713 Future<?> future = dependencies.get(key); 714 if (future != null) { 715 try { 716 if (this.timeoutMillis > 0 && remainingTimeout > 0) { 717 // Don't hang forever if we've specified a timeout on this worker 718 this.dependencyOutput.put(key, future.get(remainingTimeout, TimeUnit.MILLISECONDS)); 719 } else { 720 // Hangs forever (or until interrupted) 721 this.dependencyOutput.put(key, future.get()); 722 } 723 } catch(InterruptedException e) { 724 throw e; 725 } catch(Exception e) { 726 // This will almost certainly be an execution exception 727 logger.error("Observed dependency task " + key + " had an exception", e); 728 dependencyFailure = true; 729 } 730 } 731 } 732 } 733 if (dependencyFailure) { 734 throw new GeneralException("One or more dependencies of worker " + getWorkerName() + " failed. See the logs for details."); 735 } 736 } 737 738}