package gobblin.runtime.job_monitor;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
import gobblin.runtime.api.JobSpec;
import gobblin.runtime.api.JobSpecMonitor;
import gobblin.runtime.api.MutableJobCatalog;
import gobblin.runtime.kafka.HighLevelConsumer;
import gobblin.runtime.metrics.RuntimeMetrics;
import gobblin.util.ConfigUtils;
import gobblin.util.Either;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/job_monitor/KafkaJobMonitor.class */
/**
 * A {@link JobSpecMonitor} that consumes raw Kafka messages and applies them to a
 * {@link MutableJobCatalog}.
 *
 * <p>Each message payload is handed to {@link #parseJobSpec(byte[])}, which yields a collection of
 * {@link Either} values: an {@code Either.Left} carries a {@link JobSpec} that is put into the
 * catalog, and an {@code Either.Right} carries the {@link URI} of a spec that is removed from it.
 */
public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> implements JobSpecMonitor {
    private static final Logger log = LoggerFactory.getLogger(KafkaJobMonitor.class);

    /** Configuration prefix whose sub-config is passed to the underlying consumer. */
    public static final String KAFKA_JOB_MONITOR_PREFIX = "jobSpecMonitor.kafka";
    /** Configuration key for the Kafka auto-offset-reset setting of this monitor. */
    public static final String KAFKA_AUTO_OFFSET_RESET_KEY = "jobSpecMonitor.kafka.auto.offset.reset";
    public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
    public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";

    /** Catalog that receives the additions and removals parsed from Kafka messages. */
    private final MutableJobCatalog jobCatalog;
    /** Counts job specs put into the catalog; assigned in {@link #createMetrics()}. */
    private Counter newSpecs;
    /** Counts job spec URIs removed from the catalog; assigned in {@link #createMetrics()}. */
    private Counter removedSpecs;

    /**
     * Parses the payload of a single Kafka message.
     *
     * @param bArr the raw message payload
     * @return the parsed entries; a left value is a {@link JobSpec} to add, a right value is the
     *     {@link URI} of a job spec to remove
     * @throws IOException if the payload cannot be parsed
     */
    public abstract Collection<Either<JobSpec, URI>> parseJobSpec(byte[] bArr) throws IOException;

    /**
     * Creates a monitor that writes into the given catalog.
     *
     * @param str forwarded unchanged as the first argument of the {@link HighLevelConsumer}
     *     constructor (presumably the Kafka topic - confirm against the superclass)
     * @param mutableJobCatalog the catalog to update as messages are processed
     * @param config the configuration; only the sub-config under
     *     {@link #KAFKA_JOB_MONITOR_PREFIX} (or an empty config) is passed to the superclass
     */
    public KafkaJobMonitor(String str, MutableJobCatalog mutableJobCatalog, Config config) {
        // NOTE(review): the literal 1 is presumably the consumer thread count - confirm against
        // the HighLevelConsumer constructor.
        super(str, ConfigUtils.getConfigOrEmpty(config, KAFKA_JOB_MONITOR_PREFIX), 1);
        this.jobCatalog = mutableJobCatalog;
    }

    /**
     * Creates the superclass metrics and then the two counters tracking new and removed specs.
     *
     * <p>The decompiler reports that this method was originally {@code protected}.
     */
    @Override // gobblin.runtime.kafka.HighLevelConsumer
    public void createMetrics() {
        super.createMetrics();
        this.newSpecs = getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS);
        this.removedSpecs = getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS);
    }

    /**
     * Delegates to the superclass; redeclared here so that it carries {@link VisibleForTesting}.
     *
     * <p>The decompiler reports that this method was originally {@code protected}.
     */
    @Override // gobblin.runtime.kafka.HighLevelConsumer
    @VisibleForTesting
    public void buildMetricsContextAndMetrics() {
        super.buildMetricsContextAndMetrics();
    }

    /**
     * Delegates to the superclass; redeclared here so that it carries {@link VisibleForTesting}.
     *
     * <p>The decompiler reports that this method was originally {@code protected}.
     *
     * @throws IOException if the superclass shutdown fails
     */
    @Override // gobblin.runtime.kafka.HighLevelConsumer
    @VisibleForTesting
    public void shutdownMetrics() throws IOException {
        super.shutdownMetrics();
    }

    /**
     * Parses one Kafka message and applies every resulting entry to the job catalog.
     *
     * <p>A parse failure is logged together with the message offset and its payload decoded as
     * UTF-8; it is not rethrown.
     *
     * @param messageAndMetadata the consumed Kafka message
     */
    @Override // gobblin.runtime.kafka.HighLevelConsumer
    protected void processMessage(MessageAndMetadata<byte[], byte[]> messageAndMetadata) {
        try {
            for (Either<JobSpec, URI> parsed : parseJobSpec(messageAndMetadata.message())) {
                if (parsed instanceof Either.Left) {
                    // Left side: a job spec to add (or replace) in the catalog.
                    this.newSpecs.inc();
                    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsed).getLeft());
                } else if (parsed instanceof Either.Right) {
                    // Right side: the URI of a job spec to drop from the catalog.
                    this.removedSpecs.inc();
                    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsed).getRight());
                }
            }
        } catch (IOException e) {
            String payload = new String(messageAndMetadata.message(), Charsets.UTF_8);
            log.error(String.format("Failed to parse kafka message with offset %d: %s.", messageAndMetadata.offset(), payload), e);
        }
    }
}
