001package com.identityworksllc.iiq.common.task.export;
002
003import com.identityworksllc.iiq.common.Functions;
004import com.identityworksllc.iiq.common.query.NamedParameterStatement;
005import com.identityworksllc.iiq.common.threads.SailPointWorker;
006import lombok.Getter;
007import lombok.Setter;
008import org.apache.commons.logging.Log;
009import org.apache.commons.logging.LogFactory;
010import sailpoint.api.SailPointContext;
011import sailpoint.object.Configuration;
012import sailpoint.object.TaskResult;
013import sailpoint.request.RequestPermanentException;
014import sailpoint.task.TaskMonitor;
015import sailpoint.tools.GeneralException;
016import sailpoint.tools.JdbcUtil;
017import sailpoint.tools.Message;
018import sailpoint.tools.Util;
019
020import java.io.Serializable;
021import java.sql.Connection;
022import java.sql.PreparedStatement;
023import java.sql.SQLException;
024import java.sql.Types;
025import java.time.Instant;
026import java.util.Date;
027import java.util.Objects;
028import java.util.StringJoiner;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031
032/**
033 * An abstract superclass for all export partitions. It will open a connection to the target
034 * database and then invoke export() on the subclass.
035 */
036@Setter
037@Getter
038public abstract class ExportPartition extends SailPointWorker implements Serializable {
039
040    private int batchSize = 50;
041    /**
042     * The config hash that will be recorded with de_runs when the partition is finished
043     */
044    private String configHash;
045    /**
046     * The configuration loaded in {@link #execute(SailPointContext, Log)}
047     */
048    protected transient Configuration configuration;
049    /**
050     * The name of the configuration object, set by the task
051     */
052    private String configurationName;
053    /**
054     * The connection info
055     */
056    private ExportConnectionInfo connectionInfo;
057    /**
058     * The cutoff date, milliseconds. We should not export records older than this date.
059     */
060    protected long cutoffDate;
061    /**
062     * The export timestamp in epoch milliseconds. We should not export records newer than this date.
063     */
064    protected long exportTimestamp;
065    /**
066     * The filter string
067     */
068    protected String filterString;
069    /**
070     * The second filter string if any
071     */
072    protected String filterString2;
073    /**
074     * The logger
075     */
076    private final Log logger;
077    /**
078     * The name of the partition
079     */
080    private String name;
081    /**
082     * The run key, which will be stored in the 'de_runs' table
083     */
084    private String runKey;
085
086    /**
087     * The name of the task, used to store the last run date
088     */
089    private String taskName;
090
091    /**
092     * Constructs a new ExportPartition
093     */
094    protected ExportPartition() {
095        this.logger = LogFactory.getLog(ExportPartition.class);
096    }
097
098    /**
099     * Adds the common date fields (which must have the given names) to the given prepared statement.
100     * The modified and last refresh dates can be null.
101     *
102     * @param statement The named parameter statement
103     * @param exportDate The export date
104     * @param created The creation date
105     * @param modified The modified date (which can be null)
106     * @param lastRefresh The last refresh date (which can be null)
107     * @throws SQLException if any failures occur (unlikely)
108     */
109    protected static void addCommonDateFields(NamedParameterStatement statement, Date exportDate, Date created, Date modified, Date lastRefresh) throws SQLException {
110        statement.setDate("created", created);
111        if (modified != null) {
112            statement.setDate("modified", modified);
113        } else {
114            statement.setNull("modified", Types.DATE);
115        }
116        if (lastRefresh != null) {
117            statement.setDate("lastRefresh", lastRefresh);
118        } else {
119            statement.setNull("lastRefresh", Types.DATE);
120        }
121        statement.setDate("now", exportDate);
122    }
123
124    /**
125     * Opens the connection to the target database using the provided connection info
126     * @param context The sailpoint context, used to decrypt the password
127     * @param connectionInfo The provided connection info, extracted from the export task def
128     * @return The open connection
129     * @throws GeneralException if any failures occur
130     */
131    public static Connection openConnection(SailPointContext context, ExportConnectionInfo connectionInfo) throws GeneralException {
132        String decryptedPassword = context.decrypt(connectionInfo.getEncryptedPassword());
133        return JdbcUtil.getConnection(connectionInfo.getDriver(), null, connectionInfo.getUrl(), connectionInfo.getUsername(), decryptedPassword, connectionInfo.getOptions());
134    }
135
136    /**
137     * The worker entrypoint
138     *
139     * @param context The private context to use for this thread worker
140     * @param _logger The log attached to this Worker
141     * @return Always null, nothing required here
142     * @throws Exception if anything goes wrong
143     */
144    @Override
145    public final Object execute(SailPointContext context, Log _logger) throws Exception {
146        if (Util.isNullOrEmpty(configurationName)) {
147            throw new GeneralException("Unable to execute export worker: configurationName not set");
148        }
149        this.configuration = context.getObjectByName(Configuration.class, configurationName);
150        if (this.configuration == null) {
151            throw new GeneralException("Unable to execute export worker: Configuration named '" + configurationName + "' does not exist");
152        }
153
154        if (Util.isNotNullOrEmpty(filterString)) {
155            TaskResult partitionResult = monitor.lockPartitionResult();
156            try {
157                partitionResult.setAttribute("filter", filterString + (filterString2 != null ? (" && " + filterString2) : ""));
158            } finally {
159                monitor.commitPartitionResult();
160            }
161        }
162
163        logger.info("Starting export partition with key " + runKey);
164
165        ExecutorService executorService = Executors.newFixedThreadPool(1);
166
167        try (Connection connection = openConnection(context, connectionInfo)) {
168            if (JdbcUtil.isMySQL(connection)) {
169                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
170            }
171            if (connectionInfo.getNetworkTimeout() > 0) {
172                connection.setNetworkTimeout(executorService, connection.getNetworkTimeout());
173            }
174
175            boolean previousAutoCommit = connection.getAutoCommit();
176            connection.setAutoCommit(false);
177            try {
178                export(context, connection, logger);
179                connection.commit();
180
181                logger.info("Finished export partition " + runKey);
182
183                try (PreparedStatement deleteRun = connection.prepareStatement("DELETE FROM de_runs WHERE run_key = ? and task_name = ?"); PreparedStatement insertRun = connection.prepareStatement("INSERT INTO de_runs (last_start_time, run_key, task_name, config_hash) VALUES (?, ?, ?, ?)")) {
184                    deleteRun.setString(1, runKey);
185                    deleteRun.setString(2, taskName);
186
187                    insertRun.setLong(1, exportTimestamp);
188                    insertRun.setString(2, runKey);
189                    insertRun.setString(3, taskName);
190                    insertRun.setString(4, configHash);
191
192                    deleteRun.executeUpdate();
193                    insertRun.executeUpdate();
194                }
195
196                connection.commit();
197            } finally {
198                connection.setAutoCommit(previousAutoCommit);
199                executorService.shutdownNow();
200            }
201
202        } catch(Exception e) {
203            logger.error("Caught an error in partition of type " + this.getClass().getSimpleName() + " with key " + runKey, e);
204            TaskResult partitionResult = monitor.lockPartitionResult();
205            try {
206                partitionResult.addException(e);
207            } finally {
208                monitor.commitPartitionResult();
209            }
210            throw new RequestPermanentException(e);
211        }
212        return null;
213    }
214
215    /**
216     * Exports the data required to the listed database
217     *
218     * @param context The context
219     * @param connection The connection to the target database
220     * @param logger The logger
221     * @throws GeneralException if any failures occur
222     */
223    protected abstract void export(SailPointContext context, Connection connection, Log logger) throws GeneralException;
224
225    public void setConnectionInfo(ExportConnectionInfo connectionInfo) {
226        this.connectionInfo = Objects.requireNonNull(connectionInfo, "connectionInfo");
227    }
228
229    @Override
230    public String toString() {
231        return new StringJoiner(", ", ExportPartition.class.getSimpleName() + "[", "]")
232                .add("cutoffDate=" + Instant.ofEpochMilli(cutoffDate))
233                .add("exportTimestamp=" + Instant.ofEpochMilli(exportTimestamp))
234                .add("filterString2='" + filterString2 + "'")
235                .add("filterString='" + filterString + "'")
236                .add("name='" + name + "'")
237                .add("runKey='" + runKey + "'")
238                .toString();
239    }
240}