001package com.identityworksllc.iiq.common.request;
002
003import com.identityworksllc.iiq.common.threads.SailPointWorker;
004import org.apache.commons.logging.Log;
005import org.apache.commons.logging.LogFactory;
006import sailpoint.api.SailPointContext;
007import sailpoint.object.Attributes;
008import sailpoint.object.Request;
009import sailpoint.object.TaskResult;
010import sailpoint.request.AbstractRequestExecutor;
011import sailpoint.request.RequestPermanentException;
012import sailpoint.task.TaskMonitor;
013import sailpoint.tools.Compressor;
014import sailpoint.tools.GeneralException;
015import sailpoint.tools.Util;
016
017import java.io.IOException;
018import java.util.Base64;
019import java.util.List;
020import java.util.concurrent.atomic.AtomicBoolean;
021import java.util.concurrent.atomic.AtomicInteger;
022
023/**
024 * Request executor to allow partitioning of workers across all nodes in the cluster.
025 * Your Request should include an attribute called serializedWorkers, which must be
026 * a list of Base64-encoded strings, each representing a Java Serialized worker.
027 * Each worker should encapsulate the work it's expected to do and must not depend
028 * on any inputs from the Request.
029 *
030 * (In other words, the worker should be able to do the same job whether invoked via
031 * a Request or directly in-line in your Java code.)
032 *
033 * Workers will be deserialized using ObjectInputStream.
034 *
035 * Workers will run in a single Sailpoint context and will be invoked via execute()
036 * and not via implementation().
037 *
038 * Each worker will be provided a TaskMonitor for this Request, which it can use to
039 * execute partition status updates.
040 *
041 * I recommend creating a number of partitions with relatively small worker counts,
042 * because every update of the Request will need to serialize the entire object,
043 * including your strings, back to the database.
044 *
045 * An interrupted SailPointWorkerExecutor (e.g., via server shutdown) will be
046 * deleted rather than resumed when the server is restarted. You will have no
047 * notification of this.
048 */
049public final class SailPointWorkerExecutor extends AbstractRequestExecutor {
050    /**
051     * The request definition name. You will need to have imported this request
052     * definition XML.
053     */
054    public static final String REQUEST_DEFINITION = "IDW Worker Executor";
055
056    private final Log log;
057
058    /**
059     * The currently running worker, used to handle termination gracefully
060     */
061    private SailPointWorker runningWorker;
062
063    /**
064     * Will be set to true when terminate() is called
065     */
066    private final AtomicBoolean terminated;
067
068    public SailPointWorkerExecutor() {
069        this.terminated = new AtomicBoolean();
070        this.log = LogFactory.getLog(this.getClass());
071    }
072
073    /**
074     * Main entry point
075     * @see AbstractRequestExecutor#execute(SailPointContext, Request, Attributes)
076     */
077    @Override
078    public void execute(SailPointContext sailPointContext, Request request, Attributes<String, Object> attributes) throws RequestPermanentException {
079        if (log.isDebugEnabled()) {
080            log.debug("Beginning request " + request.getName());
081        }
082        runWorkers(sailPointContext, request);
083        if (log.isDebugEnabled()) {
084            log.debug("Finished request " + request.getName());
085        }
086    }
087
088    /**
089     * Executes the given list of workers serially
090     *
091     * @param sailPointContext The incoming SailPoint context
092     * @param request The Request to process, containing worker objects
093     * @throws RequestPermanentException if any failures occur
094     */
095    private void runWorkers(SailPointContext sailPointContext, Request request) throws RequestPermanentException {
096        AtomicInteger completed = new AtomicInteger();
097        AtomicInteger failed = new AtomicInteger();
098        TaskMonitor monitor = new TaskMonitor(sailPointContext, request);
099        try {
100            List<String> workers = request.getAttributes().getStringList(SailPointWorker.MULTI_SERIALIZED_WORKERS_ATTR);
101            for (String workerString : Util.safeIterable(workers)) {
102                if (terminated.get()) {
103                    log.warn("Terminated");
104                    break;
105                }
106                TaskResult masterResult = monitor.lockMasterResult();
107                try {
108                    if (Thread.interrupted() || masterResult.isTerminateRequested() || masterResult.isTerminated()) {
109                        log.warn("Master result is terminated");
110                        if (!terminated.get()) {
111                            if (runningWorker != null) {
112                                runningWorker.terminate();
113                            }
114                        }
115                        break;
116                    }
117                } finally {
118                    monitor.commitMasterResult();
119                }
120                byte[] workerBytes = Base64.getDecoder().decode(Compressor.decompress(workerString));
121                SailPointWorker worker = (SailPointWorker) Util.readSerializedObject(workerBytes);
122                try {
123                    runningWorker = worker;
124                    worker.setMonitor(monitor);
125
126                    // TODO figure out if it's possible to invoke implementation() here
127                    worker.execute(sailPointContext, log);
128                    completed.incrementAndGet();
129                } catch(Exception e) {
130                    log.error("Caught an error executing a worker", e);
131                    failed.incrementAndGet();
132                } finally {
133                    runningWorker = null;
134                }
135            }
136        } catch(RequestPermanentException e) {
137            throw e;
138        } catch(GeneralException | IOException | ClassNotFoundException e) {
139            log.error("Caught an error preparing worker execution", e);
140            try {
141                TaskResult taskResult = monitor.lockPartitionResult();
142                try {
143                    taskResult.addException(e);
144                } finally {
145                    monitor.commitPartitionResult();
146                }
147            } catch (Exception e2) {
148                log.debug("Caught an exception trying to log an exception", e2);
149            }
150            // This causes IIQ to not retry the request
151            throw new RequestPermanentException(e);
152        } finally {
153            try {
154                TaskResult taskResult = monitor.lockPartitionResult();
155                try {
156                    taskResult.setInt("completed", completed.get());
157                    taskResult.setInt("failed", failed.get());
158                } finally {
159                    monitor.commitPartitionResult();
160                }
161            } catch(Exception e2) {
162                log.debug("Caught an exception logging an exception", e2);
163            }
164        }
165    }
166
167    @Override
168    public boolean terminate() {
169        terminated.set(true);
170        if (runningWorker != null) {
171            log.warn("Terminating running worker thread " + runningWorker);
172            runningWorker.terminate();
173        }
174        return super.terminate();
175    }
176}