package org.apache.gobblin.service.modules.flow;

import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowEdge;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobCatalog;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.ServiceNode;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.policy.ServicePolicy;
import org.apache.gobblin.service.modules.utils.FindPathUtils;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.jgrapht.graph.DirectedWeightedMultigraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.class */
public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(MultiHopsFlowToJobSpecCompiler.class);
    /** Splits comma-separated values, trimming each token and dropping empty ones. */
    private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
    /** Directed multigraph of service nodes and flow edges; filled from the topology specs before each path search. */
    private DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph;
    /** Policy instantiated in the constructor; receives the configured blocked nodes and blocked connections. */
    public ServicePolicy servicePolicy;
    /** Value of {@code gobblin.service.fullDataPath} when configured and non-blank, otherwise absent. */
    private Optional<String> optionalUserSpecifiedPath;
    /** Edge properties handed to every {@code LoadBasedFlowEdgeImpl} created while building the graph. */
    private FlowEdgeProps defaultFlowEdgeProps;

    /**
     * Creates a compiler with no explicit logger and the boolean flag set to {@code true}.
     *
     * @param config configuration forwarded to the three-argument constructor
     */
    public MultiHopsFlowToJobSpecCompiler(Config config) {
        this(config, Optional.absent(), true);
    }

    /**
     * Creates a compiler with the given optional logger and the boolean flag set to {@code true}.
     *
     * @param config   configuration forwarded to the three-argument constructor
     * @param optional optional logger forwarded to the three-argument constructor
     */
    public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> optional) {
        this(config, optional, true);
    }

    /**
     * Creates a compiler and configures its {@link ServicePolicy} from {@code config}.
     *
     * <p>Configuration keys read here (all optional):
     * <ul>
     *   <li>{@code gobblin.service.servicePolicy} - alias or class name of the policy; {@code "static"} when missing.</li>
     *   <li>{@code gobblin.service.blockedConnections} - list of {@code source:target:executorUri} entries, each
     *       registered with the policy as a flow edge.</li>
     *   <li>{@code gobblin.service.blockedNodes} - comma-separated node names registered with the policy.</li>
     *   <li>{@code gobblin.service.fullDataPath} - user-specified path, stored when non-blank.</li>
     * </ul>
     *
     * <p>Blocked connections are handled one entry at a time: a malformed entry (fewer than three
     * {@code :}-separated parts, or an invalid URI) is logged and skipped so that it cannot prevent the
     * remaining entries from being registered.
     *
     * @param config   service configuration
     * @param optional optional logger forwarded to the base-class constructor
     * @param z        flag forwarded unchanged to the base-class constructor
     * @throws RuntimeException if the policy class cannot be resolved or instantiated
     */
    public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> optional, boolean z) {
        super(config, optional, z);
        this.weightedGraph = new DirectedWeightedMultigraph<>(LoadBasedFlowEdgeImpl.class);
        this.defaultFlowEdgeProps = new FlowEdgeProps();
        String policyName = config.hasPath("gobblin.service.servicePolicy") ? config.getString("gobblin.service.servicePolicy") : "static";
        try {
            this.servicePolicy = (ServicePolicy) new ClassAliasResolver(ServicePolicy.class).resolveClass(policyName).newInstance();

            if (config.hasPath("gobblin.service.blockedConnections")) {
                for (String blockedConnection : config.getStringList("gobblin.service.blockedConnections")) {
                    // NOTE(review): splitting on every ':' means an executor URI that itself contains ':'
                    // (e.g. one with a scheme) is cut at its first colon - confirm the expected entry format.
                    String[] parts = blockedConnection.split(":");
                    if (parts.length < 3) {
                        log.warn("Constructing of FlowEdge in ServicePolicy Failed, malformed blocked connection: {}", blockedConnection);
                        continue;
                    }
                    try {
                        this.servicePolicy.addFlowEdge(new LoadBasedFlowEdgeImpl(
                                new BaseServiceNodeImpl(parts[0]),
                                new BaseServiceNodeImpl(parts[1]),
                                InMemorySpecExecutor.createDummySpecExecutor(new URI(parts[2]))));
                    } catch (URISyntaxException e) {
                        // Keep going: one bad URI must not drop the other blocked connections.
                        log.warn("Constructing of FlowEdge in ServicePolicy Failed for blocked connection: {}", blockedConnection, e);
                    }
                }
            }

            if (config.hasPath("gobblin.service.blockedNodes") && StringUtils.isNotBlank(config.getString("gobblin.service.blockedNodes"))) {
                for (String blockedNode : SPLIT_BY_COMMA.splitToList(config.getString("gobblin.service.blockedNodes"))) {
                    this.servicePolicy.addServiceNode(new BaseServiceNodeImpl(blockedNode));
                }
            }

            if (config.hasPath("gobblin.service.fullDataPath") && StringUtils.isNotBlank(config.getString("gobblin.service.fullDataPath"))) {
                this.optionalUserSpecifiedPath = Optional.of(config.getString("gobblin.service.fullDataPath"));
            } else {
                this.optionalUserSpecifiedPath = Optional.absent();
            }
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new RuntimeException("Error happen when resolving class for :" + policyName, e);
        }
    }

    /**
     * Compiles a flow into an insertion-ordered mapping from each generated spec to the executor that runs it.
     *
     * @param spec the flow to compile; it is cast to {@link FlowSpec} during path finding
     * @return the spec-to-executor mapping filled in by {@code findPath}; empty when no hop was produced
     */
    @Override // org.apache.gobblin.service.modules.flow.BaseFlowToJobSpecCompiler
    public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
        Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
        findPath(specExecutorMap, spec);
        return specExecutorMap;
    }

    /**
     * Rebuilds the in-memory graph from every known topology spec, then removes the edges that the
     * service policy reports as blacklisted.
     */
    private void inMemoryWeightGraphGenerator() {
        for (TopologySpec topologySpec : this.topologySpecMap.values()) {
            weightGraphGenerateHelper(topologySpec);
        }

        this.servicePolicy.populateBlackListedEdges(this.weightedGraph);
        if (this.servicePolicy.getBlacklistedEdges().size() <= 0) {
            return;
        }
        for (FlowEdge blacklistedEdge : this.servicePolicy.getBlacklistedEdges()) {
            this.weightedGraph.removeEdge(blacklistedEdge);
        }
    }

    /**
     * Fills {@code map} with one job spec per hop of the data movement path for {@code spec}.
     *
     * <p>The graph is regenerated first. When a user-specified path is configured and verifies
     * successfully, its hops are used and the method returns. Otherwise the path returned by
     * {@code FindPathUtils.dijkstraBasedPathFindingHelper} between the flow's source and destination
     * identifiers is used.
     *
     * @param map  output mapping from job spec to the executor of the hop
     * @param spec the flow being compiled; must be a {@link FlowSpec}
     */
    private void findPath(Map<Spec, SpecExecutor> map, Spec spec) {
        inMemoryWeightGraphGenerator();
        FlowSpec flowSpec = (FlowSpec) spec;

        if (this.optionalUserSpecifiedPath.isPresent()) {
            log.info("Starting to evaluate user's specified path ... ");
            if (userSpecifiedPathVerificator(map, flowSpec)) {
                log.info("User specified path[ " + this.optionalUserSpecifiedPath.get() + "] successfully verified.");
                return;
            }
            log.error("Will not execute user specified path[ " + this.optionalUserSpecifiedPath.get() + "]");
            log.info("Start to execute FlowCompiler's algorithm for valid data movement path");
        }

        BaseServiceNodeImpl source = new BaseServiceNodeImpl(flowSpec.getConfig().getString("gobblin.flow.sourceIdentifier"));
        BaseServiceNodeImpl destination = new BaseServiceNodeImpl(flowSpec.getConfig().getString("gobblin.flow.destinationIdentifier"));
        List<FlowEdge> edgePath = FindPathUtils.dijkstraBasedPathFindingHelper(source, destination, this.weightedGraph);

        for (FlowEdge edge : edgePath) {
            LoadBasedFlowEdgeImpl hop = (LoadBasedFlowEdgeImpl) edge;
            JobSpec hopJobSpec = convertHopToJobSpec(hop.getSourceNode(), hop.getTargetNode(), flowSpec);
            map.put(hopJobSpec, hop.getSpecExecutorInstance());
        }
    }

    /**
     * Associates every edge currently in the graph with the URIs of all templates in the template
     * catalog. Does nothing when no template catalog is present.
     */
    @Override // org.apache.gobblin.service.modules.flow.BaseFlowToJobSpecCompiler
    protected void populateEdgeTemplateMap() {
        if (!this.templateCatalog.isPresent()) {
            return;
        }
        for (FlowEdge edge : this.weightedGraph.edgeSet()) {
            // The catalog is queried once per edge, so each edge gets its own freshly collected list.
            this.edgeTemplateMap.put(
                    edge.getEdgeIdentity(),
                    ((FSJobCatalog) this.templateCatalog.get()).getAllTemplates().stream()
                            .map(JobTemplate::getUri)
                            .collect(Collectors.toList()));
        }
    }

    /**
     * Verifies the user-specified path against the graph and, if every hop exists, adds one job spec
     * per hop to {@code map} in path order.
     *
     * <p>The path is split on commas with surrounding whitespace trimmed and empty tokens dropped,
     * matching how blocked nodes are parsed. A path with fewer than two nodes describes no hop and is
     * rejected so the caller can fall back to its own path search. Hops are staged in an
     * insertion-ordered map so that {@code map} is left untouched on failure and keeps hop order on
     * success.
     *
     * @param map      output mapping from job spec to executor; modified only when the whole path is valid
     * @param flowSpec the flow being compiled
     * @return {@code true} if every consecutive node pair is connected by an edge in the graph
     */
    private boolean userSpecifiedPathVerificator(Map<Spec, SpecExecutor> map, FlowSpec flowSpec) {
        List<String> nodeNames = SPLIT_BY_COMMA.splitToList(this.optionalUserSpecifiedPath.get());
        if (nodeNames.size() < 2) {
            log.error("User Specified Path is invalid");
            return false;
        }

        // LinkedHashMap rather than HashMap: putAll below must preserve the order of the hops.
        Map<Spec, SpecExecutor> verifiedHops = new LinkedHashMap<>();
        for (int i = 0; i < nodeNames.size() - 1; i++) {
            BaseServiceNodeImpl hopSource = new BaseServiceNodeImpl(nodeNames.get(i));
            BaseServiceNodeImpl hopTarget = new BaseServiceNodeImpl(nodeNames.get(i + 1));
            if (!this.weightedGraph.containsVertex(hopSource)
                    || !this.weightedGraph.containsVertex(hopTarget)
                    || !this.weightedGraph.containsEdge(hopSource, hopTarget)) {
                log.error("User Specified Path is invalid");
                return false;
            }
            verifiedHops.put(
                    convertHopToJobSpec(hopSource, hopTarget, flowSpec),
                    ((LoadBasedFlowEdgeImpl) this.weightedGraph.getEdge(hopSource, hopTarget)).getSpecExecutorInstance());
        }
        map.putAll(verifiedHops);
        return true;
    }

    /**
     * Adds the capabilities of one topology to the graph: each capability entry contributes its two
     * nodes (key as edge source, value as edge target) and an edge between them that carries the
     * topology's spec executor.
     *
     * @param topologySpec topology whose executor capabilities are added
     * @throws RuntimeException if the capabilities cannot be obtained; the compilation-failed meter is
     *         marked first, and the thread's interrupt flag is restored when the cause is an interruption
     */
    private void weightGraphGenerateHelper(TopologySpec topologySpec) {
        try {
            Map<ServiceNode, ServiceNode> capabilities = topologySpec.getSpecExecutor().getCapabilities().get();
            for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
                BaseServiceNodeImpl sourceNode = new BaseServiceNodeImpl(capability.getKey().getNodeName());
                BaseServiceNodeImpl targetNode = new BaseServiceNodeImpl(capability.getValue().getNodeName());
                if (!this.weightedGraph.containsVertex(sourceNode)) {
                    this.weightedGraph.addVertex(sourceNode);
                }
                if (!this.weightedGraph.containsVertex(targetNode)) {
                    this.weightedGraph.addVertex(targetNode);
                }
                LoadBasedFlowEdgeImpl edge = new LoadBasedFlowEdgeImpl(sourceNode, targetNode, this.defaultFlowEdgeProps, topologySpec.getSpecExecutor());
                if (!this.weightedGraph.containsEdge(edge)) {
                    this.weightedGraph.addEdge(sourceNode, targetNode, edge);
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            Instrumented.markMeter(this.flowCompilationFailedMeter);
            throw new RuntimeException("Cannot determine topology capabilities", e);
        }
    }

    /**
     * Builds the job spec for a single hop of a flow.
     *
     * <p>The spec starts from the flow's config, description and version. When a template URI is
     * given, the spec is resolved against the template catalog. The config is then adjusted:
     * {@code job.schedule} is removed, {@code job.name} and {@code job.group} are derived from
     * {@code flow.name} / {@code flow.group} when those are present, and {@code flow.executionId} is
     * set to the current time in milliseconds.
     *
     * @param serviceNode  source node of the hop
     * @param serviceNode2 target node of the hop
     * @param uri          template URI, or {@code null} to build an unresolved spec
     * @param flowSpec     flow the hop belongs to
     * @return the job spec for the hop
     * @throws RuntimeException if the template cannot be resolved from the template catalog
     */
    private JobSpec buildJobSpec(ServiceNode serviceNode, ServiceNode serviceNode2, URI uri, FlowSpec flowSpec) {
        JobSpec.Builder builder = JobSpec.builder(jobSpecURIGenerator(flowSpec, serviceNode, serviceNode2))
                .withConfig(flowSpec.getConfig())
                .withDescription(flowSpec.getDescription())
                .withVersion(flowSpec.getVersion());

        JobSpec jobSpec;
        if (uri == null) {
            jobSpec = builder.build();
            log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
        } else {
            builder.withTemplate(uri);
            try {
                jobSpec = new ResolvedJobSpec(builder.build(), (JobCatalog) this.templateCatalog.get());
                log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
            } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
                throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e);
            }
        }

        // The flow-level schedule must not be carried over to the per-hop job.
        jobSpec.setConfig(jobSpec.getConfig().withoutPath("job.schedule"));

        if (flowSpec.getConfig().hasPath("flow.name")) {
            String jobName = flowSpec.getConfig().getValue("flow.name").unwrapped().toString()
                    + "-" + serviceNode.getNodeName()
                    + "-" + serviceNode2.getNodeName();
            jobSpec.setConfig(jobSpec.getConfig().withValue("job.name", ConfigValueFactory.fromAnyRef(jobName)));
        }

        if (flowSpec.getConfig().hasPath("flow.group")) {
            jobSpec.setConfig(jobSpec.getConfig().withValue("job.group", flowSpec.getConfig().getValue("flow.group")));
        }

        Long executionId = Long.valueOf(System.currentTimeMillis());
        jobSpec.setConfig(jobSpec.getConfig().withValue("flow.executionId", ConfigValueFactory.fromAnyRef(executionId)));

        // Keep the Properties view in sync with the final config.
        jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
        return jobSpec;
    }

    /**
     * Converts one hop into a job spec, using the first edge the graph returns between the two nodes
     * to choose the template URI.
     *
     * @param serviceNode  source node of the hop
     * @param serviceNode2 target node of the hop
     * @param flowSpec     flow the hop belongs to
     * @return the job spec built for the hop
     */
    private JobSpec convertHopToJobSpec(ServiceNode serviceNode, ServiceNode serviceNode2, FlowSpec flowSpec) {
        FlowEdge firstEdge = this.weightedGraph.getAllEdges(serviceNode, serviceNode2).iterator().next();
        URI templateUri = getTemplateURI(serviceNode, serviceNode2, flowSpec, firstEdge);
        return buildJobSpec(serviceNode, serviceNode2, templateUri, flowSpec);
    }

    /**
     * Picks the template URI for an edge: the first URI registered for the edge's identity in the
     * edge-template map, or the URI produced by {@code jobSpecTemplateURIGenerator} when the map is
     * {@code null} or has no entry for the edge.
     *
     * @param serviceNode  source node of the hop (not used in the lookup)
     * @param serviceNode2 target node of the hop (not used in the lookup)
     * @param flowSpec     flow passed to the fallback generator
     * @param flowEdge     edge whose identity is looked up
     * @return the selected template URI
     */
    private URI getTemplateURI(ServiceNode serviceNode, ServiceNode serviceNode2, FlowSpec flowSpec, FlowEdge flowEdge) {
        if (this.edgeTemplateMap != null && this.edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) {
            return this.edgeTemplateMap.get(flowEdge.getEdgeIdentity()).get(0);
        }
        return jobSpecTemplateURIGenerator(flowSpec);
    }

    /**
     * Generates the URI of the job spec for one hop of a flow.
     *
     * <p>The URI uses the {@code gobblin-job} scheme, the authority of the flow's URI, and the flow's
     * path (forced to start and end with {@code /}) followed by {@code <source>-<target>}.
     *
     * @param objArr exactly three values, in order: the {@link FlowSpec}, the source
     *               {@link ServiceNode} and the target {@link ServiceNode}
     * @return the job spec URI for the hop
     * @throws RuntimeException if the URI cannot be constructed; the {@link URISyntaxException} is
     *         attached as the cause
     */
    @Override // org.apache.gobblin.service.modules.flow.BaseFlowToJobSpecCompiler
    public URI jobSpecURIGenerator(Object... objArr) {
        FlowSpec flowSpec = (FlowSpec) objArr[0];
        ServiceNode sourceNode = (ServiceNode) objArr[1];
        ServiceNode targetNode = (ServiceNode) objArr[2];
        try {
            String authority = flowSpec.getUri().getAuthority();
            String basePath = StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"), "/");
            return new URI("gobblin-job", authority, basePath + sourceNode.getNodeName() + "-" + targetNode.getNodeName(), null);
        } catch (URISyntaxException e) {
            String message = "URI construction failed when jobSpec from " + sourceNode.getNodeName() + " to " + targetNode.getNodeName();
            log.error(message, e);
            // Propagate both the context and the cause instead of an empty RuntimeException.
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Returns the graph used for path finding.
     *
     * @return the internal graph instance itself (not a copy)
     */
    public DirectedWeightedMultigraph<ServiceNode, FlowEdge> getWeightedGraph() {
        return this.weightedGraph;
    }
}
