001package com.identityworksllc.iiq.common.request;
002
003import com.identityworksllc.iiq.common.threads.SailPointWorker;
004import org.apache.commons.logging.Log;
005import org.apache.commons.logging.LogFactory;
006import sailpoint.api.SailPointContext;
007import sailpoint.object.Attributes;
008import sailpoint.object.Request;
009import sailpoint.object.TaskResult;
010import sailpoint.request.AbstractRequestExecutor;
011import sailpoint.request.RequestPermanentException;
012import sailpoint.task.TaskMonitor;
013import sailpoint.tools.Compressor;
014import sailpoint.tools.GeneralException;
015import sailpoint.tools.Util;
016
017import java.io.IOException;
018import java.util.Base64;
019import java.util.List;
020import java.util.concurrent.atomic.AtomicBoolean;
021import java.util.concurrent.atomic.AtomicInteger;
022
023/**
024 * Request executor to allow partitioning of workers across all nodes in the cluster.
025 * Your Request should include an attribute called serializedWorkers, which must be
026 * a list of Base64-encoded strings, each representing a Java Serialized worker.
027 * Each worker should encapsulate the work it's expected to do and must not depend
028 * on any inputs from the Request.
029 *
030 * (In other words, the worker should be able to do the same job whether invoked via
031 * a Request or directly in-line in your Java code.)
032 *
033 * Workers will be deserialized using ObjectInputStream.
034 *
035 * Workers will run in a single Sailpoint context and will be invoked via execute()
036 * and not via implementation().
037 *
038 * Each worker will be provided a TaskMonitor for this Request, which it can use to
039 * execute partition status updates.
040 *
041 * I recommend creating a number of partitions with relatively small worker counts,
042 * because every update of the Request will need to serialize the entire object,
043 * including your strings, back to the database.
044 *
045 * An interrupted SailPointWorkerExecutor (e.g., via server shutdown) will be
046 * deleted rather than resumed when the server is restarted. You will have no
047 * notification of this.
048 */
049public final class SailPointWorkerExecutor extends AbstractRequestExecutor {
050    /**
051     * The request definition name. You will need to have imported this request
052     * definition XML.
053     */
054    public static final String REQUEST_DEFINITION = "IDW Worker Executor";
055
056    private final Log log;
057
058    /**
059     * The currently running worker, used to handle termination gracefully
060     */
061    private SailPointWorker runningWorker;
062
063    /**
064     * Will be set to true when terminate() is called
065     */
066    private final AtomicBoolean terminated;
067
068    /**
069     * Constructs a new executor
070     */
071    public SailPointWorkerExecutor() {
072        this.terminated = new AtomicBoolean();
073        this.log = LogFactory.getLog(this.getClass());
074    }
075
076    /**
077     * Main entry point
078     * @see AbstractRequestExecutor#execute(SailPointContext, Request, Attributes)
079     */
080    @Override
081    public void execute(SailPointContext sailPointContext, Request request, Attributes<String, Object> attributes) throws RequestPermanentException {
082        if (log.isDebugEnabled()) {
083            log.debug("Beginning request " + request.getName());
084        }
085        runWorkers(sailPointContext, request);
086        if (log.isDebugEnabled()) {
087            log.debug("Finished request " + request.getName());
088        }
089    }
090
091    /**
092     * Executes the given list of workers serially
093     *
094     * @param sailPointContext The incoming SailPoint context
095     * @param request The Request to process, containing worker objects
096     * @throws RequestPermanentException if any failures occur
097     */
098    private void runWorkers(SailPointContext sailPointContext, Request request) throws RequestPermanentException {
099        AtomicInteger completed = new AtomicInteger();
100        AtomicInteger failed = new AtomicInteger();
101        TaskMonitor monitor = new TaskMonitor(sailPointContext, request);
102        try {
103            List<String> workers = request.getAttributes().getStringList(SailPointWorker.MULTI_SERIALIZED_WORKERS_ATTR);
104            for (String workerString : Util.safeIterable(workers)) {
105                if (terminated.get()) {
106                    log.warn("Terminated");
107                    break;
108                }
109                TaskResult masterResult = monitor.lockMasterResult();
110                try {
111                    if (Thread.interrupted() || masterResult.isTerminateRequested() || masterResult.isTerminated()) {
112                        log.warn("Master result is terminated");
113                        if (!terminated.get()) {
114                            if (runningWorker != null) {
115                                runningWorker.terminate();
116                            }
117                        }
118                        break;
119                    }
120                } finally {
121                    monitor.commitMasterResult();
122                }
123                byte[] workerBytes = Base64.getDecoder().decode(Compressor.decompress(workerString));
124                SailPointWorker worker = (SailPointWorker) Util.readSerializedObject(workerBytes);
125                try {
126                    runningWorker = worker;
127                    worker.setMonitor(monitor);
128
129                    // TODO figure out if it's possible to invoke implementation() here
130                    worker.execute(sailPointContext, log);
131                    completed.incrementAndGet();
132                } catch(Exception e) {
133                    log.error("Caught an error executing a worker", e);
134                    failed.incrementAndGet();
135                } finally {
136                    runningWorker = null;
137                }
138            }
139        } catch(RequestPermanentException e) {
140            throw e;
141        } catch(GeneralException | IOException | ClassNotFoundException e) {
142            log.error("Caught an error preparing worker execution", e);
143            try {
144                TaskResult taskResult = monitor.lockPartitionResult();
145                try {
146                    taskResult.addException(e);
147                } finally {
148                    monitor.commitPartitionResult();
149                }
150            } catch (Exception e2) {
151                log.debug("Caught an exception trying to log an exception", e2);
152            }
153            // This causes IIQ to not retry the request
154            throw new RequestPermanentException(e);
155        } finally {
156            try {
157                TaskResult taskResult = monitor.lockPartitionResult();
158                try {
159                    taskResult.setInt("completed", completed.get());
160                    taskResult.setInt("failed", failed.get());
161                } finally {
162                    monitor.commitPartitionResult();
163                }
164            } catch(Exception e2) {
165                log.debug("Caught an exception logging an exception", e2);
166            }
167        }
168    }
169
170    @Override
171    public boolean terminate() {
172        terminated.set(true);
173        if (runningWorker != null) {
174            log.warn("Terminating running worker thread " + runningWorker);
175            runningWorker.terminate();
176        }
177        return super.terminate();
178    }
179}