package gobblin.service;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import gobblin.runtime.api.JobSpec;
import gobblin.runtime.api.MutableJobCatalog;
import gobblin.runtime.api.Spec;
import gobblin.runtime.api.SpecExecutorInstance;
import gobblin.runtime.api.SpecExecutorInstanceConsumer;
import gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
import gobblin.runtime.std.DefaultJobCatalogListenerImpl;
import gobblin.util.CompletedFuture;
import gobblin.util.ConfigUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

/* loaded from: input_file:gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.class */
public class StreamingKafkaSpecExecutorInstanceConsumer extends SimpleKafkaSpecExecutorInstance implements SpecExecutorInstanceConsumer<Spec>, Closeable {
    public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
    private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
    private final AvroJobSpecKafkaJobMonitor _jobMonitor;
    private final BlockingQueue<ImmutablePair<SpecExecutorInstance.Verb, Spec>> _jobSpecQueue;

    /* loaded from: input_file:gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer$JobSpecListener.class */
    protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
        public JobSpecListener() {
            super(StreamingKafkaSpecExecutorInstanceConsumer.this._log);
        }

        public void onAddJob(JobSpec jobSpec) {
            super.onAddJob(jobSpec);
            try {
                StreamingKafkaSpecExecutorInstanceConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutorInstance.Verb.ADD, jobSpec));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void onDeleteJob(URI uri, String str) {
            super.onDeleteJob(uri, str);
            try {
                JobSpec.Builder builder = JobSpec.builder(uri);
                builder.withVersion(str).withConfigAsProperties(new Properties());
                StreamingKafkaSpecExecutorInstanceConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutorInstance.Verb.DELETE, builder.build()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void onUpdateJob(JobSpec jobSpec) {
            super.onUpdateJob(jobSpec);
            try {
                StreamingKafkaSpecExecutorInstanceConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutorInstance.Verb.UPDATE, jobSpec));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog mutableJobCatalog, Optional<Logger> optional) {
        super(config, optional);
        try {
            this._jobMonitor = new AvroJobSpecKafkaJobMonitor.Factory().forConfig(config.withFallback(ConfigFactory.parseMap(ImmutableMap.of("topic", config.getString(SimpleKafkaSpecExecutorInstance.SPEC_KAFKA_TOPICS_KEY), "jobSpecMonitor.kafka.auto.offset.reset", "smallest"))), mutableJobCatalog);
            this._jobSpecQueue = new LinkedBlockingQueue(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", Integer.valueOf(DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)).intValue());
            mutableJobCatalog.addListener(new JobSpecListener());
        } catch (IOException e) {
            throw new RuntimeException("Could not create job monitor", e);
        }
    }

    public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog mutableJobCatalog, Logger logger) {
        this(config, mutableJobCatalog, (Optional<Logger>) Optional.of(logger));
    }

    public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog mutableJobCatalog) {
        this(config, mutableJobCatalog, (Optional<Logger>) Optional.absent());
    }

    public Future<? extends List<Pair<SpecExecutorInstance.Verb, Spec>>> changedSpecs() {
        ArrayList arrayList = new ArrayList();
        try {
            Pair take = this._jobSpecQueue.take();
            do {
                arrayList.add(take);
                take = (Pair) this._jobSpecQueue.poll();
            } while (take != null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new CompletedFuture(arrayList, (Throwable) null);
    }

    @Override // gobblin.service.SimpleKafkaSpecExecutorInstance
    protected void startUp() {
        this._jobMonitor.startAsync().awaitRunning();
    }

    @Override // gobblin.service.SimpleKafkaSpecExecutorInstance
    protected void shutDown() {
        this._jobMonitor.stopAsync().awaitTerminated();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        shutDown();
    }
}
