001package com.identityworksllc.iiq.common.task.export;
002
003import com.identityworksllc.iiq.common.query.NamedParameterStatement;
004import com.identityworksllc.iiq.common.threads.SailPointWorker;
005import lombok.Getter;
006import lombok.Setter;
007import org.apache.commons.logging.Log;
008import org.apache.commons.logging.LogFactory;
009import sailpoint.api.SailPointContext;
010import sailpoint.object.Configuration;
011import sailpoint.object.TaskDefinition;
012import sailpoint.object.TaskResult;
013import sailpoint.request.RequestPermanentException;
014import sailpoint.task.TaskMonitor;
015import sailpoint.tools.GeneralException;
016import sailpoint.tools.JdbcUtil;
017import sailpoint.tools.Util;
018
019import java.io.Serializable;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.SQLException;
023import java.sql.Types;
024import java.time.Instant;
025import java.util.Date;
026import java.util.Objects;
027import java.util.StringJoiner;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030
031/**
032 * An abstract superclass for all export partitions. It will open a connection to the target
033 * database and then invoke export() on the subclass.
034 */
035@Setter
036@Getter
037public abstract class ExportPartition extends SailPointWorker implements Serializable {
038
039    private int batchSize = 50;
040    /**
041     * The config hash that will be recorded with de_runs when the partition is finished
042     */
043    private String configHash;
044    /**
045     * The configuration loaded in {@link #execute(SailPointContext, Log)}
046     */
047    protected transient Configuration configuration;
048    /**
049     * The name of the configuration object, set by the task
050     */
051    private String configurationName;
052    /**
053     * The connection info
054     */
055    private ExportConnectionInfo connectionInfo;
056
057    /**
058     * True if we ought to do a delete
059     */
060    private boolean deleteEnabled;
061
062    /**
063     * The cutoff date, milliseconds. We should not export records older than this date.
064     */
065    protected long cutoffDate;
066    /**
067     * The export timestamp in epoch milliseconds. We should not export records newer than this date.
068     */
069    protected long exportTimestamp;
070    /**
071     * The filter string
072     */
073    protected String filterString;
074    /**
075     * The second filter string if any
076     */
077    protected String filterString2;
078    /**
079     * The logger
080     */
081    private final Log logger;
082    /**
083     * The name of the partition
084     */
085    private String name;
086    /**
087     * The run key, which will be stored in the 'de_runs' table
088     */
089    private String runKey;
090
091    /**
092     * The name of the task, used to store the last run date
093     */
094    private String taskName;
095
096    /**
097     * Constructs a new ExportPartition
098     */
099    protected ExportPartition() {
100        this.logger = LogFactory.getLog(ExportPartition.class);
101    }
102
103    /**
104     * Adds the common date fields (which must have the given names) to the given prepared statement.
105     * The modified and last refresh dates can be null.
106     *
107     * @param statement The named parameter statement
108     * @param exportDate The export date
109     * @param created The creation date
110     * @param modified The modified date (which can be null)
111     * @param lastRefresh The last refresh date (which can be null)
112     * @throws SQLException if any failures occur (unlikely)
113     */
114    protected static void addCommonDateFields(NamedParameterStatement statement, Date exportDate, Date created, Date modified, Date lastRefresh) throws SQLException {
115        statement.setDate("created", created);
116        if (modified != null) {
117            statement.setDate("modified", modified);
118        } else {
119            statement.setNull("modified", Types.DATE);
120        }
121        if (lastRefresh != null) {
122            statement.setDate("lastRefresh", lastRefresh);
123        } else {
124            statement.setNull("lastRefresh", Types.DATE);
125        }
126        statement.setDate("now", exportDate);
127    }
128
129    /**
130     * Opens the connection to the target database using the provided connection info
131     * @param context The sailpoint context, used to decrypt the password
132     * @param connectionInfo The provided connection info, extracted from the export task def
133     * @return The open connection
134     * @throws GeneralException if any failures occur
135     */
136    public static Connection openConnection(SailPointContext context, ExportConnectionInfo connectionInfo) throws GeneralException {
137        String decryptedPassword = context.decrypt(connectionInfo.getEncryptedPassword());
138        return JdbcUtil.getConnection(connectionInfo.getDriver(), null, connectionInfo.getUrl(), connectionInfo.getUsername(), decryptedPassword, connectionInfo.getOptions());
139    }
140
141    /**
142     * The worker entrypoint
143     *
144     * @param context The private context to use for this thread worker
145     * @param _logger The log attached to this Worker
146     * @return Always null, nothing required here
147     * @throws Exception if anything goes wrong
148     */
149    @Override
150    public final Object execute(SailPointContext context, Log _logger) throws Exception {
151        if (Util.isNullOrEmpty(configurationName)) {
152            throw new GeneralException("Unable to execute export worker: configurationName not set");
153        }
154        this.configuration = context.getObjectByName(Configuration.class, configurationName);
155        if (this.configuration == null) {
156            throw new GeneralException("Unable to execute export worker: Configuration named '" + configurationName + "' does not exist");
157        }
158
159        if (Util.isNotNullOrEmpty(filterString)) {
160            TaskResult partitionResult = monitor.lockPartitionResult();
161            try {
162                partitionResult.setAttribute("filter", filterString + (filterString2 != null ? (" && " + filterString2) : ""));
163            } finally {
164                monitor.commitPartitionResult();
165            }
166        }
167
168        logger.info("Starting export partition with key " + runKey);
169
170        ExecutorService executorService = Executors.newFixedThreadPool(1);
171
172        try (Connection connection = openConnection(context, connectionInfo)) {
173            if (JdbcUtil.isMySQL(connection)) {
174                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
175            }
176            if (connectionInfo.getNetworkTimeout() > 0) {
177                connection.setNetworkTimeout(executorService, connection.getNetworkTimeout());
178            }
179
180            boolean previousAutoCommit = connection.getAutoCommit();
181            connection.setAutoCommit(false);
182            try {
183                export(context, connection, logger);
184                connection.commit();
185
186                logger.info("Finished export partition " + runKey);
187
188                try (PreparedStatement deleteRun = connection.prepareStatement("DELETE FROM de_runs WHERE run_key = ? and task_name = ?"); PreparedStatement insertRun = connection.prepareStatement("INSERT INTO de_runs (last_start_time, run_key, task_name, config_hash) VALUES (?, ?, ?, ?)")) {
189                    deleteRun.setString(1, runKey);
190                    deleteRun.setString(2, taskName);
191
192                    insertRun.setLong(1, exportTimestamp);
193                    insertRun.setString(2, runKey);
194                    insertRun.setString(3, taskName);
195                    insertRun.setString(4, configHash);
196
197                    deleteRun.executeUpdate();
198                    insertRun.executeUpdate();
199                }
200
201                connection.commit();
202            } finally {
203                connection.setAutoCommit(previousAutoCommit);
204                executorService.shutdownNow();
205            }
206
207        } catch(Exception e) {
208            logger.error("Caught an error in partition of type " + this.getClass().getSimpleName() + " with key " + runKey, e);
209            TaskResult partitionResult = monitor.lockPartitionResult();
210            try {
211                partitionResult.addException(e);
212            } finally {
213                monitor.commitPartitionResult();
214            }
215            throw new RequestPermanentException(e);
216        }
217        return null;
218    }
219
220    /**
221     * Exports the data required to the listed database
222     *
223     * @param context The context
224     * @param connection The connection to the target database
225     * @param logger The logger
226     * @throws GeneralException if any failures occur
227     */
228    public abstract void export(SailPointContext context, Connection connection, Log logger) throws GeneralException;
229
230    /**
231     * Sets the filter string
232     * @param connectionInfo The connection info
233     */
234    public void setConnectionInfo(ExportConnectionInfo connectionInfo) {
235        this.connectionInfo = Objects.requireNonNull(connectionInfo, "connectionInfo");
236    }
237
238    /**
239     * Tests the export of a single object. The subclass will determine which object
240     * type it is. This can be invoked via a Run Rule task, the Debug page, or the Rule
241     * Runner plugin.
242     *
243     * Each test will use a unique "run key" so that it doesn't interfere with the incremental
244     * export mechanism used by the main task.
245     *
246     * @param context The IIQ context
247     * @param logger The logger
248     * @param taskDefinitionName The name of the existing TaskDefinition
249     * @param id The ID of the object to export
250     * @throws Exception if any failures occur
251     */
252    public void testExportSingleObject(SailPointContext context, Log logger, String taskDefinitionName, String id) throws Exception {
253        TaskDefinition taskDefinition = context.getObjectByName(TaskDefinition.class, taskDefinitionName);
254        if (taskDefinition == null) {
255            throw new IllegalArgumentException("Task definition " + taskDefinitionName + " does not exist");
256        }
257
258        TaskResult taskResult = new TaskResult();
259        taskResult.setName("Test export single object " + Util.uuid());
260        taskResult.setDefinition(taskDefinition);
261
262        context.saveObject(taskResult);
263
264        TaskMonitor monitor = new TaskMonitor(context, taskResult);
265        setMonitor(monitor);
266
267        setFilterString("id == \"" + id + "\"");
268        setExportTimestamp(0L);
269        setCutoffDate(0L);
270        setRunKey("Test export " + taskResult.getId());
271
272        String url = taskDefinition.getString("url");
273        String username = taskDefinition.getString("username");
274        String password = taskDefinition.getString("password");
275
276        ExportConnectionInfo connectionInfo = new ExportConnectionInfo(url, username, password);
277        setConnectionInfo(connectionInfo);
278
279        String configurationName = taskDefinition.getString("configurationName");
280        setConfigurationName(configurationName);
281
282        this.execute(context, logger);
283    }
284
285    @Override
286    public String toString() {
287        return new StringJoiner(", ", ExportPartition.class.getSimpleName() + "[", "]")
288                .add("cutoffDate=" + Instant.ofEpochMilli(cutoffDate))
289                .add("exportTimestamp=" + Instant.ofEpochMilli(exportTimestamp))
290                .add("filterString2='" + filterString2 + "'")
291                .add("filterString='" + filterString + "'")
292                .add("name='" + name + "'")
293                .add("runKey='" + runKey + "'")
294                .toString();
295    }
296}