001package com.identityworksllc.iiq.common.request; 002 003import com.identityworksllc.iiq.common.threads.SailPointWorker; 004import org.apache.commons.logging.Log; 005import org.apache.commons.logging.LogFactory; 006import sailpoint.api.SailPointContext; 007import sailpoint.object.Attributes; 008import sailpoint.object.Request; 009import sailpoint.object.TaskResult; 010import sailpoint.request.AbstractRequestExecutor; 011import sailpoint.request.RequestPermanentException; 012import sailpoint.task.TaskMonitor; 013import sailpoint.tools.Compressor; 014import sailpoint.tools.GeneralException; 015import sailpoint.tools.Util; 016 017import java.io.IOException; 018import java.util.Base64; 019import java.util.List; 020import java.util.concurrent.atomic.AtomicBoolean; 021import java.util.concurrent.atomic.AtomicInteger; 022 023/** 024 * Request executor to allow partitioning of workers across all nodes in the cluster. 025 * Your Request should include an attribute called serializedWorkers, which must be 026 * a list of Base64-encoded strings, each representing a Java Serialized worker. 027 * Each worker should encapsulate the work it's expected to do and must not depend 028 * on any inputs from the Request. 029 * 030 * (In other words, the worker should be able to do the same job whether invoked via 031 * a Request or directly in-line in your Java code.) 032 * 033 * Workers will be deserialized using ObjectInputStream. 034 * 035 * Workers will run in a single Sailpoint context and will be invoked via execute() 036 * and not via implementation(). 037 * 038 * Each worker will be provided a TaskMonitor for this Request, which it can use to 039 * execute partition status updates. 040 * 041 * I recommend creating a number of partitions with relatively small worker counts, 042 * because every update of the Request will need to serialize the entire object, 043 * including your strings, back to the database. 044 * 045 * An interrupted SailPointWorkerExecutor (e.g., via server shutdown) will be 046 * deleted rather than resumed when the server is restarted. You will have no 047 * notification of this. 048 */ 049public final class SailPointWorkerExecutor extends AbstractRequestExecutor { 050 /** 051 * The request definition name. You will need to have imported this request 052 * definition XML. 053 */ 054 public static final String REQUEST_DEFINITION = "IDW Worker Executor"; 055 056 private final Log log; 057 058 /** 059 * The currently running worker, used to handle termination gracefully 060 */ 061 private SailPointWorker runningWorker; 062 063 /** 064 * Will be set to true when terminate() is called 065 */ 066 private final AtomicBoolean terminated; 067 068 public SailPointWorkerExecutor() { 069 this.terminated = new AtomicBoolean(); 070 this.log = LogFactory.getLog(this.getClass()); 071 } 072 073 /** 074 * Main entry point 075 * @see AbstractRequestExecutor#execute(SailPointContext, Request, Attributes) 076 */ 077 @Override 078 public void execute(SailPointContext sailPointContext, Request request, Attributes<String, Object> attributes) throws RequestPermanentException { 079 if (log.isDebugEnabled()) { 080 log.debug("Beginning request " + request.getName()); 081 } 082 runWorkers(sailPointContext, request); 083 if (log.isDebugEnabled()) { 084 log.debug("Finished request " + request.getName()); 085 } 086 } 087 088 /** 089 * Executes the given list of workers serially 090 * 091 * @param sailPointContext The incoming SailPoint context 092 * @param request The Request to process, containing worker objects 093 * @throws RequestPermanentException if any failures occur 094 */ 095 private void runWorkers(SailPointContext sailPointContext, Request request) throws RequestPermanentException { 096 AtomicInteger completed = new AtomicInteger(); 097 AtomicInteger failed = new AtomicInteger(); 098 TaskMonitor monitor = new TaskMonitor(sailPointContext, request); 099 try { 100 List<String> workers = request.getAttributes().getStringList(SailPointWorker.MULTI_SERIALIZED_WORKERS_ATTR); 101 for (String workerString : Util.safeIterable(workers)) { 102 if (terminated.get()) { 103 log.warn("Terminated"); 104 break; 105 } 106 TaskResult masterResult = monitor.lockMasterResult(); 107 try { 108 if (Thread.interrupted() || masterResult.isTerminateRequested() || masterResult.isTerminated()) { 109 log.warn("Master result is terminated"); 110 if (!terminated.get()) { 111 if (runningWorker != null) { 112 runningWorker.terminate(); 113 } 114 } 115 break; 116 } 117 } finally { 118 monitor.commitMasterResult(); 119 } 120 byte[] workerBytes = Base64.getDecoder().decode(Compressor.decompress(workerString)); 121 SailPointWorker worker = (SailPointWorker) Util.readSerializedObject(workerBytes); 122 try { 123 runningWorker = worker; 124 worker.setMonitor(monitor); 125 126 // TODO figure out if it's possible to invoke implementation() here 127 worker.execute(sailPointContext, log); 128 completed.incrementAndGet(); 129 } catch(Exception e) { 130 log.error("Caught an error executing a worker", e); 131 failed.incrementAndGet(); 132 } finally { 133 runningWorker = null; 134 } 135 } 136 } catch(RequestPermanentException e) { 137 throw e; 138 } catch(GeneralException | IOException | ClassNotFoundException e) { 139 log.error("Caught an error preparing worker execution", e); 140 try { 141 TaskResult taskResult = monitor.lockPartitionResult(); 142 try { 143 taskResult.addException(e); 144 } finally { 145 monitor.commitPartitionResult(); 146 } 147 } catch (Exception e2) { 148 log.debug("Caught an exception trying to log an exception", e2); 149 } 150 // This causes IIQ to not retry the request 151 throw new RequestPermanentException(e); 152 } finally { 153 try { 154 TaskResult taskResult = monitor.lockPartitionResult(); 155 try { 156 taskResult.setInt("completed", completed.get()); 157 taskResult.setInt("failed", failed.get()); 158 } finally { 159 monitor.commitPartitionResult(); 160 } 161 } catch(Exception e2) { 162 log.debug("Caught an exception logging an exception", e2); 163 } 164 } 165 } 166 167 @Override 168 public boolean terminate() { 169 terminated.set(true); 170 if (runningWorker != null) { 171 log.warn("Terminating running worker thread " + runningWorker); 172 runningWorker.terminate(); 173 } 174 return super.terminate(); 175 } 176}