package org.apache.gobblin.service.modules.flowgraph.pathfinder;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptorUtils;
import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
import org.apache.gobblin.service.modules.flow.FlowGraphPath;
import org.apache.gobblin.service.modules.flow.FlowUtils;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.class */
public abstract class AbstractPathFinder implements PathFinder {
    private static final Logger log = LoggerFactory.getLogger(AbstractPathFinder.class);
    private static final String SOURCE_PREFIX = "source";
    private static final String DESTINATION_PREFIX = "destination";
    private List<DataNode> destNodes;
    FlowGraph flowGraph;
    DataNode srcNode;
    DatasetDescriptor srcDatasetDescriptor;
    DatasetDescriptor destDatasetDescriptor;
    Map<FlowEdgeContext, FlowEdgeContext> pathMap;
    protected Long flowExecutionId;
    protected FlowSpec flowSpec;
    protected Config flowConfig;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
        this.flowGraph = flowGraph;
        this.flowSpec = flowSpec;
        this.flowExecutionId = Long.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
        this.flowConfig = flowSpec.getConfig().withValue("flow.executionId", ConfigValueFactory.fromAnyRef(this.flowExecutionId));
        String string = ConfigUtils.getString(this.flowConfig, "gobblin.flow.sourceIdentifier", "");
        List<String> stringList = ConfigUtils.getStringList(this.flowConfig, "gobblin.flow.destinationIdentifier");
        this.srcNode = this.flowGraph.getNode(string);
        Preconditions.checkArgument(this.srcNode != null, "Flowgraph does not have a node with id " + string);
        for (String str : stringList) {
            DataNode node = this.flowGraph.getNode(str);
            Preconditions.checkArgument(node != null, "Flowgraph does not have a node with id " + str);
            if (this.destNodes == null) {
                this.destNodes = new ArrayList();
            }
            this.destNodes.add(node);
        }
        if (this.destNodes != null && ((Set) this.destNodes.stream().map((v0) -> {
            return v0.getClass();
        }).collect(Collectors.toSet())).size() > 1) {
            throw new RuntimeException("All destination nodes must use the same DataNode class");
        }
        boolean z = ConfigUtils.getBoolean(this.flowConfig, "flow.applyRetention", true);
        boolean z2 = ConfigUtils.getBoolean(this.flowConfig, "flow.applyInputRetention", false);
        if (z2 && !z) {
            throw new RuntimeException("Invalid retention configuration - shouldApplyRetentionOnInput = " + z2 + ", and shouldApplyRetention = " + z);
        }
        Config config = this.flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
        Config config2 = this.flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
        Config withValue = z2 ? config.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(false)) : config.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(Boolean.valueOf(z)));
        Config withValue2 = config2.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(Boolean.valueOf(z)));
        this.flowConfig = this.flowConfig.withValue("flow.applyRetention", ConfigValueFactory.fromAnyRef(Boolean.valueOf(z)));
        this.flowConfig = this.flowConfig.withValue("flow.applyInputRetention", ConfigValueFactory.fromAnyRef(Boolean.valueOf(z2)));
        Config withFallback = withValue.withFallback(getDefaultConfig(this.srcNode));
        withValue2 = this.destNodes != null ? withValue2.withFallback(getDefaultConfig(this.destNodes.get(0))) : withValue2;
        this.srcDatasetDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(withFallback);
        this.destDatasetDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(withValue2);
    }

    private Config getDefaultConfig(DataNode dataNode) {
        Config empty = ConfigFactory.empty();
        if (dataNode.getDefaultDatasetDescriptorClass() != null) {
            empty = empty.withValue("class", ConfigValueFactory.fromAnyRef(dataNode.getDefaultDatasetDescriptorClass()));
        }
        if (dataNode.getDefaultDatasetDescriptorPlatform() != null) {
            empty = empty.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef(dataNode.getDefaultDatasetDescriptorPlatform()));
        }
        return empty;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isPathFound(DataNode dataNode, DataNode dataNode2, DatasetDescriptor datasetDescriptor, DatasetDescriptor datasetDescriptor2) {
        return dataNode.equals(dataNode2) && datasetDescriptor.equals(datasetDescriptor2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor datasetDescriptor, DatasetDescriptor datasetDescriptor2) {
        LinkedList linkedList = new LinkedList();
        List stringList = ConfigUtils.getStringList(this.flowConfig, "gobblin.flow.whitelistedEdgeIds");
        for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
            if (stringList.isEmpty() || stringList.contains(flowEdge.getId())) {
                try {
                    if (this.flowGraph.getNode(flowEdge.getDest()).isActive() && flowEdge.isActive()) {
                        boolean z = false;
                        for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
                            Config mergedConfig = getMergedConfig(flowEdge);
                            for (Pair<DatasetDescriptor, DatasetDescriptor> pair : flowEdge.getFlowTemplate().getDatasetDescriptors(mergedConfig, false)) {
                                DatasetDescriptor datasetDescriptor3 = (DatasetDescriptor) pair.getLeft();
                                DatasetDescriptor datasetDescriptor4 = (DatasetDescriptor) pair.getRight();
                                try {
                                    flowEdge.getFlowTemplate().tryResolving(mergedConfig, (DatasetDescriptor) pair.getLeft(), (DatasetDescriptor) pair.getRight());
                                    if (datasetDescriptor3.contains(datasetDescriptor)) {
                                        FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, datasetDescriptor, makeOutputDescriptorSpecific(datasetDescriptor, datasetDescriptor4), mergedConfig, specExecutor);
                                        if (datasetDescriptor2.getFormatConfig().contains(datasetDescriptor4.getFormatConfig())) {
                                            linkedList.add(0, flowEdgeContext);
                                        } else {
                                            linkedList.add(flowEdgeContext);
                                        }
                                        z = true;
                                    }
                                } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
                                    this.flowSpec.getCompilationErrors().add(e.toString());
                                }
                            }
                            if (z) {
                                break;
                            }
                        }
                    }
                } catch (IOException | ReflectiveOperationException | SpecNotFoundException | JobTemplate.TemplateException e2) {
                    log.warn("Skipping edge {} with config {} due to exception: {}", new Object[]{flowEdge.getId(), this.flowConfig.toString(), e2});
                }
            }
        }
        return linkedList;
    }

    private DatasetDescriptor makeOutputDescriptorSpecific(DatasetDescriptor datasetDescriptor, DatasetDescriptor datasetDescriptor2) throws ReflectiveOperationException {
        Config rawConfig = datasetDescriptor2.getRawConfig();
        for (Map.Entry entry : datasetDescriptor.getRawConfig().entrySet()) {
            String obj = ((ConfigValue) entry.getValue()).unwrapped().toString();
            if (!isPlaceHolder(obj) && isPlaceHolder(ConfigUtils.getString(rawConfig, (String) entry.getKey(), ""))) {
                rawConfig = rawConfig.withValue((String) entry.getKey(), ConfigValueFactory.fromAnyRef(obj));
            }
        }
        return (DatasetDescriptor) GobblinConstructorUtils.invokeLongestConstructor(datasetDescriptor2.getClass(), new Object[]{rawConfig});
    }

    private boolean isPlaceHolder(String str) {
        return Strings.isNullOrEmpty(str) || str.equals(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
    }

    private Config getMergedConfig(FlowEdge flowEdge) {
        Config atPath = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
        return this.flowConfig.withFallback(flowEdge.getConfig()).withFallback(atPath).withFallback(this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) {
        LinkedList linkedList = new LinkedList();
        linkedList.add(flowEdgeContext);
        FlowEdgeContext flowEdgeContext2 = flowEdgeContext;
        while (true) {
            FlowEdgeContext flowEdgeContext3 = flowEdgeContext2;
            if (this.pathMap.get(flowEdgeContext3).equals(flowEdgeContext3)) {
                return linkedList;
            }
            linkedList.add(0, this.pathMap.get(flowEdgeContext3));
            flowEdgeContext2 = this.pathMap.get(flowEdgeContext3);
        }
    }

    @Override // org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder
    public FlowGraphPath findPath() throws PathFinder.PathFinderException {
        FlowGraphPath flowGraphPath = new FlowGraphPath(this.flowSpec, this.flowExecutionId);
        for (DataNode dataNode : this.destNodes) {
            List<FlowEdgeContext> findPathUnicast = findPathUnicast(dataNode);
            if (findPathUnicast == null) {
                log.error("Path to destination node {} could not be found for flow {}.", dataNode.getId(), this.flowSpec.getUri());
                return null;
            }
            log.info("Path to destination node {} found for flow {}. Path - {}", new Object[]{dataNode.getId(), this.flowSpec.getUri(), findPathUnicast});
            flowGraphPath.addPath(findPathUnicast);
        }
        return flowGraphPath;
    }

    public abstract List<FlowEdgeContext> findPathUnicast(DataNode dataNode) throws PathFinder.PathFinderException;
}
