001package com.identityworksllc.iiq.common.task.export; 002 003import com.identityworksllc.iiq.common.request.SailPointWorkerExecutor; 004import com.identityworksllc.iiq.common.threads.SailPointWorker; 005import org.apache.commons.logging.Log; 006import org.apache.commons.logging.LogFactory; 007import sailpoint.api.SailPointContext; 008import sailpoint.object.*; 009import sailpoint.task.AbstractTaskExecutor; 010import sailpoint.tools.GeneralException; 011import sailpoint.tools.Message; 012import sailpoint.tools.Util; 013 014import java.sql.Connection; 015import java.sql.PreparedStatement; 016import java.sql.ResultSet; 017import java.sql.SQLException; 018import java.time.Instant; 019import java.util.*; 020import java.util.concurrent.atomic.AtomicBoolean; 021 022/** 023 * A partitioned task for handling data exports. The task can be provided multiple filters 024 * that should cover the entire set of desired export users. 025 * 026 * The partitions will run in three phases: the actual export, then a cleanup of any Links 027 * no longer in IIQ, then a finalization step that sets the last run date. 028 */ 029public class IDWDataExporter extends AbstractTaskExecutor { 030 // TODO 031 public static final List<String> suffixes16 = 032 Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"); 033 034 // TODO 035 public static final List<String> suffixes256 = 036 Arrays.asList( 037 "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f", "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f", "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f", "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f", "a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af", "b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf", "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf", "d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df", "e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef", "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff" 038 ); 039 040 // TODO 041 public static final int VERSION = 1; 042 043 private final Log logger; 044 private final AtomicBoolean stopped; 045 046 public IDWDataExporter() { 047 this.stopped = new AtomicBoolean(); 048 this.logger = LogFactory.getLog(IDWDataExporter.class); 049 } 050 /** 051 * @see sailpoint.object.TaskExecutor#execute(SailPointContext, TaskSchedule, TaskResult, Attributes) 052 */ 053 @Override 054 public void execute(SailPointContext context, TaskSchedule taskSchedule, TaskResult taskResult, Attributes<String, Object> attributes) throws Exception { 055 String requestDefinitionName = attributes.getString("requestDefinitionName"); 056 if (Util.isNullOrEmpty(requestDefinitionName)) { 057 requestDefinitionName = SailPointWorkerExecutor.REQUEST_DEFINITION; 058 } 059 RequestDefinition requestDefinition = context.getObjectByName(RequestDefinition.class, requestDefinitionName); 060 if (requestDefinition == null) { 061 throw new IllegalArgumentException("Request definition called " + requestDefinitionName + " does not exist; do you need to import it?"); 062 } 063 064 taskResult.addMessage("Partitions will execute using the Request Definition called " + requestDefinitionName); 065 066 List<ExportPartition> clusters = getPartitions(context, taskResult, attributes); 067 068 List<Request> partitionRequests = new ArrayList<>(); 069 for(ExportPartition partition : clusters) { 070 List<SailPointWorker> workerCluster = new ArrayList<>(); 071 workerCluster.add(partition); 072 // Serializes the SailPointWorker object so that it can be persisted 073 Request partitionRequest = SailPointWorker.toRequest(requestDefinition, workerCluster); 074 partitionRequest.setName(partition.getName()); 075 partitionRequests.add(partitionRequest); 076 } 077 078 taskResult.addMessage(Message.info("Launching " + partitionRequests.size() + " partitions")); 079 080 launchPartitions(context, taskResult, partitionRequests); 081 } 082 083 /** 084 * Gets the list of partitions for the export operation. These will each have their 'phase' 085 * attribute set so that they run in order. 086 * 087 * @param context The context 088 * @param taskResult The task result for the parent task 089 * @param attributes The attributes of the task execution 090 * @return The resulting list of partitions to launch 091 * @throws GeneralException if any failures occur 092 */ 093 public List<ExportPartition> getPartitions(SailPointContext context, TaskResult taskResult, Attributes<String, Object> attributes) throws GeneralException { 094 long now = System.currentTimeMillis(); 095 096 List<ExportPartition> partitions = new ArrayList<>(); 097 String configurationName = attributes.getString("configurationName"); 098 if (Util.isNullOrEmpty(configurationName)) { 099 throw new GeneralException("A configurationName setting is required for the data export job"); 100 } 101 102 int linkBatchSize = Util.otoi(attributes.get("linkBatchSize")); 103 104 List<String> identityFilters = attributes.getStringList("identityFilters"); 105 List<String> linkFilters = attributes.getStringList("linkFilters"); 106 List<String> linkFilters2 = attributes.getStringList("linkFilters2"); 107 108 // Everything in one giant partition by default 109 if (identityFilters == null || identityFilters.isEmpty()) { 110 identityFilters = new ArrayList<>(); 111 identityFilters.add("id.notNull()"); 112 } 113 114 // Everything in one giant partition by default 115 if (linkFilters == null || linkFilters.isEmpty()) { 116 linkFilters = new ArrayList<>(); 117 linkFilters.add("id.notNull()"); 118 } 119 120 if (linkFilters2 == null || linkFilters2.isEmpty()) { 121 linkFilters2 = new ArrayList<>(); 122 linkFilters2.add(null); 123 } 124 125 boolean doLinkCleanup = attributes.getBoolean("linkCleanup", true); 126 127 long networkTimeout = attributes.getLong("networkTimeout"); 128 129 String driver = attributes.getString("driver"); 130 String url = attributes.getString("url"); 131 String username = attributes.getString("username"); 132 String password = attributes.getString("password"); 133 134 ExportConnectionInfo connectionInfo = new ExportConnectionInfo(url, username, password); 135 connectionInfo.setDriver(driver); 136 connectionInfo.setNetworkTimeout(networkTimeout); 137 138 if (attributes.get("connectionProperties") instanceof Map) { 139 Map<String, Object> props = Util.otom(attributes.get("connectionProperties")); 140 for(String key : props.keySet()) { 141 Object val = props.get(key); 142 if (val != null) { 143 connectionInfo.getOptions().setProperty(key, Util.otoa(val)); 144 } 145 } 146 } 147 148 String configHash = String.valueOf(Objects.hash(doLinkCleanup, linkFilters, linkFilters2, identityFilters, configurationName, connectionInfo)); 149 150 Map<String, Long> cutoffDates = new HashMap<>(); 151 152 String taskName = taskResult.getDefinition().getName(); 153 154 try (Connection connection = ExportPartition.openConnection(context, connectionInfo)) { 155 try (PreparedStatement statement = connection.prepareStatement("select last_start_time, run_key, config_hash from de_runs where task_name = ? order by last_start_time desc")) { 156 statement.setString(1, taskName); 157 158 try (ResultSet results = statement.executeQuery()) { 159 while (results.next()) { 160 String key = results.getString("run_key"); 161 String configHashString = results.getString("config_hash"); 162 long lastStartTime = results.getLong("last_start_time"); 163 164 if (Util.nullSafeEq(configHashString, configHash)) { 165 cutoffDates.put(key, lastStartTime); 166 } else { 167 logger.warn("For export partition " + key + ": new config hash = " + configHash + ", old config hash = " + configHashString); 168 taskResult.addMessage(Message.warn("Configuration has changed after last run of partition " + key + "; forcing a full export")); 169 cutoffDates.put(key, 0L); 170 } 171 172 logger.info("For export partition " + key + ": threshold timestamp = " + Instant.ofEpochMilli(cutoffDates.get(key))); 173 } 174 } 175 } 176 } catch(SQLException e) { 177 throw new GeneralException(e); 178 } 179 180 taskResult.setAttribute("cutoffDates", cutoffDates); 181 182 int count = 1; 183 for(String filter : Util.safeIterable(identityFilters)) { 184 String lookup = "identity:" + filter; 185 186 Long cutoffDate = cutoffDates.get(lookup); 187 if (cutoffDate == null) { 188 logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0)); 189 cutoffDate = 0L; 190 } 191 192 ExportIdentitiesPartition eip = new ExportIdentitiesPartition(); 193 eip.setName("Identity export partition " + count++); 194 eip.setPhase(1); 195 eip.setExportTimestamp(now); 196 eip.setCutoffDate(cutoffDate); 197 eip.setFilterString(filter); 198 eip.setConnectionInfo(connectionInfo); 199 eip.setConfigurationName(configurationName); 200 eip.setTaskName(taskName); 201 eip.setRunKey(lookup); 202 eip.setConfigHash(configHash); 203 204 partitions.add(eip); 205 } 206 207 count = 1; 208 for(String filter : Util.safeIterable(linkFilters)) { 209 Filter compiled1 = Filter.compile(filter); 210 for(String filter2 : Util.safeIterable(linkFilters2)) { 211 String lookup; 212 213 if (Util.isNullOrEmpty(filter2)) { 214 lookup = "link:" + compiled1.getExpression(false); 215 } else { 216 lookup = "link:" + Filter.and(compiled1, Filter.compile(filter2)).getExpression(false); 217 } 218 Long cutoffDate = cutoffDates.get(lookup); 219 if (cutoffDate == null) { 220 logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0)); 221 cutoffDate = 0L; 222 } 223 224 ExportLinksPartition elp = new ExportLinksPartition(); 225 elp.setName("Link export partition " + count++); 226 elp.setPhase(2); 227 elp.setDependentPhase(1); 228 elp.setExportTimestamp(now); 229 elp.setCutoffDate(cutoffDate); 230 elp.setFilterString(filter); 231 elp.setFilterString2(filter2); 232 elp.setConnectionInfo(connectionInfo); 233 elp.setConfigurationName(configurationName); 234 elp.setTaskName(taskName); 235 elp.setRunKey(lookup); 236 elp.setConfigHash(configHash); 237 238 if (linkBatchSize > 0) { 239 elp.setBatchSize(linkBatchSize); 240 } 241 242 partitions.add(elp); 243 } 244 } 245 246 if (doLinkCleanup) { 247 CleanupLinksPartition clp = new CleanupLinksPartition(); 248 clp.setPhase(3); 249 clp.setDependentPhase(2); 250 clp.setName("Clean up deleted Links"); 251 clp.setConnectionInfo(connectionInfo); 252 clp.setRunKey("cleanup"); 253 clp.setTaskName(taskName); 254 clp.setConfigHash(configHash); 255 partitions.add(clp); 256 } 257 258 return partitions; 259 } 260 261 @Override 262 public boolean terminate() { 263 this.stopped.set(true); 264 return true; 265 } 266}