package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/Orchestrator.class */
public class Orchestrator implements SpecCatalogListener, Instrumentable {
    private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
    protected final Logger _log;
    protected final SpecCompiler specCompiler;
    protected final Optional<TopologyCatalog> topologyCatalog;
    protected final Optional<DagManager> dagManager;
    protected final MetricContext metricContext;
    protected final Optional<EventSubmitter> eventSubmitter;
    private final boolean flowConcurrencyFlag;
    private Optional<Meter> flowOrchestrationSuccessFulMeter;
    private Optional<Meter> flowOrchestrationFailedMeter;
    private Optional<Timer> flowOrchestrationTimer;
    private FlowStatusGenerator flowStatusGenerator;
    private final ClassAliasResolver<SpecCompiler> aliasResolver;
    private Map<String, FlowCompiledState> flowGauges;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/Orchestrator$CompiledState.class */
    public enum CompiledState {
        FAILED(-1),
        UNKNOWN(0),
        SUCCESSFUL(1),
        SKIPPED(2);

        public int value;

        CompiledState(int i) {
            this.value = i;
        }
    }

    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/Orchestrator$FlowCompiledState.class */
    private static class FlowCompiledState {
        private CompiledState state;

        private FlowCompiledState() {
            this.state = CompiledState.UNKNOWN;
        }

        public void setState(CompiledState compiledState) {
            this.state = compiledState;
        }
    }

    public Orchestrator(Config config, Optional<TopologyCatalog> optional, Optional<DagManager> optional2, Optional<Logger> optional3, boolean z) {
        this.flowGauges = Maps.newHashMap();
        this._log = optional3.isPresent() ? (Logger) optional3.get() : LoggerFactory.getLogger(getClass());
        this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
        this.topologyCatalog = optional;
        this.dagManager = optional2;
        try {
            String string = config.hasPath("gobblin.service.flowCompiler.class") ? config.getString("gobblin.service.flowCompiler.class") : "org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler";
            this._log.info("Using specCompiler class name/alias " + string);
            this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(string)), new Object[]{config});
            if (this.dagManager.isPresent()) {
                ((DagManager) this.dagManager.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
            }
            if (z) {
                this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass());
                this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter("GobblinService.flowOrchestration.successful"));
                this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter("GobblinService.flowOrchestration.failed"));
                this.flowOrchestrationTimer = Optional.of(this.metricContext.timer("GobblinService.flowOrchestration.time"));
                this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
            } else {
                this.metricContext = null;
                this.flowOrchestrationSuccessFulMeter = Optional.absent();
                this.flowOrchestrationFailedMeter = Optional.absent();
                this.flowOrchestrationTimer = Optional.absent();
                this.eventSubmitter = Optional.absent();
            }
            this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, "gobblin.service.flowConcurrencyAllowed", ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED.booleanValue());
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    public Orchestrator(Config config, Optional<TopologyCatalog> optional, Optional<DagManager> optional2, Optional<Logger> optional3) {
        this(config, optional, optional2, optional3, true);
    }

    public Orchestrator(Config config, Optional<TopologyCatalog> optional, Optional<DagManager> optional2, Logger logger) {
        this(config, optional, optional2, (Optional<Logger>) Optional.of(logger));
    }

    public Orchestrator(Config config, Logger logger) {
        this(config, (Optional<TopologyCatalog>) Optional.absent(), (Optional<DagManager>) Optional.absent(), (Optional<Logger>) Optional.of(logger));
    }

    public Orchestrator(Config config, Optional<TopologyCatalog> optional) {
        this(config, optional, (Optional<DagManager>) Optional.absent(), (Optional<Logger>) Optional.absent());
    }

    public Orchestrator(Config config) {
        this(config, (Optional<TopologyCatalog>) Optional.absent(), (Optional<DagManager>) Optional.absent(), (Optional<Logger>) Optional.absent());
    }

    @VisibleForTesting
    public SpecCompiler getSpecCompiler() {
        return this.specCompiler;
    }

    public AddSpecResponse onAddSpec(Spec spec) {
        if (spec instanceof TopologySpec) {
            this._log.info("New Spec detected of type TopologySpec: " + spec);
            this.specCompiler.onAddSpec(spec);
        }
        return new AddSpecResponse((Object) null);
    }

    public void onDeleteSpec(URI uri, String str) {
        onDeleteSpec(uri, str, new Properties());
    }

    public void onDeleteSpec(URI uri, String str, Properties properties) {
        this._log.info("Spec deletion detected: " + uri + "/" + str);
        if (this.topologyCatalog.isPresent()) {
            this.specCompiler.onDeleteSpec(uri, str, properties);
        }
    }

    public void onUpdateSpec(Spec spec) {
        this._log.info("Spec changed: " + spec);
        if (spec instanceof TopologySpec) {
            try {
                onDeleteSpec(spec.getUri(), spec.getVersion());
            } catch (Exception e) {
                this._log.error("Failed to update Spec: " + spec, e);
            }
            try {
                onAddSpec(spec);
            } catch (Exception e2) {
                this._log.error("Failed to update Spec: " + spec, e2);
            }
        }
    }

    public void orchestrate(Spec spec) throws Exception {
        String str;
        ((TopologyCatalog) this.topologyCatalog.get()).getInitComplete().await();
        getSpecCompiler().awaitHealthy();
        long nanoTime = System.nanoTime();
        if (!(spec instanceof FlowSpec)) {
            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
            throw new RuntimeException("Spec not of type FlowSpec, cannot orchestrate: " + spec);
        }
        Config config = ((FlowSpec) spec).getConfig();
        String string = config.getString("flow.group");
        String string2 = config.getString("flow.name");
        if (!this.flowGauges.containsKey(spec.getUri().toString())) {
            String name = MetricRegistry.name("GobblinService", new String[]{string, string2, "Compiled"});
            this.flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
            RootMetricContext.get().register(name, RootMetricContext.get().newContextAwareGauge(name, () -> {
                return Integer.valueOf(this.flowGauges.get(spec.getUri().toString()).state.value);
            }));
        }
        if (!canRun(string2, string, ConfigUtils.getBoolean(config, "flow.allowConcurrentExecution", this.flowConcurrencyFlag))) {
            this._log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since concurrent executions are disabled for this flow.", string, string2);
            this.flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
            Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
            flowMetadata.put("message", "Flow failed because another instance is running and concurrent executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
            if (this.eventSubmitter.isPresent()) {
                ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("FlowFailed").stop(flowMetadata);
                return;
            }
            return;
        }
        TimingEvent timingEvent = this.eventSubmitter.isPresent() ? ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("FlowCompiled") : null;
        Dag<JobExecutionPlan> compileFlow = this.specCompiler.compileFlow(spec);
        Map<String, String> flowMetadata2 = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
        if (compileFlow == null || compileFlow.isEmpty()) {
            flowMetadata2.putIfAbsent("flowExecutionId", Long.toString(System.currentTimeMillis()));
            str = "Flow was not compiled successfully.";
            flowMetadata2.put("message", ((FlowSpec) spec).getCompilationErrors().isEmpty() ? "Flow was not compiled successfully." : str + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors());
            TimingEvent timingEvent2 = this.eventSubmitter.isPresent() ? ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("FlowCompileFailed") : null;
            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
            this.flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
            this._log.warn("Cannot determine an executor to run on for Spec: " + spec);
            if (timingEvent2 != null) {
                timingEvent2.stop(flowMetadata2);
                return;
            }
            return;
        }
        this.flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
        flowMetadata2.putIfAbsent("flowExecutionId", compileFlow.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty("flow.executionId"));
        if (timingEvent != null) {
            timingEvent.stop(flowMetadata2);
        }
        if (this.dagManager.isPresent()) {
            ((DagManager) this.dagManager.get()).addDag(compileFlow, true);
        } else {
            for (Dag.DagNode<JobExecutionPlan> dagNode : compileFlow.getNodes()) {
                DagManagerUtils.incrementJobAttempt(dagNode);
                JobExecutionPlan value = dagNode.getValue();
                SpecProducer specProducer = null;
                try {
                    specProducer = (SpecProducer) value.getSpecExecutor().getProducer().get();
                    JobSpec jobSpec = value.getJobSpec();
                    if (!jobSpec.getConfig().hasPath("flow.executionId")) {
                        this._log.warn("JobSpec does not contain flowExecutionId.");
                    }
                    Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata2, value);
                    this._log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, specProducer));
                    TimingEvent timingEvent3 = this.eventSubmitter.isPresent() ? ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("JobOrchestrated") : null;
                    specProducer.addSpec(jobSpec);
                    if (timingEvent3 != null) {
                        timingEvent3.stop(jobMetadata);
                    }
                } catch (Exception e) {
                    this._log.error("Cannot successfully setup spec: " + value.getJobSpec() + " on executor: " + specProducer + " for flow: " + spec, e);
                }
            }
        }
        Instrumented.markMeter(this.flowOrchestrationSuccessFulMeter);
        Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
    }

    private boolean canRun(String str, String str2, boolean z) {
        return z || !this.flowStatusGenerator.isFlowRunning(str, str2);
    }

    public void remove(Spec spec, Properties properties) throws IOException {
        if (!(spec instanceof FlowSpec)) {
            throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
        }
        if (this.dagManager.isPresent()) {
            this._log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
            ((DagManager) this.dagManager.get()).stopDag(spec.getUri());
            return;
        }
        Dag<JobExecutionPlan> compileFlow = this.specCompiler.compileFlow(spec);
        if (compileFlow.isEmpty()) {
            this._log.warn("Cannot determine an executor to delete Spec: " + spec);
            return;
        }
        Iterator<Dag.DagNode<JobExecutionPlan>> it = compileFlow.getNodes().iterator();
        while (it.hasNext()) {
            JobExecutionPlan value = it.next().getValue();
            JobSpec jobSpec = value.getJobSpec();
            try {
                SpecProducer specProducer = (SpecProducer) value.getSpecExecutor().getProducer().get();
                this._log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, specProducer));
                specProducer.deleteSpec(jobSpec.getUri(), properties);
            } catch (Exception e) {
                this._log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
            }
        }
    }

    private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
        try {
            return FlowStatusGenerator.builder().jobStatusRetriever((JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName())), new Object[]{config})).build();
        } catch (ReflectiveOperationException e) {
            this._log.error("Exception encountered when instantiating JobStatusRetriever");
            throw new RuntimeException(e);
        }
    }

    @Nonnull
    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    public boolean isInstrumentationEnabled() {
        return null != this.metricContext;
    }

    public List<Tag<?>> generateTags(State state) {
        return Collections.emptyList();
    }

    public void switchMetricContext(List<Tag<?>> list) {
        throw new UnsupportedOperationException();
    }

    public void switchMetricContext(MetricContext metricContext) {
        throw new UnsupportedOperationException();
    }

    public Optional<Meter> getFlowOrchestrationSuccessFulMeter() {
        return this.flowOrchestrationSuccessFulMeter;
    }

    public Optional<Meter> getFlowOrchestrationFailedMeter() {
        return this.flowOrchestrationFailedMeter;
    }

    public Optional<Timer> getFlowOrchestrationTimer() {
        return this.flowOrchestrationTimer;
    }

    public void setFlowStatusGenerator(FlowStatusGenerator flowStatusGenerator) {
        this.flowStatusGenerator = flowStatusGenerator;
    }
}
