package org.apache.falcon.metadata;

import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Vertex;
import java.net.URISyntaxException;
import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-common-0.8.jar:org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.class */
public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
    private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm";
    private static final String NONE = "NONE";
    private static final String IGNORE = "IGNORE";
    private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
    private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {WorkflowExecutionArgs.USER_WORKFLOW_NAME, WorkflowExecutionArgs.USER_WORKFLOW_ENGINE, WorkflowExecutionArgs.WORKFLOW_ID, WorkflowExecutionArgs.RUN_ID, WorkflowExecutionArgs.STATUS, WorkflowExecutionArgs.WF_ENGINE_URL, WorkflowExecutionArgs.USER_SUBFLOW_ID};

    public InstanceRelationshipGraphBuilder(Graph graph, boolean z) {
        super(graph, z);
    }

    public Vertex addProcessInstance(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
        String processInstanceName = getProcessInstanceName(workflowExecutionContext);
        LOG.info("Adding process instance: {}", processInstanceName);
        Vertex addVertex = addVertex(processInstanceName, RelationshipType.PROCESS_INSTANCE, workflowExecutionContext.getTimeStampAsLong());
        addWorkflowInstanceProperties(addVertex, workflowExecutionContext);
        addInstanceToEntity(addVertex, workflowExecutionContext.getEntityName(), RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
        addInstanceToEntity(addVertex, workflowExecutionContext.getClusterName(), RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE);
        addInstanceToEntity(addVertex, workflowExecutionContext.getWorkflowUser(), RelationshipType.USER, RelationshipLabel.USER);
        if (isPreserveHistory()) {
            Process process = (Process) ConfigurationStore.get().get(EntityType.PROCESS, workflowExecutionContext.getEntityName());
            addDataClassification(process.getTags(), addVertex);
            addPipelines(process.getPipelines(), addVertex);
        }
        addCounters(addVertex, workflowExecutionContext);
        return addVertex;
    }

    private void addCounters(Vertex vertex, WorkflowExecutionContext workflowExecutionContext) throws FalconException {
        String counterString = getCounterString(workflowExecutionContext);
        if (StringUtils.isBlank(counterString)) {
            return;
        }
        addCountersToInstance(counterString, vertex);
    }

    private String getCounterString(WorkflowExecutionContext workflowExecutionContext) {
        if (StringUtils.isBlank(workflowExecutionContext.getCounters())) {
            return null;
        }
        return workflowExecutionContext.getCounters();
    }

    public String getProcessInstanceName(WorkflowExecutionContext workflowExecutionContext) {
        return workflowExecutionContext.getEntityName() + "/" + workflowExecutionContext.getNominalTimeAsISO8601();
    }

    public void addWorkflowInstanceProperties(Vertex vertex, WorkflowExecutionContext workflowExecutionContext) {
        for (WorkflowExecutionArgs workflowExecutionArgs : INSTANCE_WORKFLOW_PROPERTIES) {
            addProperty(vertex, workflowExecutionContext, workflowExecutionArgs);
        }
        vertex.setProperty(RelationshipProperty.VERSION.getName(), workflowExecutionContext.getUserWorkflowVersion());
    }

    private void addProperty(Vertex vertex, WorkflowExecutionContext workflowExecutionContext, WorkflowExecutionArgs workflowExecutionArgs) {
        String value = workflowExecutionContext.getValue(workflowExecutionArgs);
        if (value == null || value.length() == 0) {
            return;
        }
        vertex.setProperty(workflowExecutionArgs.getName(), value);
    }

    private void addCountersToInstance(String str, Vertex vertex) throws FalconException {
        try {
            for (String str2 : str.split(",")) {
                String[] split = str2.split(":", 2);
                vertex.setProperty(split[0], Long.valueOf(Long.parseLong(split[1])));
            }
        } catch (NumberFormatException e) {
            throw new FalconException("Invalid values for counter:" + e);
        }
    }

    public void addInstanceToEntity(Vertex vertex, String str, RelationshipType relationshipType, RelationshipLabel relationshipLabel) {
        addInstanceToEntity(vertex, str, relationshipType, relationshipLabel, null);
    }

    public void addInstanceToEntity(Vertex vertex, String str, RelationshipType relationshipType, RelationshipLabel relationshipLabel, String str2) {
        Vertex findVertex = findVertex(str, relationshipType);
        LOG.info("Vertex exists? name={}, type={}, v={}", str, relationshipType, findVertex);
        if (findVertex == null) {
            LOG.error("Illegal State: {} vertex must exist for {}", relationshipType, str);
            throw new IllegalStateException(relationshipType + " entity vertex must exist " + str);
        }
        addEdge(vertex, findVertex, relationshipLabel.getName(), str2);
    }

    public void addOutputFeedInstances(WorkflowExecutionContext workflowExecutionContext, Vertex vertex) throws FalconException {
        String outputFeedNames = workflowExecutionContext.getOutputFeedNames();
        if ("NONE".equals(outputFeedNames) || "IGNORE".equals(outputFeedNames)) {
            return;
        }
        String[] outputFeedNamesList = workflowExecutionContext.getOutputFeedNamesList();
        String[] outputFeedInstancePathsList = workflowExecutionContext.getOutputFeedInstancePathsList();
        for (int i = 0; i < outputFeedNamesList.length; i++) {
            addFeedInstance(vertex, RelationshipLabel.PROCESS_FEED_EDGE, workflowExecutionContext, outputFeedNamesList[i], outputFeedInstancePathsList[i]);
        }
    }

    public void addInputFeedInstances(WorkflowExecutionContext workflowExecutionContext, Vertex vertex) throws FalconException {
        String inputFeedNames = workflowExecutionContext.getInputFeedNames();
        if ("NONE".equals(inputFeedNames) || "IGNORE".equals(inputFeedNames)) {
            return;
        }
        String[] inputFeedNamesList = workflowExecutionContext.getInputFeedNamesList();
        String[] inputFeedInstancePathsList = workflowExecutionContext.getInputFeedInstancePathsList();
        for (int i = 0; i < inputFeedNamesList.length; i++) {
            String str = inputFeedNamesList[i];
            for (String str2 : inputFeedInstancePathsList[i].split(",")) {
                addFeedInstance(vertex, RelationshipLabel.FEED_PROCESS_EDGE, workflowExecutionContext, str, str2);
            }
        }
    }

    public void addReplicatedInstance(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
        String outputFeedNames = workflowExecutionContext.getOutputFeedNames();
        String outputFeedInstancePaths = workflowExecutionContext.getOutputFeedInstancePaths();
        String clusterName = workflowExecutionContext.getClusterName();
        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", outputFeedNames, outputFeedInstancePaths, clusterName);
        String feedInstanceName = getFeedInstanceName(outputFeedNames, clusterName, outputFeedInstancePaths, workflowExecutionContext.getNominalTimeAsISO8601());
        Vertex findVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
        LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, RelationshipType.FEED_INSTANCE, findVertex);
        if (findVertex == null) {
            LOG.info("{} instance vertex {} does not exist, add it", RelationshipType.FEED_INSTANCE, feedInstanceName);
            findVertex = addFeedInstance(feedInstanceName, workflowExecutionContext, outputFeedNames, workflowExecutionContext.getSrcClusterName());
        }
        addInstanceToEntity(findVertex, clusterName, RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, workflowExecutionContext.getTimeStampAsISO8601());
        addCounters(findVertex, workflowExecutionContext);
    }

    public void addEvictedInstance(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
        String outputFeedInstancePaths = workflowExecutionContext.getOutputFeedInstancePaths();
        if ("IGNORE".equals(outputFeedInstancePaths)) {
            LOG.info("There were no evicted instances, nothing to record");
            return;
        }
        LOG.info("Recording lineage for evicted instances {}", outputFeedInstancePaths);
        String outputFeedNames = workflowExecutionContext.getOutputFeedNames();
        String[] outputFeedInstancePathsList = workflowExecutionContext.getOutputFeedInstancePathsList();
        String clusterName = workflowExecutionContext.getClusterName();
        for (String str : outputFeedInstancePathsList) {
            LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}", outputFeedNames, str, clusterName);
            String feedInstanceName = getFeedInstanceName(outputFeedNames, clusterName, str, workflowExecutionContext.getNominalTimeAsISO8601());
            Vertex findVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
            LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, RelationshipType.FEED_INSTANCE, findVertex);
            if (findVertex == null) {
                LOG.info("{} instance vertex {} does not exist, add it", RelationshipType.FEED_INSTANCE, feedInstanceName);
                findVertex = addFeedInstance(feedInstanceName, workflowExecutionContext, outputFeedNames, clusterName);
            }
            addInstanceToEntity(findVertex, clusterName, RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, workflowExecutionContext.getTimeStampAsISO8601());
        }
    }

    private void addFeedInstance(Vertex vertex, RelationshipLabel relationshipLabel, WorkflowExecutionContext workflowExecutionContext, String str, String str2) throws FalconException {
        String clusterName = workflowExecutionContext.getClusterName();
        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", str, str2, clusterName);
        addProcessFeedEdge(vertex, addFeedInstance(getFeedInstanceName(str, clusterName, str2, workflowExecutionContext.getNominalTimeAsISO8601()), workflowExecutionContext, str, clusterName), relationshipLabel);
    }

    private Vertex addFeedInstance(String str, WorkflowExecutionContext workflowExecutionContext, String str2, String str3) throws FalconException {
        LOG.info("Adding feed instance {}", str);
        Vertex addVertex = addVertex(str, RelationshipType.FEED_INSTANCE, workflowExecutionContext.getTimeStampAsLong());
        addInstanceToEntity(addVertex, str2, RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
        addInstanceToEntity(addVertex, str3, RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
        addInstanceToEntity(addVertex, workflowExecutionContext.getWorkflowUser(), RelationshipType.USER, RelationshipLabel.USER);
        if (isPreserveHistory()) {
            Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, str2);
            addDataClassification(feed.getTags(), addVertex);
            addGroups(feed.getGroups(), addVertex);
        }
        return addVertex;
    }

    public static String getFeedInstanceName(String str, String str2, String str3, String str4) throws FalconException {
        try {
            Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, str);
            Cluster cluster = (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, str2);
            Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
            return storageType == Storage.TYPE.TABLE ? getTableFeedInstanceName(feed, str3, storageType) : getFileSystemFeedInstanceName(str3, feed, cluster, str4);
        } catch (URISyntaxException e) {
            throw new FalconException(e);
        }
    }

    private static String getTableFeedInstanceName(Feed feed, String str, Storage.TYPE type) throws URISyntaxException {
        return feed.getName() + "/" + ((CatalogStorage) FeedHelper.createStorage(type.name(), str)).toPartitionAsPath();
    }

    private static String getFileSystemFeedInstanceName(String str, Feed feed, Cluster cluster, String str2) throws FalconException {
        String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
        String str3 = str;
        for (String str4 : FeedDataPath.PATTERN.split(uriTemplate)) {
            str3 = str3.replaceFirst(str4, "");
        }
        return StringUtils.isEmpty(str3) ? feed.getName() + "/" + str2 : feed.getName() + "/" + SchemaHelper.formatDateUTC(FeedHelper.getDate(uriTemplate, new Path(str), TimeZone.getTimeZone("UTC")));
    }
}
