package org.apache.gobblin.service;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/StreamingKafkaSpecConsumer.class */
public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable, StandardMetricsBridge {
    private static final Logger log = LoggerFactory.getLogger(StreamingKafkaSpecConsumer.class);
    private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
    private final AvroJobSpecKafkaJobMonitor _jobMonitor;
    private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue;
    private final MutableJobCatalog _jobCatalog;
    private final MetricContext _metricContext;
    private final Metrics _metrics;

    /* loaded from: input_file:org/apache/gobblin/service/StreamingKafkaSpecConsumer$JobSpecListener.class */
    protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
        public JobSpecListener() {
            super(StreamingKafkaSpecConsumer.log);
        }

        public void onAddJob(JobSpec jobSpec) {
            super.onAddJob(jobSpec);
            try {
                StreamingKafkaSpecConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutor.Verb.ADD, jobSpec));
                StreamingKafkaSpecConsumer.this._metrics.specConsumerJobSpecEnq.mark();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void onDeleteJob(URI uri, String str) {
            super.onDeleteJob(uri, str);
            try {
                JobSpec.Builder builder = JobSpec.builder(uri);
                builder.withVersion(str).withConfigAsProperties(new Properties());
                StreamingKafkaSpecConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutor.Verb.DELETE, builder.build()));
                StreamingKafkaSpecConsumer.this._metrics.specConsumerJobSpecEnq.mark();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void onCancelJob(URI uri) {
            super.onCancelJob(uri);
            try {
                JobSpec.Builder builder = JobSpec.builder(uri);
                builder.withConfigAsProperties(new Properties());
                StreamingKafkaSpecConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutor.Verb.CANCEL, builder.build()));
                StreamingKafkaSpecConsumer.this._metrics.specConsumerJobSpecEnq.mark();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void onUpdateJob(JobSpec jobSpec) {
            super.onUpdateJob(jobSpec);
            try {
                StreamingKafkaSpecConsumer.this._jobSpecQueue.put(new ImmutablePair(SpecExecutor.Verb.UPDATE, jobSpec));
                StreamingKafkaSpecConsumer.this._metrics.specConsumerJobSpecEnq.mark();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/service/StreamingKafkaSpecConsumer$Metrics.class */
    public class Metrics extends StandardMetricsBridge.StandardMetrics {
        private final ContextAwareMeter specConsumerJobSpecEnq;
        private final ContextAwareMeter specConsumerJobSpecDeq;
        public static final String SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE = "specConsumerJobSpecQueueSize";
        public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq";
        public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq";

        public Metrics(MetricContext metricContext) {
            List list = this.contextAwareMetrics;
            BlockingQueue blockingQueue = StreamingKafkaSpecConsumer.this._jobSpecQueue;
            blockingQueue.getClass();
            list.add(metricContext.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, blockingQueue::size));
            this.specConsumerJobSpecEnq = metricContext.contextAwareMeter(SPEC_CONSUMER_JOB_SPEC_ENQ);
            this.contextAwareMetrics.add(this.specConsumerJobSpecEnq);
            this.specConsumerJobSpecDeq = metricContext.contextAwareMeter(SPEC_CONSUMER_JOB_SPEC_DEQ);
            this.contextAwareMetrics.add(this.specConsumerJobSpecDeq);
            this.contextAwareMetrics.add(StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs());
            this.contextAwareMetrics.add(StreamingKafkaSpecConsumer.this._jobMonitor.getUpdatedSpecs());
            this.contextAwareMetrics.add(StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs());
            this.contextAwareMetrics.add(StreamingKafkaSpecConsumer.this._jobMonitor.getCancelledSpecs());
            this.contextAwareMetrics.add(StreamingKafkaSpecConsumer.this._jobMonitor.getTotalSpecs());
            this.contextAwareMetrics.add(StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures());
        }
    }

    public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog mutableJobCatalog, Optional<Logger> optional) {
        try {
            this._jobMonitor = new AvroJobSpecKafkaJobMonitor.Factory().forConfig(config.withFallback(ConfigFactory.parseMap(ImmutableMap.of("topic", config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY), "jobSpecMonitor.kafka.auto.offset.reset", "smallest"))), mutableJobCatalog);
            this._jobCatalog = mutableJobCatalog;
            this._jobSpecQueue = new LinkedBlockingQueue(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", Integer.valueOf(DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)).intValue());
            this._metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), getClass());
            this._metrics = new Metrics(this._metricContext);
        } catch (IOException e) {
            throw new RuntimeException("Could not create job monitor", e);
        }
    }

    public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog mutableJobCatalog, Logger logger) {
        this(config, mutableJobCatalog, (Optional<Logger>) Optional.of(logger));
    }

    public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog mutableJobCatalog) {
        this(config, mutableJobCatalog, (Optional<Logger>) Optional.absent());
    }

    public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
        ArrayList arrayList = new ArrayList();
        try {
            Pair take = this._jobSpecQueue.take();
            this._metrics.specConsumerJobSpecDeq.mark();
            do {
                arrayList.add(take);
                take = (Pair) this._jobSpecQueue.poll();
            } while (take != null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new CompletedFuture(arrayList, (Throwable) null);
    }

    protected void startUp() {
        this._jobCatalog.addListener(new JobSpecListener());
        this._jobMonitor.startAsync().awaitRunning();
        addJobMonitorMetrics();
    }

    private void addJobMonitorMetrics() {
        this._metrics.getContextAwareMetrics().add(this._jobMonitor.getNewSpecs());
        this._metrics.getContextAwareMetrics().add(this._jobMonitor.getUpdatedSpecs());
        this._metrics.getContextAwareMetrics().add(this._jobMonitor.getRemovedSpecs());
        this._metrics.getContextAwareMetrics().add(this._jobMonitor.getCancelledSpecs());
        this._metrics.getContextAwareMetrics().add(this._jobMonitor.getTotalSpecs());
        this._metrics.getContextAwareMetrics().add(this._jobMonitor.getMessageParseFailures());
    }

    protected void shutDown() {
        this._jobMonitor.stopAsync().awaitTerminated();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        shutDown();
    }

    public Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() {
        return ImmutableList.of(this._metrics);
    }

    public AvroJobSpecKafkaJobMonitor get_jobMonitor() {
        return this._jobMonitor;
    }
}
