package com.identityworksllc.iiq.common.task.export;

import com.identityworksllc.iiq.common.Functions;
import com.identityworksllc.iiq.common.query.NamedParameterStatement;
import com.identityworksllc.iiq.common.threads.SailPointWorker;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sailpoint.api.SailPointContext;
import sailpoint.object.Configuration;
import sailpoint.object.TaskResult;
import sailpoint.request.RequestPermanentException;
import sailpoint.task.TaskMonitor;
import sailpoint.tools.GeneralException;
import sailpoint.tools.JdbcUtil;
import sailpoint.tools.Message;
import sailpoint.tools.Util;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.util.Date;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * An abstract superclass for all export partitions. It will open a connection to the target
 * database and then invoke export() on the subclass.
 *
 * The export runs with auto-commit disabled. After a successful export, the run is recorded
 * in the 'de_runs' table. If anything fails, pending work is rolled back before the
 * connection's auto-commit mode is restored, so that a partial export (or a half-updated
 * 'de_runs' row) is not committed by accident.
 */
@Setter
@Getter
public abstract class ExportPartition extends SailPointWorker implements Serializable {

    /**
     * The batch size, defaulting to 50. It is not read within this class.
     * NOTE(review): presumably consumed by subclasses via the generated getter; confirm.
     */
    private int batchSize = 50;
    /**
     * The config hash that will be recorded with de_runs when the partition is finished
     */
    private String configHash;
    /**
     * The configuration loaded in {@link #execute(SailPointContext, Log)}
     */
    protected transient Configuration configuration;
    /**
     * The name of the configuration object, set by the task
     */
    private String configurationName;
    /**
     * The connection info
     */
    private ExportConnectionInfo connectionInfo;

    /**
     * True if we ought to do a delete
     */
    private boolean deleteEnabled;

    /**
     * The cutoff date, milliseconds. We should not export records older than this date.
     */
    protected long cutoffDate;
    /**
     * The export timestamp in epoch milliseconds. We should not export records newer than this date.
     */
    protected long exportTimestamp;
    /**
     * The filter string
     */
    protected String filterString;
    /**
     * The second filter string if any
     */
    protected String filterString2;
    /**
     * The logger
     */
    private final Log logger;
    /**
     * The name of the partition
     */
    private String name;
    /**
     * The run key, which will be stored in the 'de_runs' table
     */
    private String runKey;

    /**
     * The name of the task, used to store the last run date
     */
    private String taskName;

    /**
     * Constructs a new ExportPartition
     */
    protected ExportPartition() {
        this.logger = LogFactory.getLog(ExportPartition.class);
    }

    /**
     * Adds the common date fields (which must have the given names) to the given prepared statement.
     * The modified and last refresh dates can be null.
     *
     * @param statement The named parameter statement
     * @param exportDate The export date
     * @param created The creation date
     * @param modified The modified date (which can be null)
     * @param lastRefresh The last refresh date (which can be null)
     * @throws SQLException if any failures occur (unlikely)
     */
    protected static void addCommonDateFields(NamedParameterStatement statement, Date exportDate, Date created, Date modified, Date lastRefresh) throws SQLException {
        statement.setDate("created", created);
        if (modified != null) {
            statement.setDate("modified", modified);
        } else {
            statement.setNull("modified", Types.DATE);
        }
        if (lastRefresh != null) {
            statement.setDate("lastRefresh", lastRefresh);
        } else {
            statement.setNull("lastRefresh", Types.DATE);
        }
        statement.setDate("now", exportDate);
    }

    /**
     * Opens the connection to the target database using the provided connection info
     * @param context The sailpoint context, used to decrypt the password
     * @param connectionInfo The provided connection info, extracted from the export task def
     * @return The open connection
     * @throws GeneralException if any failures occur
     */
    public static Connection openConnection(SailPointContext context, ExportConnectionInfo connectionInfo) throws GeneralException {
        String decryptedPassword = context.decrypt(connectionInfo.getEncryptedPassword());
        return JdbcUtil.getConnection(connectionInfo.getDriver(), null, connectionInfo.getUrl(), connectionInfo.getUsername(), decryptedPassword, connectionInfo.getOptions());
    }

    /**
     * The worker entrypoint. Loads the named Configuration, records the filter on the
     * partition result, opens the target connection, invokes
     * {@link #export(SailPointContext, Connection, Log)} in a transaction, and finally
     * records the run in 'de_runs'.
     *
     * @param context The private context to use for this thread worker
     * @param _logger The log attached to this Worker (unused; the partition's own logger is used)
     * @return Always null, nothing required here
     * @throws Exception if anything goes wrong; failures after the connection attempt begins
     *                   are recorded on the partition result and rethrown wrapped in a
     *                   {@link RequestPermanentException}
     */
    @Override
    public final Object execute(SailPointContext context, Log _logger) throws Exception {
        if (Util.isNullOrEmpty(configurationName)) {
            throw new GeneralException("Unable to execute export worker: configurationName not set");
        }
        this.configuration = context.getObjectByName(Configuration.class, configurationName);
        if (this.configuration == null) {
            throw new GeneralException("Unable to execute export worker: Configuration named '" + configurationName + "' does not exist");
        }

        recordFilter();

        logger.info("Starting export partition with key " + runKey);

        // Only used by the JDBC driver to enforce the network timeout, if one is configured
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        try (Connection connection = openConnection(context, connectionInfo)) {
            if (JdbcUtil.isMySQL(connection)) {
                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
            }
            if (connectionInfo.getNetworkTimeout() > 0) {
                // Apply the CONFIGURED timeout, not the connection's current one.
                // ExportConnectionInfo is not visible here; toIntExact accepts either an
                // int or long getter and fails loudly rather than silently truncating.
                connection.setNetworkTimeout(executorService, Math.toIntExact(connectionInfo.getNetworkTimeout()));
            }

            boolean previousAutoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            boolean committed = false;
            try {
                export(context, connection, logger);
                connection.commit();

                logger.info("Finished export partition " + runKey);

                recordRun(connection);
                connection.commit();
                committed = true;
            } finally {
                try {
                    if (!committed) {
                        // Changing the auto-commit mode mid-transaction commits it, so
                        // anything pending must be discarded before the mode is restored.
                        rollbackQuietly(connection);
                    }
                } finally {
                    connection.setAutoCommit(previousAutoCommit);
                }
            }

        } catch(Exception e) {
            recordFailure(e);
            throw new RequestPermanentException(e);
        } finally {
            // Runs after the connection is closed and on every path, including a failure
            // to open or configure the connection
            executorService.shutdownNow();
        }
        return null;
    }

    /**
     * Stores the effective filter (both filter strings joined with " && ") on the partition
     * result, if a primary filter string is present.
     *
     * @throws GeneralException if the partition result cannot be locked or committed
     */
    private void recordFilter() throws GeneralException {
        if (Util.isNullOrEmpty(filterString)) {
            return;
        }
        TaskResult partitionResult = monitor.lockPartitionResult();
        try {
            String effectiveFilter = filterString;
            // An empty second filter is treated like an absent one to avoid a dangling " && "
            if (Util.isNotNullOrEmpty(filterString2)) {
                effectiveFilter = effectiveFilter + " && " + filterString2;
            }
            partitionResult.setAttribute("filter", effectiveFilter);
        } finally {
            monitor.commitPartitionResult();
        }
    }

    /**
     * Replaces the 'de_runs' row for this run key and task name with one carrying the
     * current export timestamp and config hash. The caller is responsible for committing.
     *
     * @param connection The open connection to the target database
     * @throws SQLException if either statement fails
     */
    private void recordRun(Connection connection) throws SQLException {
        try (PreparedStatement deleteRun = connection.prepareStatement("DELETE FROM de_runs WHERE run_key = ? and task_name = ?"); PreparedStatement insertRun = connection.prepareStatement("INSERT INTO de_runs (last_start_time, run_key, task_name, config_hash) VALUES (?, ?, ?, ?)")) {
            deleteRun.setString(1, runKey);
            deleteRun.setString(2, taskName);

            insertRun.setLong(1, exportTimestamp);
            insertRun.setString(2, runKey);
            insertRun.setString(3, taskName);
            insertRun.setString(4, configHash);

            deleteRun.executeUpdate();
            insertRun.executeUpdate();
        }
    }

    /**
     * Rolls back the current transaction, logging (rather than throwing) any failure so
     * that the exception which triggered the rollback is not masked.
     *
     * @param connection The connection to roll back
     */
    private void rollbackQuietly(Connection connection) {
        try {
            connection.rollback();
        } catch (SQLException rollbackFailure) {
            logger.warn("Unable to roll back export partition with key " + runKey, rollbackFailure);
        }
    }

    /**
     * Logs the given failure and attaches it to the partition result.
     *
     * @param e The exception that aborted the partition
     * @throws GeneralException if the partition result cannot be locked or committed
     */
    private void recordFailure(Exception e) throws GeneralException {
        logger.error("Caught an error in partition of type " + this.getClass().getSimpleName() + " with key " + runKey, e);
        TaskResult partitionResult = monitor.lockPartitionResult();
        try {
            partitionResult.addException(e);
        } finally {
            monitor.commitPartitionResult();
        }
    }

    /**
     * Exports the data required to the listed database
     *
     * @param context The context
     * @param connection The connection to the target database
     * @param logger The logger
     * @throws GeneralException if any failures occur
     */
    protected abstract void export(SailPointContext context, Connection connection, Log logger) throws GeneralException;

    /**
     * Sets the connection info, which may not be null
     *
     * @param connectionInfo The connection info for the target database
     * @throws NullPointerException if the connection info is null
     */
    public void setConnectionInfo(ExportConnectionInfo connectionInfo) {
        this.connectionInfo = Objects.requireNonNull(connectionInfo, "connectionInfo");
    }

    /**
     * @return A summary of the partition's date range, filters, name, and run key
     */
    @Override
    public String toString() {
        return new StringJoiner(", ", ExportPartition.class.getSimpleName() + "[", "]")
                .add("cutoffDate=" + Instant.ofEpochMilli(cutoffDate))
                .add("exportTimestamp=" + Instant.ofEpochMilli(exportTimestamp))
                .add("filterString2='" + filterString2 + "'")
                .add("filterString='" + filterString + "'")
                .add("name='" + name + "'")
                .add("runKey='" + runKey + "'")
                .toString();
    }
}