001package com.identityworksllc.iiq.common.threads;
002
003import com.identityworksllc.iiq.common.Maybe;
004import com.identityworksllc.iiq.common.Metered;
005import com.identityworksllc.iiq.common.annotation.Experimental;
006import org.apache.commons.logging.Log;
007import org.apache.commons.logging.LogFactory;
008import sailpoint.api.Provisioner;
009import sailpoint.api.SailPointContext;
010import sailpoint.object.*;
011import sailpoint.provisioning.PlanEvaluator;
012
013import java.util.*;
014import java.util.concurrent.*;
015
016/**
017 * A class implementing parallel provisioner execution. Each provisioning operation
018 * will run in its own thread, resulting in either a ProvisioningProject or some
019 * exception. The {@link Maybe} class is used to encapsulate the state, containing
020 * either a valid output or an exception, for each item.
021 *
022 * The {@link Provisioner} will be invoked with `disableRetryRequest` set to `true`,
023 * so the caller is responsible for detecting and retrying any provisioning failures.
024 *
025 * @since 2024-09-26
026 */
027@Experimental
028public class ParallelProvisioner {
029    public static class ParallelProvisioningTask {
030        private final ProvisioningPlan plan;
031        private final Future<Maybe<ProvisioningProject>> projectFuture;
032
033        /**
034         * Constructs a n ew ParallelProvisioningTask for the given pair of plan
035         * and background executor.
036         *
037         * @param plan The plan
038         * @param projectFuture The project future, from {@link ParallelProvisioningWorker}
039         */
040        public ParallelProvisioningTask(ProvisioningPlan plan, Future<Maybe<ProvisioningProject>> projectFuture) {
041            this.plan = plan;
042            this.projectFuture = projectFuture;
043        }
044
045        /**
046         * Attempts to cancel the running background task
047         * @return Attempts to cancel the running background task
048         */
049        public boolean cancel() {
050            return this.projectFuture.cancel(true);
051        }
052
053        /**
054         * Gets the original ProvisioningPlan associated with this outcome
055         * @return The provisioning plan
056         */
057        public ProvisioningPlan getPlan() {
058            return plan;
059        }
060
061        /**
062         * If the task has completed, returns an {@link Optional} containing the {@link ProvisioningProject},
063         * which itself will contain the outcome of provisioning. If the task has not completed, or was canceled,
064         * returns an empty {@link Optional}. If the task completed, but failed, re-throws the exception.
065         *
066         * @return An {@link Optional} {@link ProvisioningProject}, as described above
067         * @throws Exception if the execution finished but failed, or if there is an error retrieving the outcome
068         */
069        public Optional<ProvisioningProject> getProject() throws Exception {
070            if (!isDone()) {
071                return Optional.empty();
072            }
073
074            if (this.projectFuture.isCancelled()) {
075                return Optional.empty();
076            }
077
078            Maybe<ProvisioningProject> output = this.projectFuture.get();
079            if (output.hasValue()) {
080                return Optional.of(output.get());
081            } else {
082                throw (Exception) output.getError();
083            }
084        }
085
086        /**
087         * Returns true if {@link Future#isDone()} returns true.
088         *
089         * @return True if the background task is either done or canceled
090         */
091        public boolean isDone() {
092            return this.projectFuture.isDone();
093        }
094    }
095
096    /**
097     * The worker thread for the parallel provisioning operation. Submits the given plan
098     * to the {@link Provisioner} after reloading its contained {@link Identity}. If the
099     * plan succeeds, returns a {@link Maybe} containing the project.
100     */
101    private static class ParallelProvisioningWorker extends SailPointWorker {
102        /**
103         * The arguments for this provisioning worker
104         */
105        private final Attributes<String, Object> args;
106
107        /**
108         * The plan for this provisioning worker
109         */
110        private final ProvisioningPlan plan;
111
112        /**
113         * Constructs a new worker object for the given plan + arguments
114         * @param plan The provisioning plan for this worker
115         * @param args The arguments for this worker
116         */
117        public ParallelProvisioningWorker(ProvisioningPlan plan, Map<String, Object> args) {
118            this.plan = plan;
119            this.args = new Attributes<>(args);
120        }
121
122        /**
123         * Executes the provisioning plan in the thread context
124         *
125         * @param threadContext The private context to use for this thread worker
126         * @param logger The log attached to this Worker
127         * @return A {@link Maybe} containing the provisioning project or an error
128         */
129        @Override
130        public Object execute(SailPointContext threadContext, Log logger) {
131            try {
132                plan.setIdentity(threadContext.getObject(Identity.class, plan.getIdentity().getId()));
133
134                if (logger.isDebugEnabled()) {
135                    logger.debug("Executing provisioning plan: " + plan.toXml());
136                }
137
138                args.put(PlanEvaluator.ARG_NO_RETRY_REQUEST, true);
139
140                Provisioner provisioner = new Provisioner(threadContext, args);
141                provisioner.setSource(Source.Batch);
142                provisioner.execute(plan);
143
144                return Maybe.of(provisioner.getProject());
145            } catch (Exception e) {
146                String identityName = plan.getIdentity() != null ? plan.getIdentity().getName() : "(null)";
147                logger.warn("Failed to execute parallel provision plan for Identity " + identityName, e);
148                return Maybe.of(e);
149            }
150        }
151    }
152
153    /**
154     * Internal logger
155     */
156    private static final Log log = LogFactory.getLog(ParallelProvisioner.class);
157
158    /**
159     * The arguments for the provisioning action
160     */
161    private final Map<String, Object> arguments;
162
163    /**
164     * The number of threads for the provisioning thread pool
165     */
166    private final int threads;
167
168    /**
169     * Constructs a new parallel provisioner with the given number of threads and
170     * the default set of arguments.
171     *
172     * @param threads The number of threads
173     */
174    public ParallelProvisioner(int threads) {
175        this.arguments = new HashMap<>();
176        this.threads = Math.max(1, threads);
177    }
178
179    /**
180     * Constructs a new parallel provisioner with the given number of threads and
181     * the given set of arguments.
182     *
183     * @param threads The number of threads
184     * @param arguments The arguments passed to the Provisioner in each thread
185     */
186    public ParallelProvisioner(Map<String, Object> arguments, int threads) {
187        this.arguments = arguments;
188        this.threads = threads;
189    }
190
191    /**
192     * Provisions the given set of plans in the thread pool.
193     *
194     * @param plans The plans to provision
195     * @return A set of {@link ParallelProvisioningTask} objects, each representing the (future) outcome of one plan execution
196     */
197    public List<ParallelProvisioningTask> provisionPlans(List<ProvisioningPlan> plans) {
198        if (log.isDebugEnabled()) {
199            log.debug("Submitting " + plans.size() + " plans to a thread pool of size " + threads);
200        }
201
202        List<ParallelProvisioningTask> futures = new ArrayList<>();
203
204        ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 180, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
205        executor.allowCoreThreadTimeOut(true);
206
207        for(ProvisioningPlan plan : plans) {
208            Future<Maybe<ProvisioningProject>> future = executor.submit(
209                    () -> Metered.meter("ParallelProvisioner.execute", () -> {
210                        Callable<Object> obj = new ParallelProvisioningWorker(plan, arguments).toCallable();
211                        try {
212                            Object output = obj.call();
213                            return (Maybe<ProvisioningProject>) output;
214                        } catch (Exception e) {
215                            return Maybe.of(e);
216                        }
217                    })
218            );
219            futures.add(new ParallelProvisioningTask(plan, future));
220        }
221
222        executor.shutdown();
223
224        return futures;
225    }
226}