package com.identityworksllc.iiq.common.request;

import com.identityworksllc.iiq.common.threads.SailPointWorker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sailpoint.api.SailPointContext;
import sailpoint.object.Attributes;
import sailpoint.object.Request;
import sailpoint.object.TaskResult;
import sailpoint.request.AbstractRequestExecutor;
import sailpoint.request.RequestPermanentException;
import sailpoint.task.TaskMonitor;
import sailpoint.tools.Compressor;
import sailpoint.tools.GeneralException;
import sailpoint.tools.Util;

import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Request executor to allow partitioning of workers across all nodes in the cluster.
 * Your Request should include an attribute called serializedWorkers, which must be
 * a list of Base64-encoded strings, each representing a Java Serialized worker.
 * Each worker should encapsulate the work it's expected to do and must not depend
 * on any inputs from the Request.
 *
 * (In other words, the worker should be able to do the same job whether invoked via
 * a Request or directly in-line in your Java code.)
 *
 * Workers will be deserialized using ObjectInputStream. Because this is native Java
 * deserialization, the Request contents must come from a trusted source.
 *
 * Workers will run in a single SailPoint context and will be invoked via execute()
 * and not via implementation().
 *
 * Each worker will be provided a TaskMonitor for this Request, which it can use to
 * execute partition status updates.
 *
 * I recommend creating a number of partitions with relatively small worker counts,
 * because every update of the Request will need to serialize the entire object,
 * including your strings, back to the database.
 *
 * An interrupted SailPointWorkerExecutor (e.g., via server shutdown) will be
 * deleted rather than resumed when the server is restarted. You will have no
 * notification of this.
 */
public final class SailPointWorkerExecutor extends AbstractRequestExecutor {
    /**
     * The request definition name. You will need to have imported this request
     * definition XML.
     */
    public static final String REQUEST_DEFINITION = "IDW Worker Executor";

    private final Log log;

    /**
     * The currently running worker, used to handle termination gracefully.
     *
     * Written by the thread running {@link #runWorkers(SailPointContext, Request)} and
     * read by {@link #terminate()}, so it is volatile and must always be read into a
     * local variable before being null-checked and used.
     */
    private volatile SailPointWorker runningWorker;

    /**
     * Will be set to true when terminate() is called
     */
    private final AtomicBoolean terminated;

    /**
     * Constructs a new executor
     */
    public SailPointWorkerExecutor() {
        this.terminated = new AtomicBoolean();
        this.log = LogFactory.getLog(this.getClass());
    }

    /**
     * Main entry point. Runs every worker attached to the Request; the
     * {@code attributes} argument is not consulted, because the workers are read
     * from the Request's own attributes.
     *
     * @param sailPointContext The context passed to each worker
     * @param request The Request containing the serialized workers
     * @param attributes Unused by this executor
     * @throws RequestPermanentException if the workers cannot be prepared for execution
     * @see AbstractRequestExecutor#execute(SailPointContext, Request, Attributes)
     */
    @Override
    public void execute(SailPointContext sailPointContext, Request request, Attributes<String, Object> attributes) throws RequestPermanentException {
        if (log.isDebugEnabled()) {
            log.debug("Beginning request " + request.getName());
        }
        runWorkers(sailPointContext, request);
        if (log.isDebugEnabled()) {
            log.debug("Finished request " + request.getName());
        }
    }

    /**
     * Executes the given list of workers serially.
     *
     * Before each worker, the terminated flag and the master task result are checked
     * and the loop stops if termination has been requested. A worker that throws is
     * logged and counted as failed, and the loop continues with the next one. A
     * failure to decode or deserialize a worker aborts the remaining workers. In all
     * cases the "completed" and "failed" counts are written to the partition result
     * on a best-effort basis.
     *
     * @param sailPointContext The incoming SailPoint context
     * @param request The Request to process, containing worker objects
     * @throws RequestPermanentException if any failures occur while preparing workers
     */
    private void runWorkers(SailPointContext sailPointContext, Request request) throws RequestPermanentException {
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        TaskMonitor monitor = new TaskMonitor(sailPointContext, request);
        try {
            List<String> workers = request.getAttributes().getStringList(SailPointWorker.MULTI_SERIALIZED_WORKERS_ATTR);
            for (String workerString : Util.safeIterable(workers)) {
                if (terminated.get()) {
                    log.warn("Terminated");
                    break;
                }
                TaskResult masterResult = monitor.lockMasterResult();
                try {
                    // Thread.interrupted() also clears this thread's interrupt flag
                    if (Thread.interrupted() || masterResult.isTerminateRequested() || masterResult.isTerminated()) {
                        log.warn("Master result is terminated");
                        if (!terminated.get()) {
                            // Snapshot the field so the null check and the call see the same value
                            SailPointWorker active = runningWorker;
                            if (active != null) {
                                active.terminate();
                            }
                        }
                        break;
                    }
                } finally {
                    monitor.commitMasterResult();
                }
                // NOTE(review): native Java deserialization of Request-supplied data;
                // only safe when the Request was created by trusted code.
                byte[] workerBytes = Base64.getDecoder().decode(Compressor.decompress(workerString));
                SailPointWorker worker = (SailPointWorker) Util.readSerializedObject(workerBytes);
                try {
                    runningWorker = worker;
                    worker.setMonitor(monitor);

                    // TODO figure out if it's possible to invoke implementation() here
                    worker.execute(sailPointContext, log);
                    completed.incrementAndGet();
                } catch (Exception e) {
                    // One failing worker must not stop the remaining workers
                    log.error("Caught an error executing a worker", e);
                    failed.incrementAndGet();
                } finally {
                    runningWorker = null;
                }
            }
        } catch (RequestPermanentException e) {
            // Already the right type; rethrow without wrapping it a second time
            throw e;
        } catch (GeneralException | IOException | ClassNotFoundException e) {
            log.error("Caught an error preparing worker execution", e);
            try {
                TaskResult taskResult = monitor.lockPartitionResult();
                try {
                    taskResult.addException(e);
                } finally {
                    monitor.commitPartitionResult();
                }
            } catch (Exception e2) {
                log.debug("Caught an exception trying to log an exception", e2);
            }
            // This causes IIQ to not retry the request
            throw new RequestPermanentException(e);
        } finally {
            // Best effort: a failure to store the counters must not mask the real outcome
            try {
                TaskResult taskResult = monitor.lockPartitionResult();
                try {
                    taskResult.setInt("completed", completed.get());
                    taskResult.setInt("failed", failed.get());
                } finally {
                    monitor.commitPartitionResult();
                }
            } catch (Exception e2) {
                log.debug("Caught an exception recording the completed and failed worker counts", e2);
            }
        }
    }

    /**
     * Flags this executor as terminated, so that no further workers are started, and
     * asks the currently running worker (if any) to terminate.
     *
     * @return the result of {@link AbstractRequestExecutor#terminate()}
     */
    @Override
    public boolean terminate() {
        terminated.set(true);
        // Read the field once: the executing thread may null it out at any moment,
        // so checking and then re-reading the field could dereference null.
        SailPointWorker active = runningWorker;
        if (active != null) {
            log.warn("Terminating running worker thread " + active);
            active.terminate();
        }
        return super.terminate();
    }
}