package org.apache.gobblin.cluster;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/cluster/ScheduledJobConfigurationManager.class */
public class ScheduledJobConfigurationManager extends JobConfigurationManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledJobConfigurationManager.class);
    private static final long DEFAULT_JOB_SPEC_REFRESH_INTERVAL = 60;
    private Map<URI, JobSpec> jobSpecs;
    private final long refreshIntervalInSeconds;
    private final ScheduledExecutorService fetchJobSpecExecutor;
    protected final SpecConsumer _specConsumer;
    private final ClassAliasResolver<SpecConsumer> aliasResolver;

    public ScheduledJobConfigurationManager(EventBus eventBus, Config config) {
        super(eventBus, config);
        this.jobSpecs = Maps.newHashMap();
        this.refreshIntervalInSeconds = ConfigUtils.getLong(config, GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, 60L).longValue();
        this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobSpecExecutor")));
        this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class);
        try {
            String string = config.hasPath(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY) ? config.getString(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY) : GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS;
            LOGGER.info("Using SpecConsumer ClassNameclass name/alias " + string);
            this._specConsumer = (SpecConsumer) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(string)), new Object[]{config});
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.cluster.JobConfigurationManager
    public void startUp() throws Exception {
        LOGGER.info("Starting the " + ScheduledJobConfigurationManager.class.getSimpleName());
        LOGGER.info(String.format("Scheduling the job spec refresh task with an interval of %d second(s)", Long.valueOf(this.refreshIntervalInSeconds)));
        this.fetchJobSpecExecutor.scheduleAtFixedRate(new Runnable() { // from class: org.apache.gobblin.cluster.ScheduledJobConfigurationManager.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ScheduledJobConfigurationManager.this.fetchJobSpecs();
                } catch (InterruptedException | ExecutionException e) {
                    ScheduledJobConfigurationManager.LOGGER.error("Failed to fetch job specs", e);
                    throw new RuntimeException("Failed to fetch specs", e);
                }
            }
        }, 0L, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
    }

    protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
        for (Pair pair : (List) this._specConsumer.changedSpecs().get()) {
            SpecExecutor.Verb verb = (SpecExecutor.Verb) pair.getKey();
            if (verb.equals(SpecExecutor.Verb.ADD)) {
                JobSpec jobSpec = (JobSpec) pair.getValue();
                postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
                this.jobSpecs.put(((Spec) pair.getValue()).getUri(), (JobSpec) pair.getValue());
            } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
                JobSpec jobSpec2 = (JobSpec) pair.getValue();
                postUpdateJobConfigArrival(jobSpec2.getUri().toString(), jobSpec2.getConfigAsProperties());
                this.jobSpecs.put(((Spec) pair.getValue()).getUri(), (JobSpec) pair.getValue());
            } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
                postDeleteJobConfigArrival(((Spec) pair.getValue()).getUri().toString(), new Properties());
                this.jobSpecs.remove(((Spec) pair.getValue()).getUri());
            } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
                postCancelJobConfigArrival(((Spec) pair.getValue()).getUri().toString());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.cluster.JobConfigurationManager
    public void shutDown() throws Exception {
        ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(LOGGER));
    }
}
