package org.apache.gobblin.cluster;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/cluster/StreamingJobConfigurationManager.class */
public class StreamingJobConfigurationManager extends JobConfigurationManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJobConfigurationManager.class);
    private final ExecutorService fetchJobSpecExecutor;
    private final SpecConsumer specConsumer;
    private final long stopTimeoutSeconds;

    public StreamingJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog mutableJobCatalog) {
        super(eventBus, config);
        this.stopTimeoutSeconds = ConfigUtils.getLong(config, GobblinClusterConfigurationKeys.STOP_TIMEOUT_SECONDS, 60L).longValue();
        this.fetchJobSpecExecutor = Executors.newSingleThreadExecutor(ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobSpecExecutor")));
        String string = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, GobblinClusterConfigurationKeys.DEFAULT_STREAMING_SPEC_CONSUMER_CLASS);
        LOGGER.info("Using SpecConsumer ClassNameclass name/alias " + string);
        try {
            this.specConsumer = (SpecConsumer) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(new ClassAliasResolver(SpecConsumer.class).resolve(string)), new List[]{ImmutableList.of(config, mutableJobCatalog), ImmutableList.of(config)});
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Could not construct SpecConsumer " + string, e);
        }
    }

    public Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() {
        return this.specConsumer instanceof StandardMetricsBridge ? this.specConsumer.getStandardMetricsCollection() : ImmutableList.of();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.cluster.JobConfigurationManager
    public void startUp() throws Exception {
        LOGGER.info("Starting the " + StreamingJobConfigurationManager.class.getSimpleName());
        this.fetchJobSpecExecutor.execute(new Runnable() { // from class: org.apache.gobblin.cluster.StreamingJobConfigurationManager.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        StreamingJobConfigurationManager.this.fetchJobSpecs();
                    } catch (InterruptedException e) {
                        StreamingJobConfigurationManager.LOGGER.info("Fetch thread interrupted... will exit");
                        return;
                    } catch (ExecutionException e2) {
                        StreamingJobConfigurationManager.LOGGER.error("Failed to fetch job specs", e2);
                        throw new RuntimeException("Failed to fetch specs", e2);
                    }
                }
            }
        });
        if (this.specConsumer instanceof Service) {
            this.specConsumer.startAsync().awaitRunning();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fetchJobSpecs() throws ExecutionException, InterruptedException {
        List<Pair> list = (List) this.specConsumer.changedSpecs().get();
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        for (Pair pair : list) {
            SpecExecutor.Verb verb = (SpecExecutor.Verb) pair.getKey();
            if (verb.equals(SpecExecutor.Verb.ADD)) {
                JobSpec jobSpec = (JobSpec) pair.getValue();
                postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
            } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
                JobSpec jobSpec2 = (JobSpec) pair.getValue();
                postUpdateJobConfigArrival(jobSpec2.getUri().toString(), jobSpec2.getConfigAsProperties());
            } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
                postDeleteJobConfigArrival(((Spec) pair.getValue()).getUri().toString(), new Properties());
            } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
                postCancelJobConfigArrival(((Spec) pair.getValue()).getUri().toString());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.cluster.JobConfigurationManager
    public void shutDown() throws Exception {
        if (this.specConsumer instanceof Service) {
            this.specConsumer.stopAsync().awaitTerminated(this.stopTimeoutSeconds, TimeUnit.SECONDS);
        }
        ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(LOGGER));
    }

    public SpecConsumer getSpecConsumer() {
        return this.specConsumer;
    }
}
