001package com.identityworksllc.iiq.common.threads; 002 003import com.identityworksllc.iiq.common.Maybe; 004import com.identityworksllc.iiq.common.Metered; 005import com.identityworksllc.iiq.common.annotation.Experimental; 006import org.apache.commons.logging.Log; 007import org.apache.commons.logging.LogFactory; 008import sailpoint.api.Provisioner; 009import sailpoint.api.SailPointContext; 010import sailpoint.object.*; 011import sailpoint.provisioning.PlanEvaluator; 012 013import java.util.*; 014import java.util.concurrent.*; 015 016/** 017 * A class implementing parallel provisioner execution. Each provisioning operation 018 * will run in its own thread, resulting in either a ProvisioningProject or some 019 * exception. The {@link Maybe} class is used to encapsulate the state, containing 020 * either a valid output or an exception, for each item. 021 * 022 * The {@link Provisioner} will be invoked with `disableRetryRequest` set to `true`, 023 * so the caller is responsible for detecting and retrying any provisioning failures. 024 * 025 * @since 2024-09-26 026 */ 027@Experimental 028public class ParallelProvisioner { 029 public static class ParallelProvisioningTask { 030 private final ProvisioningPlan plan; 031 private final Future<Maybe<ProvisioningProject>> projectFuture; 032 033 /** 034 * Constructs a n ew ParallelProvisioningTask for the given pair of plan 035 * and background executor. 036 * 037 * @param plan The plan 038 * @param projectFuture The project future, from {@link ParallelProvisioningWorker} 039 */ 040 public ParallelProvisioningTask(ProvisioningPlan plan, Future<Maybe<ProvisioningProject>> projectFuture) { 041 this.plan = plan; 042 this.projectFuture = projectFuture; 043 } 044 045 /** 046 * Attempts to cancel the running background task 047 * @return Attempts to cancel the running background task 048 */ 049 public boolean cancel() { 050 return this.projectFuture.cancel(true); 051 } 052 053 /** 054 * Gets the original ProvisioningPlan associated with this outcome 055 * @return The provisioning plan 056 */ 057 public ProvisioningPlan getPlan() { 058 return plan; 059 } 060 061 /** 062 * If the task has completed, returns an {@link Optional} containing the {@link ProvisioningProject}, 063 * which itself will contain the outcome of provisioning. If the task has not completed, or was canceled, 064 * returns an empty {@link Optional}. If the task completed, but failed, re-throws the exception. 065 * 066 * @return An {@link Optional} {@link ProvisioningProject}, as described above 067 * @throws Exception if the execution finished but failed, or if there is an error retrieving the outcome 068 */ 069 public Optional<ProvisioningProject> getProject() throws Exception { 070 if (!isDone()) { 071 return Optional.empty(); 072 } 073 074 if (this.projectFuture.isCancelled()) { 075 return Optional.empty(); 076 } 077 078 Maybe<ProvisioningProject> output = this.projectFuture.get(); 079 if (output.hasValue()) { 080 return Optional.of(output.get()); 081 } else { 082 throw (Exception) output.getError(); 083 } 084 } 085 086 /** 087 * Returns true if {@link Future#isDone()} returns true. 088 * 089 * @return True if the background task is either done or canceled 090 */ 091 public boolean isDone() { 092 return this.projectFuture.isDone(); 093 } 094 } 095 096 /** 097 * The worker thread for the parallel provisioning operation. Submits the given plan 098 * to the {@link Provisioner} after reloading its contained {@link Identity}. If the 099 * plan succeeds, returns a {@link Maybe} containing the project. 100 */ 101 private static class ParallelProvisioningWorker extends SailPointWorker { 102 /** 103 * The arguments for this provisioning worker 104 */ 105 private final Attributes<String, Object> args; 106 107 /** 108 * The plan for this provisioning worker 109 */ 110 private final ProvisioningPlan plan; 111 112 /** 113 * Constructs a new worker object for the given plan + arguments 114 * @param plan The provisioning plan for this worker 115 * @param args The arguments for this worker 116 */ 117 public ParallelProvisioningWorker(ProvisioningPlan plan, Map<String, Object> args) { 118 this.plan = plan; 119 this.args = new Attributes<>(args); 120 } 121 122 /** 123 * Executes the provisioning plan in the thread context 124 * 125 * @param threadContext The private context to use for this thread worker 126 * @param logger The log attached to this Worker 127 * @return A {@link Maybe} containing the provisioning project or an error 128 */ 129 @Override 130 public Object execute(SailPointContext threadContext, Log logger) { 131 try { 132 plan.setIdentity(threadContext.getObject(Identity.class, plan.getIdentity().getId())); 133 134 if (logger.isDebugEnabled()) { 135 logger.debug("Executing provisioning plan: " + plan.toXml()); 136 } 137 138 args.put(PlanEvaluator.ARG_NO_RETRY_REQUEST, true); 139 140 Provisioner provisioner = new Provisioner(threadContext, args); 141 provisioner.setSource(Source.Batch); 142 provisioner.execute(plan); 143 144 return Maybe.of(provisioner.getProject()); 145 } catch (Exception e) { 146 String identityName = plan.getIdentity() != null ? plan.getIdentity().getName() : "(null)"; 147 logger.warn("Failed to execute parallel provision plan for Identity " + identityName, e); 148 return Maybe.of(e); 149 } 150 } 151 } 152 153 /** 154 * Internal logger 155 */ 156 private static final Log log = LogFactory.getLog(ParallelProvisioner.class); 157 158 /** 159 * The arguments for the provisioning action 160 */ 161 private final Map<String, Object> arguments; 162 163 /** 164 * The number of threads for the provisioning thread pool 165 */ 166 private final int threads; 167 168 /** 169 * Constructs a new parallel provisioner with the given number of threads and 170 * the default set of arguments. 171 * 172 * @param threads The number of threads 173 */ 174 public ParallelProvisioner(int threads) { 175 this.arguments = new HashMap<>(); 176 this.threads = Math.max(1, threads); 177 } 178 179 /** 180 * Constructs a new parallel provisioner with the given number of threads and 181 * the given set of arguments. 182 * 183 * @param threads The number of threads 184 * @param arguments The arguments passed to the Provisioner in each thread 185 */ 186 public ParallelProvisioner(Map<String, Object> arguments, int threads) { 187 this.arguments = arguments; 188 this.threads = threads; 189 } 190 191 /** 192 * Provisions the given set of plans in the thread pool. 193 * 194 * @param plans The plans to provision 195 * @return A set of {@link ParallelProvisioningTask} objects, each representing the (future) outcome of one plan execution 196 */ 197 public List<ParallelProvisioningTask> provisionPlans(List<ProvisioningPlan> plans) { 198 if (log.isDebugEnabled()) { 199 log.debug("Submitting " + plans.size() + " plans to a thread pool of size " + threads); 200 } 201 202 List<ParallelProvisioningTask> futures = new ArrayList<>(); 203 204 ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 180, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); 205 executor.allowCoreThreadTimeOut(true); 206 207 for(ProvisioningPlan plan : plans) { 208 Future<Maybe<ProvisioningProject>> future = executor.submit( 209 () -> Metered.meter("ParallelProvisioner.execute", () -> { 210 Callable<Object> obj = new ParallelProvisioningWorker(plan, arguments).toCallable(); 211 try { 212 Object output = obj.call(); 213 return (Maybe<ProvisioningProject>) output; 214 } catch (Exception e) { 215 return Maybe.of(e); 216 } 217 }) 218 ); 219 futures.add(new ParallelProvisioningTask(plan, future)); 220 } 221 222 executor.shutdown(); 223 224 return futures; 225 } 226}