001package com.identityworksllc.iiq.common.task.export;
002
003import com.identityworksllc.iiq.common.Functions;
004import com.identityworksllc.iiq.common.query.NamedParameterStatement;
005import com.identityworksllc.iiq.common.threads.SailPointWorker;
006import lombok.Getter;
007import lombok.Setter;
008import org.apache.commons.logging.Log;
009import org.apache.commons.logging.LogFactory;
010import sailpoint.api.SailPointContext;
011import sailpoint.object.Configuration;
012import sailpoint.object.TaskResult;
013import sailpoint.request.RequestPermanentException;
014import sailpoint.task.TaskMonitor;
015import sailpoint.tools.GeneralException;
016import sailpoint.tools.JdbcUtil;
017import sailpoint.tools.Message;
018import sailpoint.tools.Util;
019
020import java.io.Serializable;
021import java.sql.Connection;
022import java.sql.PreparedStatement;
023import java.sql.SQLException;
024import java.sql.Types;
025import java.time.Instant;
026import java.util.Date;
027import java.util.Objects;
028import java.util.StringJoiner;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031
032/**
033 * An abstract superclass for all export partitions. It will open a connection to the target
034 * database and then invoke export() on the subclass.
035 */
036@Setter
037@Getter
038public abstract class ExportPartition extends SailPointWorker implements Serializable {
039
040    private int batchSize = 50;
041    /**
042     * The config hash that will be recorded with de_runs when the partition is finished
043     */
044    private String configHash;
045    /**
046     * The configuration loaded in {@link #execute(SailPointContext, Log)}
047     */
048    protected transient Configuration configuration;
049    /**
050     * The name of the configuration object, set by the task
051     */
052    private String configurationName;
053    /**
054     * The connection info
055     */
056    private ExportConnectionInfo connectionInfo;
057
058    /**
059     * True if we ought to do a delete
060     */
061    private boolean deleteEnabled;
062
063    /**
064     * The cutoff date, milliseconds. We should not export records older than this date.
065     */
066    protected long cutoffDate;
067    /**
068     * The export timestamp in epoch milliseconds. We should not export records newer than this date.
069     */
070    protected long exportTimestamp;
071    /**
072     * The filter string
073     */
074    protected String filterString;
075    /**
076     * The second filter string if any
077     */
078    protected String filterString2;
079    /**
080     * The logger
081     */
082    private final Log logger;
083    /**
084     * The name of the partition
085     */
086    private String name;
087    /**
088     * The run key, which will be stored in the 'de_runs' table
089     */
090    private String runKey;
091
092    /**
093     * The name of the task, used to store the last run date
094     */
095    private String taskName;
096
097    /**
098     * Constructs a new ExportPartition
099     */
100    protected ExportPartition() {
101        this.logger = LogFactory.getLog(ExportPartition.class);
102    }
103
104    /**
105     * Adds the common date fields (which must have the given names) to the given prepared statement.
106     * The modified and last refresh dates can be null.
107     *
108     * @param statement The named parameter statement
109     * @param exportDate The export date
110     * @param created The creation date
111     * @param modified The modified date (which can be null)
112     * @param lastRefresh The last refresh date (which can be null)
113     * @throws SQLException if any failures occur (unlikely)
114     */
115    protected static void addCommonDateFields(NamedParameterStatement statement, Date exportDate, Date created, Date modified, Date lastRefresh) throws SQLException {
116        statement.setDate("created", created);
117        if (modified != null) {
118            statement.setDate("modified", modified);
119        } else {
120            statement.setNull("modified", Types.DATE);
121        }
122        if (lastRefresh != null) {
123            statement.setDate("lastRefresh", lastRefresh);
124        } else {
125            statement.setNull("lastRefresh", Types.DATE);
126        }
127        statement.setDate("now", exportDate);
128    }
129
130    /**
131     * Opens the connection to the target database using the provided connection info
132     * @param context The sailpoint context, used to decrypt the password
133     * @param connectionInfo The provided connection info, extracted from the export task def
134     * @return The open connection
135     * @throws GeneralException if any failures occur
136     */
137    public static Connection openConnection(SailPointContext context, ExportConnectionInfo connectionInfo) throws GeneralException {
138        String decryptedPassword = context.decrypt(connectionInfo.getEncryptedPassword());
139        return JdbcUtil.getConnection(connectionInfo.getDriver(), null, connectionInfo.getUrl(), connectionInfo.getUsername(), decryptedPassword, connectionInfo.getOptions());
140    }
141
142    /**
143     * The worker entrypoint
144     *
145     * @param context The private context to use for this thread worker
146     * @param _logger The log attached to this Worker
147     * @return Always null, nothing required here
148     * @throws Exception if anything goes wrong
149     */
150    @Override
151    public final Object execute(SailPointContext context, Log _logger) throws Exception {
152        if (Util.isNullOrEmpty(configurationName)) {
153            throw new GeneralException("Unable to execute export worker: configurationName not set");
154        }
155        this.configuration = context.getObjectByName(Configuration.class, configurationName);
156        if (this.configuration == null) {
157            throw new GeneralException("Unable to execute export worker: Configuration named '" + configurationName + "' does not exist");
158        }
159
160        if (Util.isNotNullOrEmpty(filterString)) {
161            TaskResult partitionResult = monitor.lockPartitionResult();
162            try {
163                partitionResult.setAttribute("filter", filterString + (filterString2 != null ? (" && " + filterString2) : ""));
164            } finally {
165                monitor.commitPartitionResult();
166            }
167        }
168
169        logger.info("Starting export partition with key " + runKey);
170
171        ExecutorService executorService = Executors.newFixedThreadPool(1);
172
173        try (Connection connection = openConnection(context, connectionInfo)) {
174            if (JdbcUtil.isMySQL(connection)) {
175                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
176            }
177            if (connectionInfo.getNetworkTimeout() > 0) {
178                connection.setNetworkTimeout(executorService, connection.getNetworkTimeout());
179            }
180
181            boolean previousAutoCommit = connection.getAutoCommit();
182            connection.setAutoCommit(false);
183            try {
184                export(context, connection, logger);
185                connection.commit();
186
187                logger.info("Finished export partition " + runKey);
188
189                try (PreparedStatement deleteRun = connection.prepareStatement("DELETE FROM de_runs WHERE run_key = ? and task_name = ?"); PreparedStatement insertRun = connection.prepareStatement("INSERT INTO de_runs (last_start_time, run_key, task_name, config_hash) VALUES (?, ?, ?, ?)")) {
190                    deleteRun.setString(1, runKey);
191                    deleteRun.setString(2, taskName);
192
193                    insertRun.setLong(1, exportTimestamp);
194                    insertRun.setString(2, runKey);
195                    insertRun.setString(3, taskName);
196                    insertRun.setString(4, configHash);
197
198                    deleteRun.executeUpdate();
199                    insertRun.executeUpdate();
200                }
201
202                connection.commit();
203            } finally {
204                connection.setAutoCommit(previousAutoCommit);
205                executorService.shutdownNow();
206            }
207
208        } catch(Exception e) {
209            logger.error("Caught an error in partition of type " + this.getClass().getSimpleName() + " with key " + runKey, e);
210            TaskResult partitionResult = monitor.lockPartitionResult();
211            try {
212                partitionResult.addException(e);
213            } finally {
214                monitor.commitPartitionResult();
215            }
216            throw new RequestPermanentException(e);
217        }
218        return null;
219    }
220
221    /**
222     * Exports the data required to the listed database
223     *
224     * @param context The context
225     * @param connection The connection to the target database
226     * @param logger The logger
227     * @throws GeneralException if any failures occur
228     */
229    protected abstract void export(SailPointContext context, Connection connection, Log logger) throws GeneralException;
230
231    public void setConnectionInfo(ExportConnectionInfo connectionInfo) {
232        this.connectionInfo = Objects.requireNonNull(connectionInfo, "connectionInfo");
233    }
234
235    @Override
236    public String toString() {
237        return new StringJoiner(", ", ExportPartition.class.getSimpleName() + "[", "]")
238                .add("cutoffDate=" + Instant.ofEpochMilli(cutoffDate))
239                .add("exportTimestamp=" + Instant.ofEpochMilli(exportTimestamp))
240                .add("filterString2='" + filterString2 + "'")
241                .add("filterString='" + filterString + "'")
242                .add("name='" + name + "'")
243                .add("runKey='" + runKey + "'")
244                .toString();
245    }
246}