package com.identityworksllc.iiq.common.task.export;

import com.identityworksllc.iiq.common.Utilities;
import com.identityworksllc.iiq.common.request.SailPointWorkerExecutor;
import com.identityworksllc.iiq.common.threads.SailPointWorker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sailpoint.api.SailPointContext;
import sailpoint.object.*;
import sailpoint.task.AbstractTaskExecutor;
import sailpoint.tools.GeneralException;
import sailpoint.tools.JdbcUtil;
import sailpoint.tools.Message;
import sailpoint.tools.Util;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A partitioned task for handling data exports. The task can be provided multiple filters
 * that should together cover the entire set of users to be exported.
 *
 * The partitions run in ordered phases: the actual export (Identities, then Links), then a
 * cleanup of any Links no longer in IIQ, then a finalization step that sets the last run date.
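 *
 * The task arguments read by this executor include: requestDefinitionName, configurationName,
 * identityFilters, linkFilters, linkFilters2, linkBatchSize, linkCleanup, gatherStatsTables,
 * and the target JDBC settings (driver, url, username, password, networkTimeout,
 * connectionProperties). As a rough sketch only, with purely illustrative values (the names
 * and filters below are hypothetical, not a shipped configuration):
 *
 * <pre>{@code
 * Attributes<String, Object> attributes = new Attributes<>();
 * attributes.put("configurationName", "Example Export Configuration");
 * attributes.put("identityFilters", Arrays.asList("id.notNull()"));
 * attributes.put("linkFilters", Arrays.asList(
 *         "application.name == \"Example App A\"",
 *         "application.name != \"Example App A\""));
 * attributes.put("url", "jdbc:mysql://db.example.com/export");
 * attributes.put("username", "export_user");
 * attributes.put("password", "...");
 * }</pre>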
 */
public class IDWDataExporter extends AbstractTaskExecutor {
    /**
     * The sixteen single-character hexadecimal suffixes (0 through f), e.g., for splitting
     * objects into 16 partitions by the final character of their IDs
     */
    public static final List<String> SUFFIXES_16 =
            Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f");

    /**
     * The 256 two-character hexadecimal suffixes (00 through ff), e.g., for splitting
     * objects into 256 partitions by the final two characters of their IDs
     */
    public static final List<String> SUFFIXES_256 =
            Arrays.asList(
                    "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f",
                    "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f",
                    "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f",
                    "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f",
                    "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f",
                    "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f",
                    "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f",
                    "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f",
                    "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f",
                    "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f",
                    "a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af",
                    "b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf",
                    "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf",
                    "d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df",
                    "e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef",
                    "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff"
            );

    /**
     * The version of this exporter
     */
    public static final int VERSION = 1;

    /**
     * The logger for this class
     */
    private final Log logger;

    /**
     * Set to true when {@link #terminate()} is invoked
     */
    private final AtomicBoolean stopped;

    public IDWDataExporter() {
        this.stopped = new AtomicBoolean();
        this.logger = LogFactory.getLog(IDWDataExporter.class);
    }

    /**
     * Builds one {@link Request} per export partition and launches them all.
     *
     * @see sailpoint.object.TaskExecutor#execute(SailPointContext, TaskSchedule, TaskResult, Attributes)
     */
    @Override
    public void execute(SailPointContext context, TaskSchedule taskSchedule, TaskResult taskResult, Attributes<String, Object> attributes) throws Exception {
        String requestDefinitionName = attributes.getString("requestDefinitionName");
        if (Util.isNullOrEmpty(requestDefinitionName)) {
            requestDefinitionName = SailPointWorkerExecutor.REQUEST_DEFINITION;
        }
        RequestDefinition requestDefinition = context.getObjectByName(RequestDefinition.class, requestDefinitionName);
        if (requestDefinition == null) {
            throw new IllegalArgumentException("Request definition called " + requestDefinitionName + " does not exist; do you need to import it?");
        }

        taskResult.addMessage("Partitions will execute using the Request Definition called " + requestDefinitionName);

        List<ExportPartition> clusters = getPartitions(context, taskResult, attributes);

        List<Request> partitionRequests = new ArrayList<>();
        for(ExportPartition partition : clusters) {
            List<SailPointWorker> workerCluster = new ArrayList<>();
            workerCluster.add(partition);
            // Serializes the SailPointWorker object so that it can be persisted
            Request partitionRequest = SailPointWorker.toRequest(requestDefinition, workerCluster);
            partitionRequest.setName(partition.getName());
            partitionRequests.add(partitionRequest);
        }

        taskResult.addMessage(Message.info("Launching " + partitionRequests.size() + " partitions"));

        launchPartitions(context, taskResult, partitionRequests);
    }

    /**
     * Gets the list of partitions for the export operation. These will each have their 'phase'
     * attribute set so that they run in order.
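     *
     * As a sketch of the phase layout this method produces (the exact numbers shift when a
     * mid-run Oracle 'gather table stats' partition is inserted):
     *
     * <pre>
     *   Phase 1: Identity export partitions (one per identityFilters entry)
     *   Phase 2: Link export partitions (one per linkFilters x linkFilters2 combination)
     *   Phase 3 (optional, Oracle only): partial 'gather table stats' partition, with the
     *            remaining Link partitions moving to phase 4
     *   Final phase: Link cleanup and/or final 'gather table stats' partitions
     * </pre>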
     *
     * @param context The context
     * @param taskResult The task result for the parent task
     * @param attributes The attributes of the task execution
     * @return The resulting list of partitions to launch
     * @throws GeneralException if any failures occur
     */
    public List<ExportPartition> getPartitions(SailPointContext context, TaskResult taskResult, Attributes<String, Object> attributes) throws GeneralException {
        long now = System.currentTimeMillis();

        List<ExportPartition> partitions = new ArrayList<>();
        String configurationName = attributes.getString("configurationName");
        if (Util.isNullOrEmpty(configurationName)) {
            throw new GeneralException("A configurationName setting is required for the data export job");
        }

        int linkBatchSize = Util.otoi(attributes.get("linkBatchSize"));

        List<String> identityFilters = attributes.getStringList("identityFilters");
        List<String> linkFilters = attributes.getStringList("linkFilters");
        List<String> linkFilters2 = attributes.getStringList("linkFilters2");

        // Everything in one giant partition by default
        if (identityFilters == null || identityFilters.isEmpty()) {
            identityFilters = new ArrayList<>();
            identityFilters.add("id.notNull()");
        }

        // Everything in one giant partition by default
        if (linkFilters == null || linkFilters.isEmpty()) {
            linkFilters = new ArrayList<>();
            linkFilters.add("id.notNull()");
        }

        if (linkFilters2 == null || linkFilters2.isEmpty()) {
            linkFilters2 = new ArrayList<>();
            linkFilters2.add(null);
        }

        boolean doLinkCleanup = attributes.getBoolean("linkCleanup", true);

        List<String> gatherStatsTables = attributes.getStringList("gatherStatsTables");

        long networkTimeout = Util.otolo(attributes.getString("networkTimeout"));

        String driver = attributes.getString("driver");
        String url = attributes.getString("url");
        String username = attributes.getString("username");
        String password = attributes.getString("password");

        ExportConnectionInfo connectionInfo = new ExportConnectionInfo(url, username, password);
        connectionInfo.setDriver(driver);
        connectionInfo.setNetworkTimeout(networkTimeout);

        if (attributes.get("connectionProperties") instanceof Map) {
            Map<String, Object> props = Util.otom(attributes.get("connectionProperties"));
            for(String key : props.keySet()) {
                Object val = props.get(key);
                if (val != null) {
                    connectionInfo.getOptions().setProperty(key, Util.otoa(val));
                }
            }
        }

        String configHash = String.valueOf(Objects.hash(doLinkCleanup, linkFilters, linkFilters2, identityFilters, configurationName, connectionInfo));

        Map<String, Long> cutoffDates = new HashMap<>();

        String taskName = taskResult.getDefinition().getName();

        boolean deleteEnabled = true;
        boolean isOracle = false;

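        // Read the most recent start time for each run_key from the de_runs bookkeeping table
        // (presumably maintained by the export partitions themselves). A matching config hash
        // allows an incremental export from that timestamp; a mismatch forces a full export by
        // resetting the cutoff to 0. On a first run against an empty DE_IDENTITY table, deletes
        // and the Link cleanup phase are suppressed.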
        try (Connection connection = ExportPartition.openConnection(context, connectionInfo)) {
            isOracle = JdbcUtil.isOracle(connection);

            try (PreparedStatement existingRows = connection.prepareStatement("select count(*) as c from de_identity")) {
                try (ResultSet resultSet = existingRows.executeQuery()) {
                    if (resultSet.next()) {
                        int count = resultSet.getInt(1);
                        if (count == 0) {
                            logger.warn("DE_IDENTITY is empty; for first run, deletes will be suppressed to avoid index hangs");
                            deleteEnabled = false;
                            doLinkCleanup = false;
                        }
                    }
                }
            }
            try (PreparedStatement statement = connection.prepareStatement("select last_start_time, run_key, config_hash from de_runs where task_name = ? order by last_start_time desc")) {
                statement.setString(1, taskName);

                try (ResultSet results = statement.executeQuery()) {
                    while (results.next()) {
                        String key = results.getString("run_key");
                        String configHashString = results.getString("config_hash");
                        long lastStartTime = results.getLong("last_start_time");

                        if (Util.nullSafeEq(configHashString, configHash)) {
                            cutoffDates.put(key, lastStartTime);
                        } else {
                            logger.warn("For export partition " + key + ": new config hash = " + configHash + ", old config hash = " + configHashString);
                            taskResult.addMessage(Message.warn("Configuration has changed after last run of partition " + key + "; forcing a full export"));
                            cutoffDates.put(key, 0L);
                        }

                        logger.info("For export partition " + key + ": threshold timestamp = " + Instant.ofEpochMilli(cutoffDates.get(key)));
                    }
                }
            }
        } catch(SQLException e) {
            throw new GeneralException(e);
        }

        taskResult.setAttribute("cutoffDates", cutoffDates);

        int count = 1;
        for(String filter : Util.safeIterable(identityFilters)) {
            String lookup = "identity:" + filter;

            Long cutoffDate = cutoffDates.get(lookup);
            if (cutoffDate == null) {
                logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0));
                cutoffDate = 0L;
            }

            ExportIdentitiesPartition eip = new ExportIdentitiesPartition();
            eip.setName("Identity export partition " + count++);
            eip.setPhase(1);
            eip.setExportTimestamp(now);
            eip.setCutoffDate(cutoffDate);
            eip.setFilterString(filter);
            eip.setConnectionInfo(connectionInfo);
            eip.setConfigurationName(configurationName);
            eip.setTaskName(taskName);
            eip.setRunKey(lookup);
            eip.setConfigHash(configHash);
            eip.setDeleteEnabled(deleteEnabled);

            partitions.add(eip);
        }

        int totalLinkPartitions = Utilities.safeSize(linkFilters) * Utilities.safeSize(linkFilters2);
        int halfway = totalLinkPartitions / 2;
        int linkPartitionPhase = 2;

        count = 1;
        for(String filter : Util.safeIterable(linkFilters)) {
            Filter compiled1 = Filter.compile(filter);
            for(String filter2 : Util.safeIterable(linkFilters2)) {
                String lookup;

                if (Util.isNullOrEmpty(filter2)) {
                    lookup = "link:" + compiled1.getExpression(false);
                } else {
                    lookup = "link:" + Filter.and(compiled1, Filter.compile(filter2)).getExpression(false);
                }
                Long cutoffDate = cutoffDates.get(lookup);
                if (cutoffDate == null) {
                    logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0));
                    cutoffDate = 0L;
                }

                ExportLinksPartition elp = new ExportLinksPartition();
                elp.setName("Link export partition " + count++);
                elp.setPhase(linkPartitionPhase);
                elp.setDependentPhase(linkPartitionPhase - 1);
                elp.setExportTimestamp(now);
                elp.setCutoffDate(cutoffDate);
                elp.setFilterString(filter);
                elp.setFilterString2(filter2);
                elp.setConnectionInfo(connectionInfo);
                elp.setConfigurationName(configurationName);
                elp.setTaskName(taskName);
                elp.setRunKey(lookup);
                elp.setConfigHash(configHash);
                elp.setDeleteEnabled(deleteEnabled);

                if (linkBatchSize > 0) {
                    elp.setBatchSize(linkBatchSize);
                }

                partitions.add(elp);

                if (count == halfway && isOracle && !Util.isEmpty(gatherStatsTables)) {
                    taskResult.addMessage("Adding a 'gather table stats' partition after link partition " + halfway);
                    OracleGatherStatsPartition statsPartition = new OracleGatherStatsPartition(gatherStatsTables);
                    // Phase 3
                    statsPartition.setPhase(linkPartitionPhase + 1);
                    statsPartition.setDependentPhase(linkPartitionPhase);
                    statsPartition.setName("Gather table stats: PARTIAL");
                    partitions.add(statsPartition);

                    // Now phase 4, after the stats partition
                    linkPartitionPhase += 2;
                }
            }
        }

        if (doLinkCleanup) {
            CleanupLinksPartition clp = new CleanupLinksPartition();
            // Either 3 or 5, depending on stats
            clp.setPhase(linkPartitionPhase + 1);
            clp.setDependentPhase(linkPartitionPhase);
            clp.setName("Clean up deleted Links");
            clp.setConnectionInfo(connectionInfo);
            clp.setRunKey("cleanup");
            clp.setTaskName(taskName);
            clp.setConfigHash(configHash);
            partitions.add(clp);
        }

        if (isOracle && !Util.isEmpty(gatherStatsTables)) {
            OracleGatherStatsPartition statsPartition = new OracleGatherStatsPartition(gatherStatsTables);
            statsPartition.setPhase(linkPartitionPhase + 1);
            statsPartition.setDependentPhase(linkPartitionPhase);
            statsPartition.setName("Gather table stats: FINAL");
            partitions.add(statsPartition);
        }

        return partitions;
    }

    @Override
    public boolean terminate() {
        this.stopped.set(true);
        return true;
    }
}