package org.apache.gobblin.service.modules.flow;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A flow compiler that turns a {@link FlowSpec} into a {@link Dag} of {@link JobExecutionPlan}s by
 * looking up paths in a {@link FlowGraph}.
 *
 * <p>When a template catalog path is configured, the public constructors create an
 * {@link ObservingFSFlowEdgeTemplateCatalog} and a {@link GitFlowGraphMonitor}, and start both through a
 * Guava {@link ServiceManager}. The catalog is constructed with the same {@link ReadWriteLock} whose read
 * lock {@link #compileFlow(Spec)} holds while it consults the flow graph.
 */
@Alpha
public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
    private static final Logger log = LoggerFactory.getLogger(MultiHopFlowCompiler.class);

    private static final String TEMPLATE_CATALOG_PATH_KEY = "gobblin.service.templateCatalogs.fullyQualifiedPath";
    private static final String ENCRYPT_KEY_LOC_KEY = "encrypt.key.loc";
    private static final String GIT_MONITOR_ENCRYPT_KEY_LOC_KEY = "gobblin.service.gitFlowGraphMonitor.encrypt.key.loc";
    private static final String DATA_MOVEMENT_AUTHORIZER_CLASS_KEY = "dataMovementAuthorizer.class";
    private static final String FLOW_SOURCE_KEY = "gobblin.flow.sourceIdentifier";
    private static final String FLOW_DESTINATION_KEY = "gobblin.flow.destinationIdentifier";
    private static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
    private static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
    private static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
    private static final String DATASET_COMBINE_KEY = "gobblin.flow.dataset.combine";
    private static final String INPUT_DESCRIPTOR_PATH_KEY = "gobblin.flow.input.dataset.descriptor.path";
    private static final String OUTPUT_DESCRIPTOR_PATH_KEY = "gobblin.flow.output.dataset.descriptor.path";
    private static final String INPUT_DESCRIPTOR_SUBPATHS_KEY = "gobblin.flow.input.dataset.descriptor.subPaths";
    private static final String OUTPUT_DESCRIPTOR_SUBPATHS_KEY = "gobblin.flow.output.dataset.descriptor.subPaths";
    private static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

    /** Seconds to wait for the service manager to start up or shut down. */
    private static final long SERVICE_TIMEOUT_SECONDS = 5L;

    private final FlowGraph flowGraph;
    /** Latch handed to the {@link GitFlowGraphMonitor}; {@link #awaitHealthy()} blocks until it reaches zero. */
    private final CountDownLatch initComplete = new CountDownLatch(1);
    /** Fair lock shared with the template catalog; compilation holds the read lock. */
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
    // The three fields below remain null when no template catalog path is configured
    // (serviceManager and gitFlowGraphMonitor are also null for the test-only constructor).
    private ServiceManager serviceManager;
    private GitFlowGraphMonitor gitFlowGraphMonitor;
    private DataMovementAuthorizer dataMovementAuthorizer;

    /**
     * Equivalent to {@code new MultiHopFlowCompiler(config, true)}.
     *
     * @param config compiler configuration
     */
    public MultiHopFlowCompiler(Config config) {
        this(config, true);
    }

    /**
     * Creates a compiler without an explicit logger.
     *
     * @param config compiler configuration
     * @param instrumentationEnabled forwarded unchanged to the base class constructor
     *        (presumably toggles metrics instrumentation; confirm against the base class)
     */
    public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
        this(config, Optional.absent(), instrumentationEnabled);
    }

    /**
     * Equivalent to {@code new MultiHopFlowCompiler(config, logger, true)}.
     *
     * @param config compiler configuration
     * @param logger optional logger forwarded to the base class constructor
     */
    public MultiHopFlowCompiler(Config config, Optional<Logger> logger) {
        this(config, logger, true);
    }

    /**
     * Creates the compiler with an empty {@link BaseFlowGraph}. If the template catalog path is set to a
     * non-blank value, this also instantiates the configured {@link DataMovementAuthorizer}, creates the
     * template catalog and the {@link GitFlowGraphMonitor}, registers a JVM shutdown hook and waits up to
     * {@value #SERVICE_TIMEOUT_SECONDS} seconds for both services to become healthy. Otherwise none of
     * those collaborators is created.
     *
     * @param config compiler configuration
     * @param logger optional logger forwarded to the base class constructor
     * @param instrumentationEnabled forwarded unchanged to the base class constructor
     *        (presumably toggles metrics instrumentation; confirm against the base class)
     * @throws RuntimeException if the template catalog or the authorizer cannot be instantiated, or the
     *         services do not become healthy in time
     */
    public MultiHopFlowCompiler(Config config, Optional<Logger> logger, boolean instrumentationEnabled) {
        super(config, logger, instrumentationEnabled);
        this.flowGraph = new BaseFlowGraph();

        boolean templateCatalogConfigured = config.hasPath(TEMPLATE_CATALOG_PATH_KEY)
            && StringUtils.isNotBlank(config.getString(TEMPLATE_CATALOG_PATH_KEY));
        if (!templateCatalogConfigured) {
            // NOTE(review): dataMovementAuthorizer stays null on this path, so compileFlow() would throw a
            // NullPointerException if the flow names any destination. Confirm whether that is intended.
            return;
        }

        try {
            Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog =
                Optional.of(new ObservingFSFlowEdgeTemplateCatalog(config, this.rwLock));

            // Copy encrypt.key.loc under the Git monitor's own prefix when it is present.
            Config gitFlowGraphConfig = this.config.hasPath(ENCRYPT_KEY_LOC_KEY)
                ? this.config.withValue(GIT_MONITOR_ENCRYPT_KEY_LOC_KEY, config.getValue(ENCRYPT_KEY_LOC_KEY))
                : this.config;

            String authorizerClassName = ConfigUtils.getString(this.config, DATA_MOVEMENT_AUTHORIZER_CLASS_KEY,
                NoopDataMovementAuthorizer.class.getCanonicalName());
            String resolvedClassName = new ClassAliasResolver(DataMovementAuthorizer.class).resolve(authorizerClassName);
            this.dataMovementAuthorizer = (DataMovementAuthorizer) ConstructorUtils.invokeConstructor(
                Class.forName(resolvedClassName), new Object[]{this.config});

            this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig, flowTemplateCatalog, this.flowGraph,
                this.topologySpecMap, getInitComplete());
            this.serviceManager = new ServiceManager(
                Lists.<AbstractIdleService>newArrayList(this.gitFlowGraphMonitor, flowTemplateCatalog.get()));
            addShutdownHook();
            this.serviceManager.startAsync().awaitHealthy(SERVICE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            log.error("Timed out while waiting for the service manager to start up", e);
            throw new RuntimeException(e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
        }
    }

    /**
     * Test-only constructor that uses the supplied flow graph and a {@link NoopDataMovementAuthorizer}.
     * No template catalog, Git monitor or service manager is created.
     *
     * @param config compiler configuration
     * @param flowGraph the flow graph to compile against
     */
    @VisibleForTesting
    MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
        super(config, Optional.absent(), true);
        this.flowGraph = flowGraph;
        this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
    }

    /**
     * Sets the active state on the base compiler and, when one exists, on the Git flow graph monitor.
     *
     * @param active the new active state
     */
    @Override
    public void setActive(boolean active) {
        super.setActive(active);
        if (this.gitFlowGraphMonitor != null) {
            this.gitFlowGraphMonitor.setActive(active);
        }
    }

    /**
     * Blocks until the initialization latch reaches zero; returns immediately if it already has.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void awaitHealthy() throws InterruptedException {
        CountDownLatch latch = getInitComplete();
        if (latch.getCount() <= 0) {
            return;
        }
        log.info("Waiting for the MultiHopFlowCompiler to become healthy..");
        latch.await();
        log.info("The MultihopFlowCompiler is healthy and ready to orchestrate flows.");
    }

    /**
     * Compiles a flow spec into a DAG of job execution plans.
     *
     * <p>The spec is first split by {@link #splitFlowSpec(FlowSpec)}. For each resulting spec, every
     * destination node is checked with the data movement authorizer, then a path is looked up in the flow
     * graph and merged into the result. The read lock is held for the whole of that loop.
     *
     * @param spec the spec to compile; must be a non-null {@link FlowSpec}
     * @return the merged DAG (empty when no path was found), or {@code null} when data movement is not
     *         authorized or one of the handled compilation exceptions occurred
     * @throws NullPointerException if {@code spec} is null
     * @throws IllegalArgumentException if {@code spec} is not a {@link FlowSpec}
     */
    @Override
    public Dag<JobExecutionPlan> compileFlow(Spec spec) {
        Preconditions.checkNotNull(spec);
        Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
        long startTime = System.nanoTime();

        FlowSpec flowSpec = (FlowSpec) spec;
        String source = ConfigUtils.getString(flowSpec.getConfig(), FLOW_SOURCE_KEY, "");
        String destination = ConfigUtils.getString(flowSpec.getConfig(), FLOW_DESTINATION_KEY, "");
        DataNode sourceNode = this.flowGraph.getNode(source);
        List<DataNode> destNodes = ConfigUtils.getStringList(flowSpec.getConfig(), FLOW_DESTINATION_KEY).stream()
            .map(this.flowGraph::getNode)
            .collect(Collectors.toList());
        log.info("Compiling flow for source: {} and destination: {}", source, destination);

        List<FlowSpec> datasetFlowSpecs = splitFlowSpec(flowSpec);
        Dag<JobExecutionPlan> jobExecutionPlanDag = new Dag<>(new ArrayList<>());

        // Acquire outside the try so that the finally block only ever releases a lock we actually hold.
        this.rwLock.readLock().lock();
        try {
            for (FlowSpec datasetFlowSpec : datasetFlowSpecs) {
                for (DataNode destNode : destNodes) {
                    long authStartTime = System.nanoTime();
                    boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
                    Instrumented.updateTimer(this.dataAuthorizationTimer, System.nanoTime() - authStartTime,
                        TimeUnit.NANOSECONDS);
                    if (!authorized) {
                        String message = String.format(
                            "Data movement is not authorized for flow: %s, source: %s, destination: %s",
                            flowSpec.getUri().toString(), source, destination);
                        log.error(message);
                        datasetFlowSpec.getCompilationErrors().add(message);
                        return null;
                    }
                }

                FlowGraphPath path = this.flowGraph.findPath(datasetFlowSpec);
                if (path != null) {
                    jobExecutionPlanDag = jobExecutionPlanDag.merge(path.asDag(this.config));
                }
            }

            if (jobExecutionPlanDag.isEmpty()) {
                Instrumented.markMeter(this.flowCompilationFailedMeter);
                log.info("No path found from source: {} and destination: {}", source, destination);
                return jobExecutionPlanDag;
            }
        } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException
            | ReflectiveOperationException | URISyntaxException e) {
            Instrumented.markMeter(this.flowCompilationFailedMeter);
            log.error("Exception encountered while compiling flow for source: {} and destination: {}", source,
                destination, e);
            return null;
        } finally {
            this.rwLock.readLock().unlock();
        }

        // Success metrics are recorded after the lock has been released.
        Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
        Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        return jobExecutionPlanDag;
    }

    /**
     * Expands a flow spec into one spec per dataset when dataset sub-paths are configured.
     *
     * <ul>
     *   <li>No sub-paths key: the original spec is returned as the only element.</li>
     *   <li>Sub-paths with the combine flag set: a single copy whose descriptor paths are the base
     *       input/output paths and whose descriptor sub-paths are a glob over all sub-paths.</li>
     *   <li>Sub-paths without the combine flag: one copy per sub-path, each with the flow execution id and
     *       descriptor paths resolved against the base input/output paths.</li>
     * </ul>
     *
     * @param flowSpec the spec to split
     * @return a new mutable list of the resulting specs
     */
    private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
        long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
        List<FlowSpec> flowSpecs = new ArrayList<>();
        Config flowConfig = flowSpec.getConfig();

        if (!flowConfig.hasPath(DATASET_SUBPATHS_KEY)) {
            flowSpecs.add(flowSpec);
            return flowSpecs;
        }

        List<String> subPaths = ConfigUtils.getStringList(flowConfig, DATASET_SUBPATHS_KEY);
        String baseInputPath = ConfigUtils.getString(flowConfig, DATASET_BASE_INPUT_PATH_KEY, "/");
        String baseOutputPath = ConfigUtils.getString(flowConfig, DATASET_BASE_OUTPUT_PATH_KEY, "/");
        // Config.withoutPath returns a new Config, so the stripped base can be shared by every derived config.
        Config baseConfig = flowConfig.withoutPath(DATASET_SUBPATHS_KEY);

        if (ConfigUtils.getBoolean(flowConfig, DATASET_COMBINE_KEY, false)) {
            String subPathGlob = convertStringListToGlobPattern(subPaths);
            Config combinedConfig = baseConfig
                .withValue(INPUT_DESCRIPTOR_PATH_KEY, ConfigValueFactory.fromAnyRef(baseInputPath))
                .withValue(OUTPUT_DESCRIPTOR_PATH_KEY, ConfigValueFactory.fromAnyRef(baseOutputPath))
                .withValue(INPUT_DESCRIPTOR_SUBPATHS_KEY, ConfigValueFactory.fromAnyRef(subPathGlob))
                .withValue(OUTPUT_DESCRIPTOR_SUBPATHS_KEY, ConfigValueFactory.fromAnyRef(subPathGlob));
            flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, combinedConfig));
            return flowSpecs;
        }

        for (String subPath : subPaths) {
            Config datasetConfig = baseConfig
                .withValue(FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(Long.valueOf(flowExecutionId)))
                .withValue(INPUT_DESCRIPTOR_PATH_KEY,
                    ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
                .withValue(OUTPUT_DESCRIPTOR_PATH_KEY,
                    ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
            flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, datasetConfig));
        }
        return flowSpecs;
    }

    /**
     * Joins the given strings into a brace glob, e.g. {@code [a, b]} becomes {@code {a,b}}.
     *
     * @param stringList the alternatives to join
     * @return the alternatives, comma-separated and wrapped in curly braces
     */
    private static String convertStringListToGlobPattern(List<String> stringList) {
        return "{" + Joiner.on(",").join(stringList) + "}";
    }

    /**
     * Builds a copy of a flow spec that keeps its URI, version, description, template URIs and child
     * specs but uses a different config.
     *
     * @param flowSpec the spec to copy
     * @param newConfig the config for the copy
     * @return the newly built spec
     */
    private static FlowSpec copyFlowSpecWithNewConfig(FlowSpec flowSpec, Config newConfig) {
        FlowSpec.Builder builder = FlowSpec.builder(flowSpec.getUri())
            .withVersion(flowSpec.getVersion())
            .withDescription(flowSpec.getDescription())
            .withConfig(newConfig);

        if (flowSpec.getTemplateURIs().isPresent()) {
            builder = builder.withTemplates(flowSpec.getTemplateURIs().get());
        }
        if (flowSpec.getChildSpecs().isPresent()) {
            // Child specs go to withChildSpecs; passing them to withTemplates (as before) stored Spec
            // objects among the template URIs and lost them as child specs.
            builder = builder.withChildSpecs(flowSpec.getChildSpecs().get());
        }
        return builder.build();
    }

    /**
     * Registers a JVM shutdown hook that stops the service manager, waiting up to
     * {@value #SERVICE_TIMEOUT_SECONDS} seconds for it to finish.
     */
    private void addShutdownHook() {
        final ServiceManager manager = this.serviceManager;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                manager.stopAsync().awaitStopped(SERVICE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // Best effort: the JVM is already going down, so report the timeout and carry on.
                log.warn("Timed out while waiting for the service manager to shut down", e);
            }
        }));
    }

    /**
     * @return the flow graph this compiler resolves paths against
     */
    public FlowGraph getFlowGraph() {
        return this.flowGraph;
    }

    /**
     * @return the service manager running the Git monitor and template catalog, or {@code null} if no
     *         template catalog path was configured
     */
    public ServiceManager getServiceManager() {
        return this.serviceManager;
    }

    /**
     * @return the latch that {@link #awaitHealthy()} waits on
     */
    public CountDownLatch getInitComplete() {
        return this.initComplete;
    }
}
