package org.apache.gobblin.service.modules.flow;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A flow compiler that turns a {@link FlowSpec} into a {@link Dag} of {@link JobExecutionPlan}s
 * by asking a {@link FlowGraph} for a path that satisfies the spec.
 *
 * <p>When built through one of the public constructors, the compiler owns a
 * {@link GitFlowGraphMonitor} that is handed the flow graph, and runs that monitor inside a
 * {@link ServiceManager}. When built through the test-only constructor, neither the monitor nor
 * the service manager exists and the caller supplies the flow graph directly.
 */
@Alpha
public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
    private static final Logger log = LoggerFactory.getLogger(MultiHopFlowCompiler.class);

    /** Maximum time, in seconds, to wait for the service manager to start up or shut down. */
    private static final long SERVICE_MANAGER_TIMEOUT_SECONDS = 5L;

    private final FlowGraph flowGraph;
    /** {@code null} when the instance was created through the test-only constructor. */
    private ServiceManager serviceManager;
    private boolean active;
    /** {@code null} when the instance was created through the test-only constructor. */
    private GitFlowGraphMonitor gitFlowGraphMonitor;

    /**
     * Equivalent to {@code new MultiHopFlowCompiler(config, true)}.
     *
     * @param config configuration for the compiler
     */
    public MultiHopFlowCompiler(Config config) {
        this(config, true);
    }

    /**
     * Equivalent to {@code new MultiHopFlowCompiler(config, Optional.absent(), z)}.
     *
     * @param config configuration for the compiler
     * @param z flag forwarded unchanged to the base-class constructor
     */
    public MultiHopFlowCompiler(Config config, boolean z) {
        this(config, Optional.absent(), z);
    }

    /**
     * Equivalent to {@code new MultiHopFlowCompiler(config, optional, true)}.
     *
     * @param config configuration for the compiler
     * @param optional optional logger forwarded unchanged to the base-class constructor
     */
    public MultiHopFlowCompiler(Config config, Optional<Logger> optional) {
        this(config, optional, true);
    }

    /**
     * Creates the compiler together with its flow graph, flow catalog and flow-graph monitor,
     * then starts the monitor and waits up to {@value #SERVICE_MANAGER_TIMEOUT_SECONDS} seconds
     * for it to become healthy.
     *
     * @param config configuration for the compiler; must contain a value at
     *     {@code gobblin.service.templateCatalogs.fullyQualifiedPath}, which is copied to
     *     {@code jobconf.fullyQualifiedPath} for the flow catalog
     * @param optional optional logger forwarded unchanged to the base-class constructor
     * @param z flag forwarded unchanged to the base-class constructor
     * @throws RuntimeException if an {@link IOException} occurs while setting up the compiler,
     *     or if the service manager does not become healthy within the timeout
     */
    public MultiHopFlowCompiler(Config config, Optional<Logger> optional, boolean z) {
        super(config, optional, z);
        try {
            // The flow catalog reads its location from "jobconf.fullyQualifiedPath", so mirror
            // the template-catalog path into that key.
            Config templateCatalogConfig = config.withValue("jobconf.fullyQualifiedPath",
                config.getValue("gobblin.service.templateCatalogs.fullyQualifiedPath"));
            FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogConfig);
            this.flowGraph = new BaseFlowGraph();
            this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
            this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
            // Register the hook before starting so a started manager is always asked to stop on JVM exit.
            addShutdownHook();
            try {
                this.serviceManager.startAsync().awaitHealthy(SERVICE_MANAGER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                log.error("Timed out while waiting for the service manager to start up", e);
                throw new RuntimeException(e);
            }
        } catch (IOException e2) {
            throw new RuntimeException("Cannot instantiate " + getClass().getName(), e2);
        }
    }

    /**
     * Test-only constructor that uses the supplied flow graph and creates neither a flow-graph
     * monitor nor a service manager.
     *
     * @param config configuration for the compiler
     * @param flowGraph the flow graph to search when compiling flows
     */
    @VisibleForTesting
    MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
        super(config, Optional.absent(), true);
        this.flowGraph = flowGraph;
    }

    /**
     * Records the active state and propagates it to the flow-graph monitor, if one exists.
     *
     * @param active the new active state
     */
    public void setActive(boolean active) {
        this.active = active;
        // The test-only constructor leaves the monitor unset; guard against that instead of throwing an NPE.
        if (this.gitFlowGraphMonitor != null) {
            this.gitFlowGraphMonitor.setActive(active);
        }
    }

    /**
     * Compiles a flow spec into a DAG of job execution plans by finding a path in the flow graph.
     *
     * @param spec the spec to compile; must be a non-null {@link FlowSpec}
     * @return the DAG for the path found; a DAG created from an empty list when the flow graph
     *     returns no path; or {@code null} when path finding or DAG construction throws one of
     *     the exceptions handled below
     * @throws NullPointerException if {@code spec} is {@code null}
     * @throws IllegalArgumentException if {@code spec} is not a {@link FlowSpec}
     */
    @Override
    public Dag<JobExecutionPlan> compileFlow(Spec spec) {
        Preconditions.checkNotNull(spec);
        Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
        long startTime = System.nanoTime();
        FlowSpec flowSpec = (FlowSpec) spec;
        // Source and destination are read only for log messages; the path search uses the whole spec.
        String source = ConfigUtils.getString(flowSpec.getConfig(), "gobblin.flow.sourceIdentifier", "");
        String destination = ConfigUtils.getString(flowSpec.getConfig(), "gobblin.flow.destinationIdentifier", "");
        log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
        try {
            FlowGraphPath path = this.flowGraph.findPath(flowSpec);
            if (path == null) {
                Instrumented.markMeter(this.flowCompilationFailedMeter);
                log.info(String.format("No path found from source: %s and destination: %s", source, destination));
                return new JobExecutionPlanDagFactory().createDag(new ArrayList<JobExecutionPlan>());
            }
            Dag<JobExecutionPlan> dag = path.asDag();
            Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
            // The timer is only updated for successful compilations.
            Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            return dag;
        } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException
            | ReflectiveOperationException | URISyntaxException e) {
            Instrumented.markMeter(this.flowCompilationFailedMeter);
            log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s",
                source, destination), e);
            // Existing contract: a failed compilation is reported to the caller as null.
            return null;
        }
    }

    /**
     * Intentionally does nothing except log a warning: this implementation does not populate
     * templates per edge.
     */
    @Override
    protected void populateEdgeTemplateMap() {
        log.warn("No population of templates based on edge happen in this implementation");
    }

    /**
     * Registers a JVM shutdown hook that asks the service manager to stop and waits up to
     * {@value #SERVICE_MANAGER_TIMEOUT_SECONDS} seconds for it to do so.
     */
    private void addShutdownHook() {
        // Capture the current manager so the hook does not depend on later field changes.
        final ServiceManager manager = this.serviceManager;
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    manager.stopAsync().awaitStopped(SERVICE_MANAGER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    // Shutdown is best-effort, but leave a trace rather than swallowing the timeout.
                    log.warn("Timed out while waiting for the service manager to shut down", e);
                }
            }
        });
    }

    /**
     * @return the flow graph searched by {@link #compileFlow(Spec)}
     */
    public FlowGraph getFlowGraph() {
        return this.flowGraph;
    }

    /**
     * @return the service manager running the flow-graph monitor, or {@code null} if this
     *     instance was created through the test-only constructor
     */
    public ServiceManager getServiceManager() {
        return this.serviceManager;
    }

    /**
     * @return the value most recently passed to {@link #setActive(boolean)}, or {@code false}
     *     if it has never been called
     */
    public boolean isActive() {
        return this.active;
    }
}
