package org.apache.falcon.oozie.feed;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.ExecutionType;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.coordinator.ACTION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.class */
public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
    private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml";
    private static final String IMPORT_HQL = "/action/feed/falcon-table-import.hql";
    private static final String EXPORT_HQL = "/action/feed/falcon-table-export.hql";
    private static final int THIRTY_MINUTES = 1800000;
    private static final String PARALLEL = "parallel";
    private static final String TIMEOUT = "timeout";
    private static final String ORDER = "order";

    public FeedReplicationCoordinatorBuilder(Feed feed) {
        super(feed, LifeCycle.REPLICATION);
    }

    @Override // org.apache.falcon.oozie.OozieCoordinatorBuilder
    public List<Properties> buildCoords(Cluster cluster, Path path) throws FalconException {
        if (FeedHelper.getCluster(this.entity, cluster.getName()).getType() != ClusterType.TARGET) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (org.apache.falcon.entity.v0.feed.Cluster cluster2 : this.entity.getClusters().getClusters()) {
            if (cluster2.getType() == ClusterType.SOURCE) {
                Cluster cluster3 = (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster2.getName());
                Properties doBuild = doBuild(cluster3, cluster, new Path(path, Tag.REPLICATION.name() + "/" + cluster3.getName()));
                if (doBuild != null && !doBuild.isEmpty()) {
                    arrayList.add(doBuild);
                }
            }
        }
        return arrayList;
    }

    private Properties doBuild(Cluster cluster, Cluster cluster2, Path path) throws FalconException {
        long replicationDelayInMillis = getReplicationDelayInMillis(cluster);
        Date startDate = getStartDate(cluster, replicationDelayInMillis);
        Date endDate = getEndDate(cluster);
        Date startDate2 = getStartDate(cluster2, replicationDelayInMillis);
        Date endDate2 = getEndDate(cluster2);
        if (noOverlapExists(startDate, endDate, startDate2, endDate2)) {
            throw new FalconException("Source cluster: " + cluster.getName() + " and Target cluster: " + cluster2.getName() + " do not have overlapping dates for replication");
        }
        Properties build = OozieOrchestrationWorkflowBuilder.get(this.entity, cluster2, Tag.REPLICATION).build(cluster2, path);
        COORDINATORAPP unmarshal = unmarshal(REPLICATION_COORD_TEMPLATE);
        String workflowName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(cluster.getName()), this.entity).toString();
        initializeCoordAttributes(unmarshal, workflowName, startDate.after(startDate2) ? SchemaHelper.formatDateUTC(startDate) : SchemaHelper.formatDateUTC(startDate2), endDate.before(endDate2) ? SchemaHelper.formatDateUTC(endDate) : SchemaHelper.formatDateUTC(endDate2), replicationDelayInMillis);
        setCoordControls(unmarshal);
        Storage createReadOnlyStorage = FeedHelper.createReadOnlyStorage(cluster, this.entity);
        initializeInputDataSet(cluster, unmarshal, createReadOnlyStorage);
        Storage createStorage = FeedHelper.createStorage(cluster2, this.entity);
        initializeOutputDataSet(cluster2, unmarshal, createStorage);
        unmarshal.setAction(getReplicationWorkflowAction(cluster, cluster2, path, workflowName, createReadOnlyStorage, createStorage));
        build.putAll(getProperties(marshal(cluster2, unmarshal, path), workflowName));
        return build;
    }

    private ACTION getReplicationWorkflowAction(Cluster cluster, Cluster cluster2, Path path, String str, Storage storage, Storage storage2) throws FalconException {
        ACTION action = new ACTION();
        WORKFLOW workflow = new WORKFLOW();
        workflow.setAppPath(getStoragePath(path));
        Properties createCoordDefaultConfiguration = createCoordDefaultConfiguration(str);
        createCoordDefaultConfiguration.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster2.getName() + "," + cluster.getName());
        createCoordDefaultConfiguration.put("srcClusterName", cluster.getName());
        createCoordDefaultConfiguration.put("srcClusterColo", cluster.getColo());
        createCoordDefaultConfiguration.put("falconFeedStorageType", storage.getType().name());
        String str2 = "";
        if (storage.getType() == Storage.TYPE.FILESYSTEM) {
            String pathsWithPartitions = getPathsWithPartitions(cluster, cluster2);
            str2 = pathsWithPartitions;
            propagateFileSystemCopyProperties(pathsWithPartitions, createCoordDefaultConfiguration);
        } else if (storage.getType() == Storage.TYPE.TABLE) {
            str2 = "${coord:dataIn('input')}";
            CatalogStorage catalogStorage = (CatalogStorage) storage;
            propagateTableStorageProperties(cluster, catalogStorage, createCoordDefaultConfiguration, "falconSource");
            CatalogStorage catalogStorage2 = (CatalogStorage) storage2;
            propagateTableStorageProperties(cluster2, catalogStorage2, createCoordDefaultConfiguration, "falconTarget");
            propagateTableCopyProperties(cluster, catalogStorage, cluster2, catalogStorage2, createCoordDefaultConfiguration);
            setupHiveConfiguration(cluster, cluster2, path);
        }
        propagateLateDataProperties(str2, storage.getType().name(), createCoordDefaultConfiguration);
        createCoordDefaultConfiguration.putAll(EntityUtil.getEntityProperties(this.entity));
        workflow.setConfiguration(getConfig(createCoordDefaultConfiguration));
        action.setWorkflow(workflow);
        return action;
    }

    private String getPathsWithPartitions(Cluster cluster, Cluster cluster2) throws FalconException {
        String evaluateClusterExp = FeedHelper.evaluateClusterExp(cluster, FeedHelper.normalizePartitionExpression(FeedHelper.getCluster(this.entity, cluster.getName()).getPartition()));
        String evaluateClusterExp2 = FeedHelper.evaluateClusterExp(cluster2, FeedHelper.normalizePartitionExpression(FeedHelper.getCluster(this.entity, cluster2.getName()).getPartition()));
        StringBuilder sb = new StringBuilder();
        sb.append("${coord:dataIn('input')}/").append(FeedHelper.normalizePartitionExpression(evaluateClusterExp, evaluateClusterExp2));
        return StringUtils.stripEnd(sb.toString().replaceAll("//+", "/"), "/");
    }

    private void propagateFileSystemCopyProperties(String str, Properties properties) throws FalconException {
        properties.put("sourceRelativePaths", str);
        properties.put("distcpSourcePaths", "${coord:dataIn('input')}");
        properties.put("distcpTargetPaths", "${coord:dataOut('output')}");
    }

    private void propagateTableStorageProperties(Cluster cluster, CatalogStorage catalogStorage, Properties properties, String str) {
        properties.put(str + "NameNode", ClusterHelper.getStorageUrl(cluster));
        properties.put(str + "JobTracker", ClusterHelper.getMREndPoint(cluster));
        properties.put(str + "HcatNode", catalogStorage.getCatalogUrl());
        properties.put(str + "Database", catalogStorage.getDatabase());
        properties.put(str + "Table", catalogStorage.getTable());
        properties.put(str + "Partition", "${coord:dataInPartitions('input', 'hive-export')}");
    }

    private void propagateTableCopyProperties(Cluster cluster, CatalogStorage catalogStorage, Cluster cluster2, CatalogStorage catalogStorage2, Properties properties) {
        properties.put("distcpSourcePaths", FeedHelper.getStagingPath(true, cluster, this.entity, catalogStorage, Tag.REPLICATION, "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/" + cluster2.getName()));
        properties.put("falconSourceStagingDir", FeedHelper.getStagingPath(false, cluster, this.entity, catalogStorage, Tag.REPLICATION, "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/" + cluster2.getName()));
        properties.put("distcpTargetPaths", FeedHelper.getStagingPath(false, cluster2, this.entity, catalogStorage2, Tag.REPLICATION, "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/" + cluster2.getName()));
        properties.put("sourceRelativePaths", "IGNORE");
    }

    private void propagateLateDataProperties(String str, String str2, Properties properties) {
        properties.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), this.entity.getName());
        properties.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), str);
        properties.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), this.entity.getName());
        properties.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), str2);
        properties.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), this.entity.getName());
        properties.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
    }

    private void setupHiveConfiguration(Cluster cluster, Cluster cluster2, Path path) throws FalconException {
        FileSystem createProxiedFileSystem = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster2));
        try {
            Path path2 = new Path(path, "scripts");
            copyHiveScript(createProxiedFileSystem, path2, IMPORT_HQL);
            copyHiveScript(createProxiedFileSystem, path2, EXPORT_HQL);
            Path path3 = new Path(path + "/conf");
            persistHiveConfiguration(createProxiedFileSystem, path3, cluster, "falcon-source-");
            persistHiveConfiguration(createProxiedFileSystem, path3, cluster2, "falcon-target-");
        } catch (IOException e) {
            throw new FalconException("Unable to create hive conf files", e);
        }
    }

    private void copyHiveScript(FileSystem fileSystem, Path path, String str) throws IOException {
        OutputStream outputStream = null;
        InputStream inputStream = null;
        try {
            outputStream = fileSystem.create(new Path(path, new Path(str).getName()));
            inputStream = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(str);
            IOUtils.copy(inputStream, outputStream);
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(outputStream);
        } catch (Throwable th) {
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(outputStream);
            throw th;
        }
    }

    protected void persistHiveConfiguration(FileSystem fileSystem, Path path, Cluster cluster, String str) throws IOException {
        Configuration hiveCredentialsAsConf = getHiveCredentialsAsConf(cluster);
        OutputStream outputStream = null;
        try {
            outputStream = fileSystem.create(new Path(path, str + "hive-site.xml"));
            hiveCredentialsAsConf.writeXml(outputStream);
            IOUtils.closeQuietly(outputStream);
        } catch (Throwable th) {
            IOUtils.closeQuietly(outputStream);
            throw th;
        }
    }

    private void initializeCoordAttributes(COORDINATORAPP coordinatorapp, String str, String str2, String str3, long j) {
        coordinatorapp.setName(str);
        coordinatorapp.setFrequency("${coord:" + this.entity.getFrequency().toString() + "}");
        if (j > 0) {
            String str4 = "${now(0," + (((-1) * j) / 60000) + ")}";
            coordinatorapp.getInputEvents().getDataIn().get(0).getInstance().set(0, str4);
            coordinatorapp.getOutputEvents().getDataOut().get(0).setInstance(str4);
        }
        coordinatorapp.setStart(str2);
        coordinatorapp.setEnd(str3);
        coordinatorapp.setTimezone(this.entity.getTimezone().getID());
    }

    private void setCoordControls(COORDINATORAPP coordinatorapp) throws FalconException {
        long longValue = ((Long) ExpressionHelper.get().evaluate(this.entity.getFrequency().toString(), Long.class)).longValue();
        long j = longValue * 6;
        if (j < 1800000) {
            j = 1800000;
        }
        Properties entityProperties = EntityUtil.getEntityProperties(this.entity);
        String property = entityProperties.getProperty(TIMEOUT);
        if (property != null) {
            try {
                j = ((Long) ExpressionHelper.get().evaluate(property, Long.class)).longValue();
            } catch (Exception e) {
                LOG.error("Unable to evaluate timeout:", e);
            }
        }
        coordinatorapp.getControls().setTimeout(String.valueOf(j / 60000));
        coordinatorapp.getControls().setThrottle(String.valueOf((j / longValue) * 2));
        String property2 = entityProperties.getProperty(PARALLEL);
        int i = 1;
        if (property2 != null) {
            try {
                i = Integer.parseInt(property2);
            } catch (NumberFormatException e2) {
                LOG.error("Unable to parse parallel:", e2);
            }
        }
        coordinatorapp.getControls().setConcurrency(String.valueOf(i));
        String property3 = entityProperties.getProperty(ORDER);
        ExecutionType executionType = ExecutionType.FIFO;
        if (property3 != null) {
            try {
                executionType = ExecutionType.fromValue(property3);
            } catch (IllegalArgumentException e3) {
                LOG.error("Unable to parse order:", e3);
            }
        }
        coordinatorapp.getControls().setExecution(executionType.name());
    }

    private void initializeInputDataSet(Cluster cluster, COORDINATORAPP coordinatorapp, Storage storage) throws FalconException {
        SYNCDATASET syncdataset = (SYNCDATASET) coordinatorapp.getDatasets().getDatasetOrAsyncDataset().get(0);
        String uriTemplate = storage.getUriTemplate(LocationType.DATA);
        if (storage.getType() == Storage.TYPE.TABLE) {
            uriTemplate = uriTemplate.replace("thrift", "hcat");
        }
        syncdataset.setUriTemplate(uriTemplate);
        setDatasetValues(syncdataset, cluster);
        if (this.entity.getAvailabilityFlag() == null) {
            syncdataset.setDoneFlag("");
        } else {
            syncdataset.setDoneFlag(this.entity.getAvailabilityFlag());
        }
    }

    private void initializeOutputDataSet(Cluster cluster, COORDINATORAPP coordinatorapp, Storage storage) throws FalconException {
        SYNCDATASET syncdataset = (SYNCDATASET) coordinatorapp.getDatasets().getDatasetOrAsyncDataset().get(1);
        String uriTemplate = storage.getUriTemplate(LocationType.DATA);
        if (storage.getType() == Storage.TYPE.TABLE) {
            uriTemplate = uriTemplate.replace("thrift", "hcat");
        }
        syncdataset.setUriTemplate(uriTemplate);
        setDatasetValues(syncdataset, cluster);
    }

    private void setDatasetValues(SYNCDATASET syncdataset, Cluster cluster) {
        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(FeedHelper.getCluster(this.entity, cluster.getName()).getValidity().getStart()));
        syncdataset.setTimezone(this.entity.getTimezone().getID());
        syncdataset.setFrequency("${coord:" + this.entity.getFrequency().toString() + "}");
    }

    private long getReplicationDelayInMillis(Cluster cluster) throws FalconException {
        Frequency delay = FeedHelper.getCluster(this.entity, cluster.getName()).getDelay();
        long j = 0;
        if (delay != null) {
            j = ((Long) ExpressionHelper.get().evaluate(delay.toString(), Long.class)).longValue();
        }
        return j;
    }

    private Date getStartDate(Cluster cluster, long j) {
        Date start = FeedHelper.getCluster(this.entity, cluster.getName()).getValidity().getStart();
        return j == 0 ? start : new Date(start.getTime() + j);
    }

    private Date getEndDate(Cluster cluster) {
        return FeedHelper.getCluster(this.entity, cluster.getName()).getValidity().getEnd();
    }

    private boolean noOverlapExists(Date date, Date date2, Date date3, Date date4) {
        return date.after(date4) || date3.after(date2);
    }
}
