package com.identityworksllc.iiq.common.task.export;

import com.identityworksllc.iiq.common.Functions;
import com.identityworksllc.iiq.common.query.NamedParameterStatement;
import com.identityworksllc.iiq.common.threads.SailPointWorker;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sailpoint.api.SailPointContext;
import sailpoint.object.Configuration;
import sailpoint.object.TaskResult;
import sailpoint.request.RequestPermanentException;
import sailpoint.task.TaskMonitor;
import sailpoint.tools.GeneralException;
import sailpoint.tools.JdbcUtil;
import sailpoint.tools.Message;
import sailpoint.tools.Util;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.util.Date;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * An abstract superclass for all export partitions. It will open a connection to the target
 * database and then invoke export() on the subclass. After a successful export, the run is
 * recorded in the 'de_runs' table of the target database.
 */
@Setter
@Getter
public abstract class ExportPartition extends SailPointWorker implements Serializable {

    /**
     * The batch size, defaulting to 50. It is not read by this class; it is exposed via
     * accessors (presumably for use by subclasses - confirm against the implementations).
     */
    private int batchSize = 50;
    /**
     * The config hash that will be recorded with de_runs when the partition is finished
     */
    private String configHash;
    /**
     * The configuration loaded in {@link #execute(SailPointContext, Log)}
     */
    protected transient Configuration configuration;
    /**
     * The name of the configuration object, set by the task
     */
    private String configurationName;
    /**
     * The connection info
     */
    private ExportConnectionInfo connectionInfo;
    /**
     * The cutoff date, in epoch milliseconds. We should not export records older than this date.
     */
    protected long cutoffDate;
    /**
     * The export timestamp in epoch milliseconds. We should not export records newer than this date.
     */
    protected long exportTimestamp;
    /**
     * The filter string
     */
    protected String filterString;
    /**
     * The second filter string if any
     */
    protected String filterString2;
    /**
     * The logger
     */
    private final Log logger;
    /**
     * The name of the partition
     */
    private String name;
    /**
     * The run key, which will be stored in the 'de_runs' table
     */
    private String runKey;

    /**
     * The name of the task, used to store the last run date
     */
    private String taskName;

    /**
     * Constructs a new ExportPartition
     */
    protected ExportPartition() {
        this.logger = LogFactory.getLog(ExportPartition.class);
    }

    /**
     * Adds the common date fields (which must have the given names) to the given prepared statement.
     * The statement must declare the named parameters 'created', 'modified', 'lastRefresh', and 'now'.
     * The modified and last refresh dates can be null, in which case SQL NULL is bound.
     *
     * @param statement The named parameter statement
     * @param exportDate The export date, bound to the 'now' parameter
     * @param created The creation date
     * @param modified The modified date (which can be null)
     * @param lastRefresh The last refresh date (which can be null)
     * @throws SQLException if any failures occur (unlikely)
     */
    protected static void addCommonDateFields(NamedParameterStatement statement, Date exportDate, Date created, Date modified, Date lastRefresh) throws SQLException {
        statement.setDate("created", created);
        if (modified != null) {
            statement.setDate("modified", modified);
        } else {
            statement.setNull("modified", Types.DATE);
        }
        if (lastRefresh != null) {
            statement.setDate("lastRefresh", lastRefresh);
        } else {
            statement.setNull("lastRefresh", Types.DATE);
        }
        statement.setDate("now", exportDate);
    }

    /**
     * Opens the connection to the target database using the provided connection info
     * @param context The sailpoint context, used to decrypt the password
     * @param connectionInfo The provided connection info, extracted from the export task def
     * @return The open connection
     * @throws GeneralException if any failures occur
     */
    public static Connection openConnection(SailPointContext context, ExportConnectionInfo connectionInfo) throws GeneralException {
        String decryptedPassword = context.decrypt(connectionInfo.getEncryptedPassword());
        return JdbcUtil.getConnection(connectionInfo.getDriver(), null, connectionInfo.getUrl(), connectionInfo.getUsername(), decryptedPassword, connectionInfo.getOptions());
    }

    /**
     * The worker entrypoint. Loads the named Configuration, records the filter on the
     * partition result, opens the target connection, invokes
     * {@link #export(SailPointContext, Connection, Log)} with auto-commit disabled, and then
     * replaces this partition's row in 'de_runs'.
     *
     * If the export or the 'de_runs' update fails, any uncommitted work is rolled back, the
     * exception is added to the partition result, and it is rethrown wrapped in a
     * {@link RequestPermanentException}.
     *
     * @param context The private context to use for this thread worker
     * @param _logger The log attached to this Worker; not used here, this class logs through its own logger field
     * @return Always null, nothing required here
     * @throws Exception if anything goes wrong
     */
    @Override
    public final Object execute(SailPointContext context, Log _logger) throws Exception {
        if (Util.isNullOrEmpty(configurationName)) {
            throw new GeneralException("Unable to execute export worker: configurationName not set");
        }
        this.configuration = context.getObjectByName(Configuration.class, configurationName);
        if (this.configuration == null) {
            throw new GeneralException("Unable to execute export worker: Configuration named '" + configurationName + "' does not exist");
        }

        if (Util.isNotNullOrEmpty(filterString)) {
            TaskResult partitionResult = monitor.lockPartitionResult();
            try {
                partitionResult.setAttribute("filter", filterString + (filterString2 != null ? (" && " + filterString2) : ""));
            } finally {
                monitor.commitPartitionResult();
            }
        }

        logger.info("Starting export partition with key " + runKey);

        // Handed to the JDBC driver by setNetworkTimeout(); shut down in the finally below
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        try (Connection connection = openConnection(context, connectionInfo)) {
            if (JdbcUtil.isMySQL(connection)) {
                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
            }

            // Apply the CONFIGURED timeout. JDBC takes an int number of milliseconds, so the
            // value is clamped to the int range.
            // NOTE(review): assumes ExportConnectionInfo stores the timeout in milliseconds - confirm.
            long configuredTimeout = connectionInfo.getNetworkTimeout();
            if (configuredTimeout > 0) {
                connection.setNetworkTimeout(executorService, (int) Math.min(configuredTimeout, Integer.MAX_VALUE));
            }

            boolean previousAutoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            try {
                export(context, connection, logger);
                connection.commit();

                logger.info("Finished export partition " + runKey);

                try (PreparedStatement deleteRun = connection.prepareStatement("DELETE FROM de_runs WHERE run_key = ? and task_name = ?"); PreparedStatement insertRun = connection.prepareStatement("INSERT INTO de_runs (last_start_time, run_key, task_name, config_hash) VALUES (?, ?, ?, ?)")) {
                    deleteRun.setString(1, runKey);
                    deleteRun.setString(2, taskName);

                    insertRun.setLong(1, exportTimestamp);
                    insertRun.setString(2, runKey);
                    insertRun.setString(3, taskName);
                    insertRun.setString(4, configHash);

                    deleteRun.executeUpdate();
                    insertRun.executeUpdate();
                }

                connection.commit();
            } catch (Exception e) {
                // Discard uncommitted work explicitly. Without this, restoring auto-commit in
                // the finally block would commit the half-finished transaction.
                try {
                    connection.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            } finally {
                connection.setAutoCommit(previousAutoCommit);
            }

        } catch(Exception e) {
            logger.error("Caught an error in partition of type " + this.getClass().getSimpleName() + " with key " + runKey, e);
            TaskResult partitionResult = monitor.lockPartitionResult();
            try {
                partitionResult.addException(e);
            } finally {
                monitor.commitPartitionResult();
            }
            throw new RequestPermanentException(e);
        } finally {
            // Runs on every path, including failures before the transaction starts, and only
            // after try-with-resources has closed the connection.
            executorService.shutdownNow();
        }
        return null;
    }

    /**
     * Exports the data required to the listed database
     *
     * @param context The context
     * @param connection The connection to the target database
     * @param logger The logger
     * @throws GeneralException if any failures occur
     */
    protected abstract void export(SailPointContext context, Connection connection, Log logger) throws GeneralException;

    /**
     * Sets the connection info, rejecting null values
     *
     * @param connectionInfo The connection info for the target database; must not be null
     * @throws NullPointerException if the given connection info is null
     */
    public void setConnectionInfo(ExportConnectionInfo connectionInfo) {
        this.connectionInfo = Objects.requireNonNull(connectionInfo, "connectionInfo");
    }

    /**
     * Renders the cutoff date and export timestamp (as Instants), both filter strings,
     * the partition name, and the run key.
     *
     * @return The string form of this partition
     */
    @Override
    public String toString() {
        return new StringJoiner(", ", ExportPartition.class.getSimpleName() + "[", "]")
                .add("cutoffDate=" + Instant.ofEpochMilli(cutoffDate))
                .add("exportTimestamp=" + Instant.ofEpochMilli(exportTimestamp))
                .add("filterString2='" + filterString2 + "'")
                .add("filterString='" + filterString + "'")
                .add("name='" + name + "'")
                .add("runKey='" + runKey + "'")
                .toString();
    }
}