001package com.identityworksllc.iiq.common.task.export;
002
003import com.identityworksllc.iiq.common.request.SailPointWorkerExecutor;
004import com.identityworksllc.iiq.common.threads.SailPointWorker;
005import org.apache.commons.logging.Log;
006import org.apache.commons.logging.LogFactory;
007import sailpoint.api.SailPointContext;
008import sailpoint.object.*;
009import sailpoint.task.AbstractTaskExecutor;
010import sailpoint.tools.GeneralException;
011import sailpoint.tools.Message;
012import sailpoint.tools.Util;
013
014import java.sql.Connection;
015import java.sql.PreparedStatement;
016import java.sql.ResultSet;
017import java.sql.SQLException;
018import java.time.Instant;
019import java.util.*;
020import java.util.concurrent.atomic.AtomicBoolean;
021
022/**
023 * A partitioned task for handling data exports. The task can be provided multiple filters
024 * that should cover the entire set of desired export users.
025 *
026 * The partitions will run in three phases: the actual export, then a cleanup of any Links
027 * no longer in IIQ, then a finalization step that sets the last run date.
028 */
029public class IDWDataExporter extends AbstractTaskExecutor {
030    // TODO
031    public static final List<String> suffixes16 =
032            Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f");
033
034    // TODO
035    public static final List<String> suffixes256 =
036            Arrays.asList(
037                    "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f", "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f", "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f", "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f", "a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af", "b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf", "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf", "d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df", "e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef", "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff"
038            );
039
040    // TODO
041    public static final int VERSION = 1;
042
043    private final Log logger;
044    private final AtomicBoolean stopped;
045
046    public IDWDataExporter() {
047        this.stopped = new AtomicBoolean();
048        this.logger = LogFactory.getLog(IDWDataExporter.class);
049    }
050    /**
051     * @see sailpoint.object.TaskExecutor#execute(SailPointContext, TaskSchedule, TaskResult, Attributes)
052     */
053    @Override
054    public void execute(SailPointContext context, TaskSchedule taskSchedule, TaskResult taskResult, Attributes<String, Object> attributes) throws Exception {
055        String requestDefinitionName = attributes.getString("requestDefinitionName");
056        if (Util.isNullOrEmpty(requestDefinitionName)) {
057            requestDefinitionName = SailPointWorkerExecutor.REQUEST_DEFINITION;
058        }
059        RequestDefinition requestDefinition = context.getObjectByName(RequestDefinition.class, requestDefinitionName);
060        if (requestDefinition == null) {
061            throw new IllegalArgumentException("Request definition called " + requestDefinitionName + " does not exist; do you need to import it?");
062        }
063
064        taskResult.addMessage("Partitions will execute using the Request Definition called " + requestDefinitionName);
065
066        List<ExportPartition> clusters = getPartitions(context, taskResult, attributes);
067
068        List<Request> partitionRequests = new ArrayList<>();
069        for(ExportPartition partition : clusters) {
070            List<SailPointWorker> workerCluster = new ArrayList<>();
071            workerCluster.add(partition);
072            // Serializes the SailPointWorker object so that it can be persisted
073            Request partitionRequest = SailPointWorker.toRequest(requestDefinition, workerCluster);
074            partitionRequest.setName(partition.getName());
075            partitionRequests.add(partitionRequest);
076        }
077
078        taskResult.addMessage(Message.info("Launching " + partitionRequests.size() + " partitions"));
079
080        launchPartitions(context, taskResult, partitionRequests);
081    }
082
083    /**
084     * Gets the list of partitions for the export operation. These will each have their 'phase'
085     * attribute set so that they run in order.
086     *
087     * @param context The context
088     * @param taskResult The task result for the parent task
089     * @param attributes The attributes of the task execution
090     * @return The resulting list of partitions to launch
091     * @throws GeneralException if any failures occur
092     */
093    public List<ExportPartition> getPartitions(SailPointContext context, TaskResult taskResult, Attributes<String, Object> attributes) throws GeneralException {
094        long now = System.currentTimeMillis();
095
096        List<ExportPartition> partitions = new ArrayList<>();
097        String configurationName = attributes.getString("configurationName");
098        if (Util.isNullOrEmpty(configurationName)) {
099            throw new GeneralException("A configurationName setting is required for the data export job");
100        }
101
102        int linkBatchSize = Util.otoi(attributes.get("linkBatchSize"));
103
104        List<String> identityFilters = attributes.getStringList("identityFilters");
105        List<String> linkFilters = attributes.getStringList("linkFilters");
106        List<String> linkFilters2 = attributes.getStringList("linkFilters2");
107
108        // Everything in one giant partition by default
109        if (identityFilters == null || identityFilters.isEmpty()) {
110            identityFilters = new ArrayList<>();
111            identityFilters.add("id.notNull()");
112        }
113
114        // Everything in one giant partition by default
115        if (linkFilters == null || linkFilters.isEmpty()) {
116            linkFilters = new ArrayList<>();
117            linkFilters.add("id.notNull()");
118        }
119
120        if (linkFilters2 == null || linkFilters2.isEmpty()) {
121            linkFilters2 = new ArrayList<>();
122            linkFilters2.add(null);
123        }
124
125        boolean doLinkCleanup = attributes.getBoolean("linkCleanup", true);
126
127        long networkTimeout = attributes.getLong("networkTimeout");
128
129        String driver = attributes.getString("driver");
130        String url = attributes.getString("url");
131        String username = attributes.getString("username");
132        String password = attributes.getString("password");
133
134        ExportConnectionInfo connectionInfo = new ExportConnectionInfo(url, username, password);
135        connectionInfo.setDriver(driver);
136        connectionInfo.setNetworkTimeout(networkTimeout);
137
138        if (attributes.get("connectionProperties") instanceof Map) {
139            Map<String, Object> props = Util.otom(attributes.get("connectionProperties"));
140            for(String key : props.keySet()) {
141                Object val = props.get(key);
142                if (val != null) {
143                    connectionInfo.getOptions().setProperty(key, Util.otoa(val));
144                }
145            }
146        }
147
148        String configHash = String.valueOf(Objects.hash(doLinkCleanup, linkFilters, linkFilters2, identityFilters, configurationName, connectionInfo));
149
150        Map<String, Long> cutoffDates = new HashMap<>();
151
152        String taskName = taskResult.getDefinition().getName();
153
154        try (Connection connection = ExportPartition.openConnection(context, connectionInfo)) {
155            try (PreparedStatement statement = connection.prepareStatement("select last_start_time, run_key, config_hash from de_runs where task_name = ? order by last_start_time desc")) {
156                statement.setString(1, taskName);
157
158                try (ResultSet results = statement.executeQuery()) {
159                    while (results.next()) {
160                        String key = results.getString("run_key");
161                        String configHashString = results.getString("config_hash");
162                        long lastStartTime = results.getLong("last_start_time");
163
164                        if (Util.nullSafeEq(configHashString, configHash)) {
165                            cutoffDates.put(key, lastStartTime);
166                        } else {
167                            logger.warn("For export partition " + key + ": new config hash = " + configHash + ", old config hash = " + configHashString);
168                            taskResult.addMessage(Message.warn("Configuration has changed after last run of partition " + key + "; forcing a full export"));
169                            cutoffDates.put(key, 0L);
170                        }
171
172                        logger.info("For export partition " + key + ": threshold timestamp = " + Instant.ofEpochMilli(cutoffDates.get(key)));
173                    }
174                }
175            }
176        } catch(SQLException e) {
177            throw new GeneralException(e);
178        }
179
180        taskResult.setAttribute("cutoffDates", cutoffDates);
181
182        int count = 1;
183        for(String filter : Util.safeIterable(identityFilters)) {
184            String lookup = "identity:" + filter;
185
186            Long cutoffDate = cutoffDates.get(lookup);
187            if (cutoffDate == null) {
188                logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0));
189                cutoffDate = 0L;
190            }
191
192            ExportIdentitiesPartition eip = new ExportIdentitiesPartition();
193            eip.setName("Identity export partition " + count++);
194            eip.setPhase(1);
195            eip.setExportTimestamp(now);
196            eip.setCutoffDate(cutoffDate);
197            eip.setFilterString(filter);
198            eip.setConnectionInfo(connectionInfo);
199            eip.setConfigurationName(configurationName);
200            eip.setTaskName(taskName);
201            eip.setRunKey(lookup);
202            eip.setConfigHash(configHash);
203
204            partitions.add(eip);
205        }
206
207        count = 1;
208        for(String filter : Util.safeIterable(linkFilters)) {
209            Filter compiled1 = Filter.compile(filter);
210            for(String filter2 : Util.safeIterable(linkFilters2)) {
211                String lookup;
212
213                if (Util.isNullOrEmpty(filter2)) {
214                    lookup = "link:" + compiled1.getExpression(false);
215                } else {
216                    lookup = "link:" + Filter.and(compiled1, Filter.compile(filter2)).getExpression(false);
217                }
218                Long cutoffDate = cutoffDates.get(lookup);
219                if (cutoffDate == null) {
220                    logger.warn("No existing threshold date for " + lookup + ", using " + Instant.ofEpochMilli(0));
221                    cutoffDate = 0L;
222                }
223
224                ExportLinksPartition elp = new ExportLinksPartition();
225                elp.setName("Link export partition " + count++);
226                elp.setPhase(2);
227                elp.setDependentPhase(1);
228                elp.setExportTimestamp(now);
229                elp.setCutoffDate(cutoffDate);
230                elp.setFilterString(filter);
231                elp.setFilterString2(filter2);
232                elp.setConnectionInfo(connectionInfo);
233                elp.setConfigurationName(configurationName);
234                elp.setTaskName(taskName);
235                elp.setRunKey(lookup);
236                elp.setConfigHash(configHash);
237
238                if (linkBatchSize > 0) {
239                    elp.setBatchSize(linkBatchSize);
240                }
241
242                partitions.add(elp);
243            }
244        }
245
246        if (doLinkCleanup) {
247            CleanupLinksPartition clp = new CleanupLinksPartition();
248            clp.setPhase(3);
249            clp.setDependentPhase(2);
250            clp.setName("Clean up deleted Links");
251            clp.setConnectionInfo(connectionInfo);
252            clp.setRunKey("cleanup");
253            clp.setTaskName(taskName);
254            clp.setConfigHash(configHash);
255            partitions.add(clp);
256        }
257
258        return partitions;
259    }
260
261    @Override
262    public boolean terminate() {
263        this.stopped.set(true);
264        return true;
265    }
266}