001package com.identityworksllc.iiq.common.query; 002 003import com.identityworksllc.iiq.common.iterators.ResultSetIterator; 004import com.identityworksllc.iiq.common.threads.PooledWorkerResults; 005import com.identityworksllc.iiq.common.threads.SailPointWorker; 006import com.identityworksllc.iiq.common.vo.Failure; 007import openconnector.Util; 008import org.apache.commons.logging.Log; 009import sailpoint.api.SailPointContext; 010import sailpoint.api.SailPointFactory; 011import sailpoint.object.Filter; 012import sailpoint.object.QueryOptions; 013import sailpoint.object.SailPointObject; 014import sailpoint.tools.GeneralException; 015import sailpoint.tools.ObjectNotFoundException; 016 017import java.sql.Connection; 018import java.sql.PreparedStatement; 019import java.sql.ResultSet; 020import java.sql.ResultSetMetaData; 021import java.sql.SQLException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031 032/** 033 * Simplifies querying by automatically enclosing queries in a result processing loop. 034 * This is copied from an older project. IIQ also provides a number of similar utilities 035 * in their {@link sailpoint.tools.JdbcUtil} class. 036 * 037 * @param <T> The type returned from the result processor 038 */ 039@SuppressWarnings("unused") 040public class QueryUtil<T> { 041 042 /** 043 * Callback for processing the result 044 * 045 * @param <U> The type returned from the result processor, must extend T 046 */ 047 @FunctionalInterface 048 public interface ResultProcessor<U> { 049 050 /** 051 * Callback to indicate that the result set had no results 052 * 053 * @throws SQLException on failures 054 */ 055 @SuppressWarnings("unused") 056 default void noResult() throws SQLException { 057 // Blank by default 058 } 059 060 /** 061 * Called once per result. Do not call "next" on the ResultSet here. 062 * 063 * @param result The result set at the current point 064 * @return An object of type T 065 * @throws GeneralException on any Sailpoint failures 066 * @throws SQLException on any database failures 067 */ 068 U processResult(ResultSet result) throws GeneralException, SQLException; 069 } 070 071 /** 072 * Static helper method to retrieve the first value from the result set as a long 073 * 074 * @param query The query to run 075 * @param resultColumn The column to grab from the results 076 * @param Log logger 077 * @param parameters Any parameters 078 * @return A list of result strings 079 * @throws Throwable On failures 080 */ 081 public static Long getLongValue(final String query, final String resultColumn, Log Log, Object... parameters) throws Throwable { 082 return new QueryUtil<Long>(Log).getResult(query, result -> result.getLong(resultColumn), parameters); 083 } 084 085 /** 086 * Retrieves the first string from the result set in the given column, then 087 * queries for a sailPoint object of the given type based on that name or 088 * ID. The column name is assumed to be 'id', which is the primary key in 089 * most of the SailPoint tables. 090 * 091 * @param context The Sailpoint context to use to query 092 * @param cls The class to query based on the ID being pulled 093 * @param query The query to run 094 * @param log logger 095 * @param parameters Any parameters 096 * @return A list of result strings 097 * @throws Throwable On failures 098 */ 099 public static <T extends SailPointObject> T getObjectByQuery(final SailPointContext context, Class<T> cls, final String query, final Log log, Object... parameters) throws Throwable { 100 return getObjectByQuery(context, cls, query, "id", log, parameters); 101 } 102 103 /** 104 * Retrieves the first string from the result set in the given column, then 105 * queries for a sailPoint object of the given type based on that name or 106 * ID. 107 * 108 * @param context The Sailpoint context to use to query 109 * @param cls The class to query based on the ID being pulled 110 * @param query The query to run 111 * @param idColumn The column to grab from the results 112 * @param log logger 113 * @param parameters Any parameters 114 * @return A list of result strings 115 * @throws Throwable On failures 116 */ 117 public static <T extends SailPointObject> T getObjectByQuery(final SailPointContext context, Class<T> cls, final String query, final String idColumn, final Log log, Object... parameters) throws Throwable { 118 String id = getStringValue(query, idColumn, log, parameters); 119 return context.getObject(cls, id); 120 } 121 122 /** 123 * Static helper method ot retrieve values from the query as a list of strings 124 * 125 * @param query The query to run 126 * @param resultColumn The column to grab from the results 127 * @param Log logger 128 * @param parameters Any parameters 129 * @return A list of result strings 130 * @throws Throwable On failures 131 */ 132 public static List<String> getStringList(final String query, final String resultColumn, Log Log, Object... parameters) throws Throwable { 133 return new QueryUtil<String>(Log).getResults(query, result -> result.getString(resultColumn), parameters); 134 } 135 136 /** 137 * Static helper method to retrieve the first string value from the result set 138 * 139 * @param query The query to run 140 * @param resultColumn The column to grab from the results 141 * @param Log logger 142 * @param parameters Any parameters 143 * @return A list of result strings 144 * @throws Throwable On failures 145 */ 146 public static String getStringValue(final String query, final String resultColumn, Log Log, Object... parameters) throws Throwable { 147 return new QueryUtil<String>(Log).getResult(query, result -> result.getString(resultColumn), parameters); 148 } 149 150 /** 151 * Retrieves a unique object with the given Filter string 152 * @param context The Sailpoint Context 153 * @param cls The class to query for 154 * @param filterString The Filter string 155 * @param <T> The type of the object to query 156 * @return The queried object 157 * @throws GeneralException if too many or too few objects are found 158 */ 159 public static <T extends SailPointObject> T getUniqueObject(SailPointContext context, Class<T> cls, String filterString) throws GeneralException { 160 Filter filter = Filter.compile(filterString); 161 return getUniqueObject(context, cls, filter); 162 } 163 164 /** 165 * Retrieves a unique object with the given Filter string 166 * @param context The Sailpoint Context 167 * @param cls The class to query for 168 * @param filter a Filter object 169 * @param <T> The type of the object to query 170 * @return The queried object 171 * @throws GeneralException if too many or too few objects are found 172 */ 173 public static <T extends SailPointObject> T getUniqueObject(SailPointContext context, Class<T> cls, Filter filter) throws GeneralException { 174 QueryOptions qo = new QueryOptions(); 175 qo.add(filter); 176 return getUniqueObject(context, cls, qo); 177 } 178 179 /** 180 * Retrieves a unique object with the given Filter string 181 * @param context The Sailpoint Context 182 * @param cls The class to query for 183 * @param queryOptions a QueryOptions object which will be cloned before querying 184 * @param <T> The type of the object to query 185 * @return The queried object 186 * @throws ObjectNotFoundException if no matches are found 187 * @throws GeneralException if too many or too few objects are found 188 */ 189 public static <T extends SailPointObject> T getUniqueObject(SailPointContext context, Class<T> cls, QueryOptions queryOptions) throws ObjectNotFoundException, GeneralException { 190 QueryOptions qo = new QueryOptions(); 191 qo.setFilters(queryOptions.getFilters()); 192 qo.setCacheResults(queryOptions.isCacheResults()); 193 qo.setDirtyRead(queryOptions.isDirtyRead()); 194 qo.setDistinct(queryOptions.isDistinct()); 195 qo.setFlushBeforeQuery(queryOptions.isFlushBeforeQuery()); 196 qo.setGroupBys(queryOptions.getGroupBys()); 197 qo.setOrderings(queryOptions.getOrderings()); 198 qo.setScopeResults(queryOptions.getScopeResults()); 199 qo.setTransactionLock(queryOptions.isTransactionLock()); 200 qo.setUnscopedGloballyAccessible(queryOptions.getUnscopedGloballyAccessible()); 201 202 qo.setResultLimit(2); 203 204 List<T> results = context.getObjects(cls, qo); 205 if (results == null || results.isEmpty()) { 206 throw new ObjectNotFoundException(); 207 } else if (results.size() > 1) { 208 throw new GeneralException("Expected a unique result, got " + results.size() + " results"); 209 } 210 return results.get(0); 211 } 212 213 /** 214 * Set up the given parameters in the prepared statmeent 215 * @param stmt The statement 216 * @param parameters The parameters 217 * @throws SQLException if any failures occur setting parameters 218 */ 219 public static void setupParameters(PreparedStatement stmt, Object[] parameters) throws SQLException { 220 Parameters.setupParameters(stmt, parameters); 221 } 222 223 /** 224 * Logger 225 */ 226 private final Log logger; 227 /** 228 * Connection to SailPoint 229 */ 230 private final SailPointContext sailPointContext; 231 232 /** 233 * Constructor 234 * @param Log logger 235 * @throws GeneralException if there is an error getting the current thread's SailPoint context 236 */ 237 public QueryUtil(@SuppressWarnings("unused") Log Log) throws GeneralException { 238 this(SailPointFactory.getCurrentContext(), Log); 239 } 240 241 /** 242 * Constructor 243 * @param context The SailPoint context 244 * @param log The logger 245 */ 246 public QueryUtil(SailPointContext context, Log log) { 247 this.sailPointContext = context; 248 this.logger = log; 249 } 250 251 /** 252 * @param query The query to execute 253 * @param processor The class to call for every iteration of the loop 254 * @param parameters The parameters for the query, if any 255 * @return A list of results output by the ResultProcessor 256 * @throws GeneralException on any Sailpoint failures 257 * @throws SQLException on any database failures 258 */ 259 public T getResult(String query, ResultProcessor<? extends T> processor, Object... parameters) throws GeneralException, SQLException { 260 logger.debug("Query = " + query); 261 T output = null; 262 try (Connection conn = ContextConnectionWrapper.getConnection(sailPointContext)) { 263 try (PreparedStatement stmt = conn.prepareStatement(query)) { 264 setupParameters(stmt, parameters); 265 try (ResultSet results = stmt.executeQuery()) { 266 if (results.next()) { 267 output = processor.processResult(results); 268 } else { 269 processor.noResult(); 270 } 271 } 272 } 273 } 274 return output; 275 } 276 277 /** 278 * @param query The query to execute 279 * @param processor The class to call for every iteration of the loop 280 * @param parameters The parameters for the query, if any 281 * @return A list of results output by the ResultProcessor 282 * @throws GeneralException on any Sailpoint failures 283 * @throws SQLException on any database failures 284 */ 285 public List<T> getResults(String query, ResultProcessor<? extends T> processor, Object... parameters) throws GeneralException, SQLException { 286 logger.debug("Query = " + query); 287 List<T> output = new ArrayList<>(); 288 try (Connection conn = ContextConnectionWrapper.getConnection(sailPointContext)) { 289 try (PreparedStatement stmt = conn.prepareStatement(query)) { 290 setupParameters(stmt, parameters); 291 try (ResultSet results = stmt.executeQuery()) { 292 boolean hasResults = false; 293 while (results.next()) { 294 hasResults = true; 295 output.add(processor.processResult(results)); 296 } 297 if (!hasResults) { 298 processor.noResult(); 299 } 300 } 301 } 302 } 303 return output; 304 } 305 306 /** 307 * Iterates a query by invoking a Beanshell callback for each method 308 * @param inputs The inputs 309 * @throws GeneralException if anything goes wrong 310 */ 311 public void iterateQuery(IterateQueryOptions inputs) throws GeneralException { 312 try (Connection connection = inputs.openConnection()) { 313 try (NamedParameterStatement stmt = new NamedParameterStatement(connection, inputs.getQuery())) { 314 if (!Util.isEmpty(inputs.getQueryParams())) { 315 stmt.setParameters(inputs.getQueryParams()); 316 } 317 318 try (ResultSet results = stmt.executeQuery()) { 319 ResultSetMetaData rsmd = results.getMetaData(); 320 List<String> columns = new ArrayList<>(); 321 for(int c = 1; c <= rsmd.getColumnCount(); c++) { 322 columns.add(rsmd.getColumnLabel(c)); 323 } 324 ResultSetIterator rsi = new ResultSetIterator(results, columns, sailPointContext); 325 while(rsi.hasNext()) { 326 Map<String, Object> row = rsi.nextRow(); 327 inputs.doCallback(row); 328 } 329 } 330 } 331 } catch(SQLException e) { 332 throw new GeneralException(e); 333 } 334 } 335 336 /** 337 * Iterates over a query in parallel, making a call to the defined callback 338 * in the input options. (NOTE: Beanshell is explicitly thread-safe, but you 339 * should use the thread context provided and you should not access shared 340 * resources without doing your own thread-safety stuff.) 341 * 342 * @param inputs The input options 343 * @param threads The number of threads to use 344 * @throws GeneralException if anything fails 345 */ 346 public PooledWorkerResults<Map<String, Object>> parallelIterateQuery(IterateQueryOptions inputs, int threads) throws GeneralException { 347 PooledWorkerResults<Map<String, Object>> resultContainer = new PooledWorkerResults<>(); 348 349 ExecutorService executor = Executors.newWorkStealingPool(threads); 350 logger.info("Starting worker pool with " + threads + " threads"); 351 352 try (Connection connection = inputs.openConnection()) { 353 try (NamedParameterStatement stmt = new NamedParameterStatement(connection, inputs.getQuery())) { 354 if (!Util.isEmpty(inputs.getQueryParams())) { 355 stmt.setParameters(inputs.getQueryParams()); 356 } 357 358 try (ResultSet results = stmt.executeQuery()) { 359 ResultSetMetaData rsmd = results.getMetaData(); 360 List<String> columns = new ArrayList<>(); 361 for(int c = 1; c <= rsmd.getColumnCount(); c++) { 362 columns.add(rsmd.getColumnLabel(c)); 363 } 364 ResultSetIterator rsi = new ResultSetIterator(results, columns, sailPointContext); 365 while(rsi.hasNext() && Thread.currentThread().isInterrupted()) { 366 final Map<String, Object> row = new HashMap<>(rsi.nextRow()); 367 368 SailPointWorker worker = setupWorker(inputs, resultContainer, row); 369 370 executor.submit(worker.runnable()); 371 } 372 373 if (Thread.currentThread().isInterrupted()) { 374 executor.shutdownNow(); 375 resultContainer.setInterrupted(true); 376 } else { 377 executor.shutdown(); 378 while (!executor.isTerminated()) { 379 boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS); 380 if (!terminated) { 381 logger.debug("Waiting for thread pool to complete..."); 382 } 383 } 384 } 385 } 386 } 387 } catch(SQLException | InterruptedException e) { 388 resultContainer.setInterrupted(true); 389 if (!executor.isTerminated()) { 390 executor.shutdownNow(); 391 } 392 throw new GeneralException(e); 393 } 394 395 return resultContainer; 396 } 397 398 /** 399 * Sets up the parallel SailPointWorker that will invoke the callback and do 400 * appropriate error handling. 401 * 402 * @param inputs The original query inputs 403 * @param resultContainer The results container for reporting results 404 * @param row The output row 405 * @return if any failures occur 406 */ 407 private SailPointWorker setupWorker(IterateQueryOptions inputs, PooledWorkerResults<Map<String, Object>> resultContainer, Map<String, Object> row) { 408 SailPointWorker.ExceptionHandler exceptionHandler = t -> { 409 logger.error("Caught an error processing result row", t); 410 if (t instanceof Exception) { 411 resultContainer.addFailure(new Failure<>(row, (Exception) t)); 412 } 413 }; 414 415 SailPointWorker worker = new SailPointWorker() { 416 @Override 417 public Object execute(SailPointContext context, Log logger) throws Exception { 418 inputs.doParallelCallback(context, row); 419 return null; 420 } 421 }; 422 423 worker.setExceptionHandler(exceptionHandler); 424 worker.setCompletedCounter(resultContainer.getCompleted()); 425 worker.setFailedCounter(resultContainer.getFailed()); 426 return worker; 427 } 428 429 /** 430 * Run an update statement against the database directly (use sparingly) 431 * 432 * @param query The query to execute 433 * @param parameters The parameters to include in the query 434 * @return The return from executeUpdate() 435 * @throws Exception On failures 436 */ 437 public int update(String query, Object... parameters) throws Exception { 438 logger.debug("Query = " + query); 439 try (Connection conn = ContextConnectionWrapper.getConnection(sailPointContext)) { 440 try (PreparedStatement stmt = conn.prepareStatement(query)) { 441 setupParameters(stmt, parameters); 442 return stmt.executeUpdate(); 443 } 444 } 445 } 446}