001package com.identityworksllc.iiq.common.query;
002
003import com.identityworksllc.iiq.common.iterators.ResultSetIterator;
004import com.identityworksllc.iiq.common.threads.PooledWorkerResults;
005import com.identityworksllc.iiq.common.threads.SailPointWorker;
006import com.identityworksllc.iiq.common.vo.Failure;
007import openconnector.Util;
008import org.apache.commons.logging.Log;
009import sailpoint.api.SailPointContext;
010import sailpoint.api.SailPointFactory;
011import sailpoint.object.Filter;
012import sailpoint.object.QueryOptions;
013import sailpoint.object.SailPointObject;
014import sailpoint.tools.GeneralException;
015import sailpoint.tools.ObjectNotFoundException;
016
017import java.sql.Connection;
018import java.sql.PreparedStatement;
019import java.sql.ResultSet;
020import java.sql.ResultSetMetaData;
021import java.sql.SQLException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031
032/**
033 * Simplifies querying by automatically enclosing queries in a result processing loop.
034 * This is copied from an older project. IIQ also provides a number of similar utilities
035 * in their {@link sailpoint.tools.JdbcUtil} class.
036 *
037 * @param <T> The type returned from the result processor
038 */
039@SuppressWarnings("unused")
040public class QueryUtil<T> {
041
042    /**
043     * Callback for processing the result
044     *
045     * @param <U> The type returned from the result processor, must extend T
046     */
047    @FunctionalInterface
048    public interface ResultProcessor<U> {
049
050        /**
051         * Callback to indicate that the result set had no results
052         *
053         * @throws SQLException on failures
054         */
055        @SuppressWarnings("unused")
056        default void noResult() throws SQLException {
057            // Blank by default
058        }
059
060        /**
061         * Called once per result. Do not call "next" on the ResultSet here.
062         *
063         * @param result The result set at the current point
064         * @return An object of type T
065         * @throws GeneralException on any Sailpoint failures
066         * @throws SQLException on any database failures
067         */
068        U processResult(ResultSet result) throws GeneralException, SQLException;
069    }
070
071    /**
072     * Static helper method to retrieve the first value from the result set as a long
073     *
074     * @param query The query to run
075     * @param resultColumn The column to grab from the results
076     * @param Log logger
077     * @param parameters Any parameters
078     * @return A list of result strings
079     * @throws Throwable On failures
080     */
081    public static Long getLongValue(final String query, final String resultColumn, Log Log, Object... parameters) throws Throwable {
082        return new QueryUtil<Long>(Log).getResult(query, result -> result.getLong(resultColumn), parameters);
083    }
084
085    /**
086     * Retrieves the first string from the result set in the given column, then
087     * queries for a sailPoint object of the given type based on that name or
088     * ID. The column name is assumed to be 'id', which is the primary key in
089     * most of the SailPoint tables.
090     *
091     * @param context The Sailpoint context to use to query
092     * @param cls The class to query based on the ID being pulled
093     * @param query The query to run
094     * @param log logger
095     * @param parameters Any parameters
096     * @return A list of result strings
097     * @throws Throwable On failures
098     */
099    public static <T extends SailPointObject> T getObjectByQuery(final SailPointContext context, Class<T> cls, final String query, final Log log, Object... parameters) throws Throwable {
100        return getObjectByQuery(context, cls, query, "id", log, parameters);
101    }
102
103    /**
104     * Retrieves the first string from the result set in the given column, then
105     * queries for a sailPoint object of the given type based on that name or
106     * ID.
107     *
108     * @param context The Sailpoint context to use to query
109     * @param cls The class to query based on the ID being pulled
110     * @param query The query to run
111     * @param idColumn The column to grab from the results
112     * @param log logger
113     * @param parameters Any parameters
114     * @return A list of result strings
115     * @throws Throwable On failures
116     */
117    public static <T extends SailPointObject> T getObjectByQuery(final SailPointContext context, Class<T> cls, final String query, final String idColumn, final Log log, Object... parameters) throws Throwable {
118        String id = getStringValue(query, idColumn, log, parameters);
119        return context.getObject(cls, id);
120    }
121
122    /**
123     * Static helper method ot retrieve values from the query as a list of strings
124     *
125     * @param query The query to run
126     * @param resultColumn The column to grab from the results
127     * @param Log logger
128     * @param parameters Any parameters
129     * @return A list of result strings
130     * @throws Throwable On failures
131     */
132    public static List<String> getStringList(final String query, final String resultColumn, Log Log, Object... parameters) throws Throwable {
133        return new QueryUtil<String>(Log).getResults(query, result -> result.getString(resultColumn), parameters);
134    }
135
136    /**
137     * Static helper method to retrieve the first string value from the result set
138     *
139     * @param query The query to run
140     * @param resultColumn The column to grab from the results
141     * @param Log logger
142     * @param parameters Any parameters
143     * @return A list of result strings
144     * @throws Throwable On failures
145     */
146    public static String getStringValue(final String query, final String resultColumn, Log Log, Object... parameters) throws Throwable {
147        return new QueryUtil<String>(Log).getResult(query, result -> result.getString(resultColumn), parameters);
148    }
149
150    /**
151     * Retrieves a unique object with the given Filter string
152     * @param context The Sailpoint Context
153     * @param cls The class to query for
154     * @param filterString The Filter string
155     * @param <T> The type of the object to query
156     * @return The queried object
157     * @throws GeneralException if too many or too few objects are found
158     */
159    public static <T extends SailPointObject> T getUniqueObject(SailPointContext context, Class<T> cls, String filterString) throws GeneralException {
160        Filter filter = Filter.compile(filterString);
161        return getUniqueObject(context, cls, filter);
162    }
163
164    /**
165     * Retrieves a unique object with the given Filter string
166     * @param context The Sailpoint Context
167     * @param cls The class to query for
168     * @param filter a Filter object
169     * @param <T> The type of the object to query
170     * @return The queried object
171     * @throws GeneralException if too many or too few objects are found
172     */
173    public static <T extends SailPointObject> T getUniqueObject(SailPointContext context, Class<T> cls, Filter filter) throws GeneralException {
174        QueryOptions qo = new QueryOptions();
175        qo.add(filter);
176        return getUniqueObject(context, cls, qo);
177    }
178
179    /**
180     * Retrieves a unique object with the given Filter string
181     * @param context The Sailpoint Context
182     * @param cls The class to query for
183     * @param queryOptions a QueryOptions object which will be cloned before querying
184     * @param <T> The type of the object to query
185     * @return The queried object
186     * @throws ObjectNotFoundException if no matches are found
187     * @throws GeneralException if too many or too few objects are found
188     */
189    public static <T extends SailPointObject> T getUniqueObject(SailPointContext context, Class<T> cls, QueryOptions queryOptions) throws ObjectNotFoundException, GeneralException {
190        QueryOptions qo = new QueryOptions();
191        qo.setFilters(queryOptions.getFilters());
192        qo.setCacheResults(queryOptions.isCacheResults());
193        qo.setDirtyRead(queryOptions.isDirtyRead());
194        qo.setDistinct(queryOptions.isDistinct());
195        qo.setFlushBeforeQuery(queryOptions.isFlushBeforeQuery());
196        qo.setGroupBys(queryOptions.getGroupBys());
197        qo.setOrderings(queryOptions.getOrderings());
198        qo.setScopeResults(queryOptions.getScopeResults());
199        qo.setTransactionLock(queryOptions.isTransactionLock());
200        qo.setUnscopedGloballyAccessible(queryOptions.getUnscopedGloballyAccessible());
201
202        qo.setResultLimit(2);
203
204        List<T> results = context.getObjects(cls, qo);
205        if (results == null || results.isEmpty()) {
206            throw new ObjectNotFoundException();
207        } else if (results.size() > 1) {
208            throw new GeneralException("Expected a unique result, got " + results.size() + " results");
209        }
210        return results.get(0);
211    }
212
213    /**
214     * Set up the given parameters in the prepared statmeent
215     * @param stmt The statement
216     * @param parameters The parameters
217     * @throws SQLException if any failures occur setting parameters
218     */
219    public static void setupParameters(PreparedStatement stmt, Object[] parameters) throws SQLException {
220        Parameters.setupParameters(stmt, parameters);
221    }
222
223    /**
224     * Logger
225     */
226    private final Log logger;
227    /**
228     * Connection to SailPoint
229     */
230    private final SailPointContext sailPointContext;
231
232    /**
233     * Constructor
234     * @param Log logger
235     * @throws GeneralException if there is an error getting the current thread's SailPoint context
236     */
237    public QueryUtil(@SuppressWarnings("unused") Log Log) throws GeneralException {
238        this(SailPointFactory.getCurrentContext(), Log);
239    }
240
241    /**
242     * Constructor
243     * @param context The SailPoint context
244     * @param log The logger
245     */
246    public QueryUtil(SailPointContext context, Log log) {
247        this.sailPointContext = context;
248        this.logger = log;
249    }
250
251    /**
252     * @param query The query to execute
253     * @param processor The class to call for every iteration of the loop
254     * @param parameters The parameters for the query, if any
255     * @return A list of results output by the ResultProcessor
256     * @throws GeneralException on any Sailpoint failures
257     * @throws SQLException on any database failures
258     */
259    public T getResult(String query, ResultProcessor<? extends T> processor, Object... parameters) throws GeneralException, SQLException {
260        logger.debug("Query = " + query);
261        T output = null;
262        try (Connection conn = ContextConnectionWrapper.getConnection(sailPointContext)) {
263            try (PreparedStatement stmt = conn.prepareStatement(query)) {
264                setupParameters(stmt, parameters);
265                try (ResultSet results = stmt.executeQuery()) {
266                    if (results.next()) {
267                        output = processor.processResult(results);
268                    } else {
269                        processor.noResult();
270                    }
271                }
272            }
273        }
274        return output;
275    }
276
277    /**
278     * @param query The query to execute
279     * @param processor The class to call for every iteration of the loop
280     * @param parameters The parameters for the query, if any
281     * @return A list of results output by the ResultProcessor
282     * @throws GeneralException on any Sailpoint failures
283     * @throws SQLException on any database failures
284     */
285    public List<T> getResults(String query, ResultProcessor<? extends T> processor, Object... parameters) throws GeneralException, SQLException {
286        logger.debug("Query = " + query);
287        List<T> output = new ArrayList<>();
288        try (Connection conn = ContextConnectionWrapper.getConnection(sailPointContext)) {
289            try (PreparedStatement stmt = conn.prepareStatement(query)) {
290                setupParameters(stmt, parameters);
291                try (ResultSet results = stmt.executeQuery()) {
292                    boolean hasResults = false;
293                    while (results.next()) {
294                        hasResults = true;
295                        output.add(processor.processResult(results));
296                    }
297                    if (!hasResults) {
298                        processor.noResult();
299                    }
300                }
301            }
302        }
303        return output;
304    }
305
306    /**
307     * Iterates a query by invoking a Beanshell callback for each method
308     * @param inputs The inputs
309     * @throws GeneralException if anything goes wrong
310     */
311    public void iterateQuery(IterateQueryOptions inputs) throws GeneralException {
312        try (Connection connection = inputs.openConnection()) {
313            try (NamedParameterStatement stmt = new NamedParameterStatement(connection, inputs.getQuery())) {
314                if (!Util.isEmpty(inputs.getQueryParams())) {
315                    stmt.setParameters(inputs.getQueryParams());
316                }
317
318                try (ResultSet results = stmt.executeQuery()) {
319                    ResultSetMetaData rsmd = results.getMetaData();
320                    List<String> columns = new ArrayList<>();
321                    for(int c = 1; c <= rsmd.getColumnCount(); c++) {
322                        columns.add(rsmd.getColumnLabel(c));
323                    }
324                    ResultSetIterator rsi = new ResultSetIterator(results, columns, sailPointContext);
325                    while(rsi.hasNext()) {
326                        Map<String, Object> row = rsi.nextRow();
327                        inputs.doCallback(row);
328                    }
329                }
330            }
331        } catch(SQLException e) {
332            throw new GeneralException(e);
333        }
334    }
335
336    /**
337     * Iterates over a query in parallel, making a call to the defined callback
338     * in the input options. (NOTE: Beanshell is explicitly thread-safe, but you
339     * should use the thread context provided and you should not access shared
340     * resources without doing your own thread-safety stuff.)
341     *
342     * @param inputs The input options
343     * @param threads The number of threads to use
344     * @throws GeneralException if anything fails
345     */
346    public PooledWorkerResults<Map<String, Object>> parallelIterateQuery(IterateQueryOptions inputs, int threads) throws GeneralException {
347        PooledWorkerResults<Map<String, Object>> resultContainer = new PooledWorkerResults<>();
348
349        ExecutorService executor = Executors.newWorkStealingPool(threads);
350        logger.info("Starting worker pool with " + threads + " threads");
351
352        try (Connection connection = inputs.openConnection()) {
353            try (NamedParameterStatement stmt = new NamedParameterStatement(connection, inputs.getQuery())) {
354                if (!Util.isEmpty(inputs.getQueryParams())) {
355                    stmt.setParameters(inputs.getQueryParams());
356                }
357
358                try (ResultSet results = stmt.executeQuery()) {
359                    ResultSetMetaData rsmd = results.getMetaData();
360                    List<String> columns = new ArrayList<>();
361                    for(int c = 1; c <= rsmd.getColumnCount(); c++) {
362                        columns.add(rsmd.getColumnLabel(c));
363                    }
364                    ResultSetIterator rsi = new ResultSetIterator(results, columns, sailPointContext);
365                    while(rsi.hasNext() && Thread.currentThread().isInterrupted()) {
366                        final Map<String, Object> row = new HashMap<>(rsi.nextRow());
367
368                        SailPointWorker worker = setupWorker(inputs, resultContainer, row);
369
370                        executor.submit(worker.runnable());
371                    }
372
373                    if (Thread.currentThread().isInterrupted()) {
374                        executor.shutdownNow();
375                        resultContainer.setInterrupted(true);
376                    } else {
377                        executor.shutdown();
378                        while (!executor.isTerminated()) {
379                            boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS);
380                            if (!terminated) {
381                                logger.debug("Waiting for thread pool to complete...");
382                            }
383                        }
384                    }
385                }
386            }
387        } catch(SQLException | InterruptedException e) {
388            resultContainer.setInterrupted(true);
389            if (!executor.isTerminated()) {
390                executor.shutdownNow();
391            }
392            throw new GeneralException(e);
393        }
394
395        return resultContainer;
396    }
397
398    /**
399     * Sets up the parallel SailPointWorker that will invoke the callback and do
400     * appropriate error handling.
401     *
402     * @param inputs The original query inputs
403     * @param resultContainer The results container for reporting results
404     * @param row The output row
405     * @return if any failures occur
406     */
407    private SailPointWorker setupWorker(IterateQueryOptions inputs, PooledWorkerResults<Map<String, Object>> resultContainer, Map<String, Object> row) {
408        SailPointWorker.ExceptionHandler exceptionHandler = t -> {
409            logger.error("Caught an error processing result row", t);
410            if (t instanceof Exception) {
411                resultContainer.addFailure(new Failure<>(row, (Exception) t));
412            }
413        };
414
415        SailPointWorker worker = new SailPointWorker() {
416            @Override
417            public Object execute(SailPointContext context, Log logger) throws Exception {
418                inputs.doParallelCallback(context, row);
419                return null;
420            }
421        };
422
423        worker.setExceptionHandler(exceptionHandler);
424        worker.setCompletedCounter(resultContainer.getCompleted());
425        worker.setFailedCounter(resultContainer.getFailed());
426        return worker;
427    }
428
429    /**
430     * Run an update statement against the database directly (use sparingly)
431     *
432     * @param query The query to execute
433     * @param parameters The parameters to include in the query
434     * @return The return from executeUpdate()
435     * @throws Exception On failures
436     */
437    public int update(String query, Object... parameters) throws Exception {
438        logger.debug("Query = " + query);
439        try (Connection conn = ContextConnectionWrapper.getConnection(sailPointContext)) {
440            try (PreparedStatement stmt = conn.prepareStatement(query)) {
441                setupParameters(stmt, parameters);
442                return stmt.executeUpdate();
443            }
444        }
445    }
446}