001package com.identityworksllc.iiq.common.task.export;
002
003import com.identityworksllc.iiq.common.Utilities;
004import com.identityworksllc.iiq.common.request.SailPointWorkerExecutor;
005import com.identityworksllc.iiq.common.threads.SailPointWorker;
006import org.apache.commons.logging.Log;
007import org.apache.commons.logging.LogFactory;
008import sailpoint.api.SailPointContext;
009import sailpoint.object.*;
010import sailpoint.task.AbstractTaskExecutor;
011import sailpoint.tools.GeneralException;
012import sailpoint.tools.JdbcUtil;
013import sailpoint.tools.Message;
014import sailpoint.tools.Util;
015
016import java.sql.Connection;
017import java.sql.PreparedStatement;
018import java.sql.ResultSet;
019import java.sql.SQLException;
020import java.time.Instant;
021import java.util.*;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024/**
025 * A partitioned task for handling data exports. The task can be provided multiple filters
026 * that should cover the entire set of desired export users.
027 *
028 * The partitions will run in three phases: the actual export, then a cleanup of any Links
029 * no longer in IIQ, then a finalization step that sets the last run date.
030 */
public class IDWDataExporter extends AbstractTaskExecutor {
    /**
     * The sixteen single-character hex digits ("0" through "f").
     *
     * NOTE(review): not referenced within this class; presumably intended for
     * building 16-way partition filters (e.g. on the trailing character of an
     * object ID) in task definitions that consume this class — confirm usage.
     */
    public static final List<String> SUFFIXES_16 =
            Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f");

    /**
     * The 256 two-character hex suffixes ("00" through "ff").
     *
     * NOTE(review): not referenced within this class; presumably intended for
     * building 256-way partition filters, analogous to {@link #SUFFIXES_16} —
     * confirm usage against the consuming task definitions.
     */
    public static final List<String> SUFFIXES_256 =
            Arrays.asList(
                    "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f", "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f", "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f", "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f", "a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af", "b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf", "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf", "d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df", "e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef", "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff"
            );

    /**
     * Version constant for this exporter implementation.
     *
     * NOTE(review): not referenced within this class; presumably recorded or
     * checked elsewhere (e.g. by the export partitions or target schema) —
     * confirm the intended use before relying on it.
     */
    public static final int VERSION = 1;

    /** Logger for this class. */
    private final Log logger;

    /**
     * Termination flag, set to true by {@link #terminate()}.
     *
     * NOTE(review): this flag is set but never read anywhere in this class;
     * actual termination of the partitioned work appears to be delegated to
     * the request/partition machinery. Confirm this is intentional.
     */
    private final AtomicBoolean stopped;

    /**
     * Constructs a new exporter with an unset (false) stop flag and a
     * class-scoped Commons Logging logger.
     */
    public IDWDataExporter() {
        this.stopped = new AtomicBoolean();
        this.logger = LogFactory.getLog(IDWDataExporter.class);
    }
    /**
     * Main task entry point: resolves the RequestDefinition used to host the
     * partitions, builds the list of export partitions via
     * {@link #getPartitions(SailPointContext, TaskResult, Attributes)}, wraps
     * each partition in its own single-worker {@link Request}, and launches
     * them all.
     *
     * @see sailpoint.object.TaskExecutor#execute(SailPointContext, TaskSchedule, TaskResult, Attributes)
     */
    @Override
    public void execute(SailPointContext context, TaskSchedule taskSchedule, TaskResult taskResult, Attributes<String, Object> attributes) throws Exception {
        // Fall back to the shared SailPointWorker request definition when the
        // task does not name one explicitly
        String requestDefinitionName = attributes.getString("requestDefinitionName");
        if (Util.isNullOrEmpty(requestDefinitionName)) {
            requestDefinitionName = SailPointWorkerExecutor.REQUEST_DEFINITION;
        }
        RequestDefinition requestDefinition = context.getObjectByName(RequestDefinition.class, requestDefinitionName);
        if (requestDefinition == null) {
            throw new IllegalArgumentException("Request definition called " + requestDefinitionName + " does not exist; do you need to import it?");
        }

        taskResult.addMessage("Partitions will execute using the Request Definition called " + requestDefinitionName);

        List<ExportPartition> clusters = getPartitions(context, taskResult, attributes);

        // Each partition becomes its own one-worker Request so the request
        // processor can schedule them independently (honoring phases)
        List<Request> partitionRequests = new ArrayList<>();
        for(ExportPartition partition : clusters) {
            List<SailPointWorker> workerCluster = new ArrayList<>();
            workerCluster.add(partition);
            // Serializes the SailPointWorker object so that it can be persisted
            Request partitionRequest = SailPointWorker.toRequest(requestDefinition, workerCluster);
            partitionRequest.setName(partition.getName());
            partitionRequests.add(partitionRequest);
        }

        taskResult.addMessage(Message.info("Launching " + partitionRequests.size() + " partitions"));

        launchPartitions(context, taskResult, partitionRequests);
    }

    /**
     * Gets the list of partitions for the export operation. These will each have their 'phase'
     * attribute set so that they run in order: identity exports (phase 1), link exports
     * (phase 2, possibly split around an Oracle 'gather table stats' phase), then the
     * optional link cleanup and/or final stats-gathering partitions.
     *
     * Run metadata (per-key cutoff timestamps and the previous configuration hash) is read
     * from the export target database's de_runs table; a changed configuration hash forces
     * a full export (cutoff 0) for that key.
     *
     * @param context The context
     * @param taskResult The task result for the parent task
     * @param attributes The attributes of the task execution
     * @return The resulting list of partitions to launch
     * @throws GeneralException if any failures occur
     */
    public List<ExportPartition> getPartitions(SailPointContext context, TaskResult taskResult, Attributes<String, Object> attributes) throws GeneralException {
        // Single export timestamp shared by every partition created in this run
        long now = System.currentTimeMillis();

        List<ExportPartition> partitions = new ArrayList<>();
        String configurationName = attributes.getString("configurationName");
        if (Util.isNullOrEmpty(configurationName)) {
            throw new GeneralException("A configurationName setting is required for the data export job");
        }

        int linkBatchSize = Util.otoi(attributes.get("linkBatchSize"));

        List<String> identityFilters = attributes.getStringList("identityFilters");
        List<String> linkFilters = attributes.getStringList("linkFilters");
        List<String> linkFilters2 = attributes.getStringList("linkFilters2");

        // Everything in one giant partition by default
        if (identityFilters == null || identityFilters.isEmpty()) {
            identityFilters = new ArrayList<>();
            identityFilters.add("id.notNull()");
        }

        // Everything in one giant partition by default
        if (linkFilters == null || linkFilters.isEmpty()) {
            linkFilters = new ArrayList<>();
            linkFilters.add("id.notNull()");
        }

        // A single null secondary filter means "no additional link filter";
        // the loop below treats null/empty as "use linkFilters alone"
        if (linkFilters2 == null || linkFilters2.isEmpty()) {
            linkFilters2 = new ArrayList<>();
            linkFilters2.add(null);
        }

        boolean doLinkCleanup = attributes.getBoolean("linkCleanup", true);

        List<String> gatherStatsTables = attributes.getStringList("gatherStatsTables");

        long networkTimeout = Util.otolo(attributes.getString("networkTimeout"));

        String driver = attributes.getString("driver");
        String url = attributes.getString("url");
        String username = attributes.getString("username");
        String password = attributes.getString("password");

        ExportConnectionInfo connectionInfo = new ExportConnectionInfo(url, username, password);
        connectionInfo.setDriver(driver);
        connectionInfo.setNetworkTimeout(networkTimeout);

        // Optional extra JDBC properties, copied onto the connection options
        if (attributes.get("connectionProperties") instanceof Map) {
            Map<String, Object> props = Util.otom(attributes.get("connectionProperties"));
            for(String key : props.keySet()) {
                Object val = props.get(key);
                if (val != null) {
                    connectionInfo.getOptions().setProperty(key, Util.otoa(val));
                }
            }
        }

        // Hash of every export-relevant setting; if it differs from the hash
        // stored with the previous run, that run_key is forced to a full export
        String configHash = String.valueOf(Objects.hash(doLinkCleanup, linkFilters, linkFilters2, identityFilters, configurationName, connectionInfo));

        // Per-run_key threshold timestamps loaded from the target DB's de_runs table
        Map<String, Long> cutoffDates = new HashMap<>();

        String taskName = taskResult.getDefinition().getName();

        boolean deleteEnabled = true;
        boolean isOracle = false;

        try (Connection connection = ExportPartition.openConnection(context, connectionInfo)) {
            isOracle = JdbcUtil.isOracle(connection);

            // First-run detection: an empty de_identity table means there is
            // nothing to delete, so suppress deletes (and link cleanup) entirely
            try (PreparedStatement existingRows = connection.prepareStatement("select count(*) as c from de_identity")) {
                try (ResultSet resultSet = existingRows.executeQuery()) {
                    if (resultSet.next()) {
                        int count = resultSet.getInt(1);
                        if (count == 0) {
                            logger.warn("DE_IDENTITY is empty; for first run, deletes will be suppressed to avoid index hangs");
                            deleteEnabled = false;
                            doLinkCleanup = false;
                        }
                    }
                }
            }
            // Load the last start time and config hash recorded for each run_key of this task.
            // NOTE(review): rows are ordered by last_start_time desc, but the loop put()s every
            // row, so if de_runs ever holds multiple rows per (task_name, run_key), an older row
            // would overwrite a newer one in cutoffDates. Confirm the table keeps one row per key.
            try (PreparedStatement statement = connection.prepareStatement("select last_start_time, run_key, config_hash from de_runs where task_name = ? order by last_start_time desc")) {
                statement.setString(1, taskName);

                try (ResultSet results = statement.executeQuery()) {
                    while (results.next()) {
                        String key = results.getString("run_key");
                        String configHashString = results.getString("config_hash");
                        long lastStartTime = results.getLong("last_start_time");

                        if (Util.nullSafeEq(configHashString, configHash)) {
                            cutoffDates.put(key, lastStartTime);
                        } else {
                            // Config changed since the last run: force a full export for this key
                            logger.warn("For export partition " + key + ": new config hash = " + configHash + ", old config hash = " + configHashString);
                            taskResult.addMessage(Message.warn("Configuration has changed after last run of partition " + key + "; forcing a full export"));
                            cutoffDates.put(key, 0L);
                        }

                        logger.info("For export partition " + key + ": threshold timestamp = " + Instant.ofEpochMilli(cutoffDates.get(key)));
                    }
                }
            }
        } catch(SQLException e) {
            throw new GeneralException(e);
        }

        taskResult.setAttribute("cutoffDates", cutoffDates);

        // Phase 1: one identity export partition per identity filter
        int count = 1;
        for(String filter : Util.safeIterable(identityFilters)) {
            String lookup = "identity:" + filter;

            // Unknown key (e.g. brand-new filter) falls back to a full export
            Long cutoffDate = cutoffDates.get(lookup);
            if (cutoffDate == null) {
                logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0));
                cutoffDate = 0L;
            }

            ExportIdentitiesPartition eip = new ExportIdentitiesPartition();
            eip.setName("Identity export partition " + count++);
            eip.setPhase(1);
            eip.setExportTimestamp(now);
            eip.setCutoffDate(cutoffDate);
            eip.setFilterString(filter);
            eip.setConnectionInfo(connectionInfo);
            eip.setConfigurationName(configurationName);
            eip.setTaskName(taskName);
            eip.setRunKey(lookup);
            eip.setConfigHash(configHash);
            eip.setDeleteEnabled(deleteEnabled);

            partitions.add(eip);
        }

        // Phase 2+: one link export partition per (linkFilters x linkFilters2) combination.
        // On Oracle, a 'gather table stats' partition may be inserted roughly halfway through,
        // bumping the phase so the remaining link partitions wait for the stats run.
        int totalLinkPartitions = Utilities.safeSize(linkFilters) * Utilities.safeSize(linkFilters2);
        int halfway = totalLinkPartitions / 2;
        int linkPartitionPhase = 2;

        count = 1;
        for(String filter : Util.safeIterable(linkFilters)) {
            Filter compiled1 = Filter.compile(filter);
            for(String filter2 : Util.safeIterable(linkFilters2)) {
                // The run key is the normalized (compiled) filter expression, so that
                // equivalent filter strings map to the same stored cutoff
                String lookup;

                if (Util.isNullOrEmpty(filter2)) {
                    lookup = "link:" + compiled1.getExpression(false);
                } else {
                    lookup = "link:" + Filter.and(compiled1, Filter.compile(filter2)).getExpression(false);
                }
                Long cutoffDate = cutoffDates.get(lookup);
                if (cutoffDate == null) {
                    logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0));
                    cutoffDate = 0L;
                }

                ExportLinksPartition elp = new ExportLinksPartition();
                elp.setName("Link export partition " + count++);
                elp.setPhase(linkPartitionPhase);
                elp.setDependentPhase(linkPartitionPhase - 1);
                elp.setExportTimestamp(now);
                elp.setCutoffDate(cutoffDate);
                elp.setFilterString(filter);
                elp.setFilterString2(filter2);
                elp.setConnectionInfo(connectionInfo);
                elp.setConfigurationName(configurationName);
                elp.setTaskName(taskName);
                elp.setRunKey(lookup);
                elp.setConfigHash(configHash);
                elp.setDeleteEnabled(deleteEnabled);

                if (linkBatchSize > 0) {
                    elp.setBatchSize(linkBatchSize);
                }

                partitions.add(elp);

                // NOTE(review): count has already been post-incremented past the partition
                // just added, so this fires after partition (halfway - 1), not partition
                // 'halfway' as the task message states — verify the off-by-one is acceptable.
                // The check can match at most once because count only increases.
                if (count == halfway && isOracle && !Util.isEmpty(gatherStatsTables)) {
                    taskResult.addMessage("Adding a 'gather table stats' partition after link partition " + halfway);
                    OracleGatherStatsPartition statsPartition = new OracleGatherStatsPartition(gatherStatsTables);
                    // Phase 3
                    statsPartition.setPhase(linkPartitionPhase + 1);
                    statsPartition.setDependentPhase(linkPartitionPhase);
                    statsPartition.setName("Gather table stats: PARTIAL");
                    partitions.add(statsPartition);

                    // Now phase 4, after the stats partition
                    linkPartitionPhase += 2;
                }
            }
        }

        if (doLinkCleanup) {
            CleanupLinksPartition clp = new CleanupLinksPartition();
            // Either 3 or 5, depending on stats
            clp.setPhase(linkPartitionPhase + 1);
            clp.setDependentPhase(linkPartitionPhase);
            clp.setName("Clean up deleted Links");
            clp.setConnectionInfo(connectionInfo);
            clp.setRunKey("cleanup");
            clp.setTaskName(taskName);
            clp.setConfigHash(configHash);
            partitions.add(clp);
        }

        // NOTE(review): when both cleanup and final stats are enabled they receive the
        // same phase number (linkPartitionPhase + 1) and the same dependent phase, so
        // they may run concurrently — confirm the final stats gather is not expected
        // to run after the cleanup completes.
        if (isOracle && !Util.isEmpty(gatherStatsTables)) {
            OracleGatherStatsPartition statsPartition = new OracleGatherStatsPartition(gatherStatsTables);
            statsPartition.setPhase(linkPartitionPhase + 1);
            statsPartition.setDependentPhase(linkPartitionPhase);
            statsPartition.setName("Gather table stats: FINAL");
            partitions.add(statsPartition);
        }

        return partitions;
    }

    /**
     * Flags this task as terminated and reports the request as accepted.
     *
     * NOTE(review): the stopped flag is not consulted by execute() or
     * getPartitions() in this class; see the field-level note.
     *
     * @return always true
     * @see sailpoint.object.TaskExecutor#terminate()
     */
    @Override
    public boolean terminate() {
        this.stopped.set(true);
        return true;
    }
}