package gobblin.service;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
import gobblin.kafka.client.DecodeableKafkaRecord;
import gobblin.kafka.client.GobblinKafkaConsumerClient;
import gobblin.kafka.client.Kafka08ConsumerClient;
import gobblin.kafka.client.KafkaConsumerRecord;
import gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import gobblin.metrics.reporter.util.SchemaVersionWriter;
import gobblin.runtime.api.JobSpec;
import gobblin.runtime.api.Spec;
import gobblin.runtime.api.SpecExecutorInstance;
import gobblin.runtime.api.SpecExecutorInstanceConsumer;
import gobblin.runtime.job_spec.AvroJobSpec;
import gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import gobblin.source.extractor.extract.kafka.KafkaPartition;
import gobblin.source.extractor.extract.kafka.KafkaTopic;
import gobblin.util.CompletedFuture;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

/* loaded from: input_file:gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.class */
/**
 * Reads {@link AvroJobSpec} messages from the partitions of a single Kafka topic and turns each
 * one into a ({@link SpecExecutorInstance.Verb}, {@link JobSpec}) pair.
 *
 * <p>The topic is selected by compiling the value of
 * {@link SimpleKafkaSpecExecutorInstance#SPEC_KAFKA_TOPICS_KEY} as a regular expression and taking
 * the first topic the consumer client returns for it.
 *
 * <p>Each call to {@link #changedSpecs()} refreshes the per-partition high watermarks and then
 * reads every partition from its "next" watermark up to that high watermark. The "next" watermarks
 * are kept between calls, so a later call continues where the previous one stopped.
 *
 * <p>This class keeps mutable iteration state in fields and performs no synchronization of its
 * own.
 */
public class SimpleKafkaSpecExecutorInstanceConsumer extends SimpleKafkaSpecExecutorInstance implements SpecExecutorInstanceConsumer<Spec>, Closeable {
    /** Kafka client used to list partitions, look up offsets and fetch messages. */
    protected final GobblinKafkaConsumerClient _kafka08Consumer;
    /** Partitions of the selected topic; all watermark lists are indexed in parallel with it. */
    protected final List<KafkaPartition> _partitions;
    /**
     * Per-partition low watermark: earliest offset on the first run, previous high watermark
     * afterwards. NOTE(review): written by this class but never read within it.
     */
    protected final List<Long> _lowWatermark;
    /** Per-partition offset of the next message to consume; starts at 0 and only moves forward. */
    protected final List<Long> _nextWatermark;
    /** Per-partition latest offset, refreshed at the start of every {@link #changedSpecs()} call. */
    protected final List<Long> _highWatermark;
    /** Buffer of messages fetched for the current partition, or {@code null} if none is loaded. */
    private Iterator<KafkaConsumerRecord> messageIterator;
    /** Index into {@link #_partitions} of the partition being read; -1 before the first one. */
    private int currentPartitionIdx;
    /** True until the low watermarks have been initialized once. */
    private boolean isFirstRun;
    /** Decoder instance handed to Avro for reuse on every decode. */
    private final BinaryDecoder _decoder;
    private final SpecificDatumReader<AvroJobSpec> _reader;
    private final SchemaVersionWriter<?> _versionWriter;

    /**
     * Creates the Kafka client, resolves the topic partitions and zero-initializes all watermarks.
     *
     * @param config configuration; must contain
     *     {@link SimpleKafkaSpecExecutorInstance#SPEC_KAFKA_TOPICS_KEY}
     * @param optional optional logger, passed to the superclass
     * @throws IndexOutOfBoundsException if the consumer client returns no topic for the pattern
     */
    public SimpleKafkaSpecExecutorInstanceConsumer(Config config, Optional<Logger> optional) {
        super(config, optional);
        this.messageIterator = null;
        this.currentPartitionIdx = -1;
        this.isFirstRun = true;
        this._kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config);

        Pattern topicPattern = Pattern.compile(config.getString(SimpleKafkaSpecExecutorInstance.SPEC_KAFKA_TOPICS_KEY));
        // Empty blacklist, single-pattern whitelist; only the first matching topic is used.
        List<KafkaTopic> topics =
            this._kafka08Consumer.getFilteredTopics(Collections.<Pattern>emptyList(), Lists.newArrayList(topicPattern));
        this._partitions = topics.get(0).getPartitions();

        int partitionCount = this._partitions.size();
        this._lowWatermark = Lists.newArrayList(Collections.nCopies(partitionCount, 0L));
        this._nextWatermark = Lists.newArrayList(Collections.nCopies(partitionCount, 0L));
        this._highWatermark = Lists.newArrayList(Collections.nCopies(partitionCount, 0L));

        // Created once over an empty stream so that decodeRecord can pass it to Avro for reuse.
        this._decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(new byte[0]), null);
        this._reader = new SpecificDatumReader<>(AvroJobSpec.SCHEMA$);
        this._versionWriter = new FixedSchemaVersionWriter();
    }

    /**
     * Convenience constructor that wraps {@code logger} in an {@link Optional}.
     *
     * @param config configuration, see the primary constructor
     * @param logger logger to use; must not be {@code null}
     */
    public SimpleKafkaSpecExecutorInstanceConsumer(Config config, Logger logger) {
        this(config, Optional.of(logger));
    }

    /**
     * Convenience constructor that supplies no logger.
     *
     * @param config configuration, see the primary constructor
     */
    public SimpleKafkaSpecExecutorInstanceConsumer(Config config) {
        this(config, Optional.<Logger>absent());
    }

    /**
     * Reads all partitions up to their current high watermark and returns the decoded specs.
     *
     * <p>A partition whose fetch fails or returns no messages is skipped for this call. A record
     * that cannot be decoded is logged and skipped; its offset is still counted as consumed.
     *
     * @return an already-completed future holding the (verb, spec) pairs in consumption order
     * @throws RuntimeException if the Kafka offsets cannot be retrieved
     */
    public Future<? extends List<Pair<SpecExecutorInstance.Verb, Spec>>> changedSpecs() {
        List<Pair<SpecExecutorInstance.Verb, Spec>> changedSpecs = new ArrayList<>();
        initializeWatermarks();
        this.currentPartitionIdx = -1;

        while (!allPartitionsFinished()) {
            if (currentPartitionFinished()) {
                moveToNextPartition();
                continue;
            }

            if (this.messageIterator == null || !this.messageIterator.hasNext()) {
                try {
                    this.messageIterator = fetchNextMessageBuffer();
                    if (this.messageIterator == null || !this.messageIterator.hasNext()) {
                        // Nothing to read. Restart the outer loop so the new partition index is
                        // range-checked before it is used.
                        moveToNextPartition();
                        continue;
                    }
                } catch (Exception e) {
                    this._log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.", getCurrentPartition()), e);
                    moveToNextPartition();
                    continue;
                }
            }

            while (!currentPartitionFinished() && this.messageIterator.hasNext()) {
                KafkaConsumerRecord record = this.messageIterator.next();
                // Skip records located before the offset we still need.
                if (record.getOffset() < this._nextWatermark.get(this.currentPartitionIdx)) {
                    continue;
                }
                // Advance before decoding so that an undecodable record is not retried.
                this._nextWatermark.set(this.currentPartitionIdx, record.getNextOffset());
                try {
                    changedSpecs.add(toSpecChange(record));
                } catch (Throwable th) {
                    // Best effort: one bad record must not stop consumption of the rest.
                    this._log.error("Could not decode record at partition " + this.currentPartitionIdx + " offset " + record.getOffset(), th);
                }
            }
        }
        // CompletedFuture is used raw because its declaration is not visible from this file.
        return new CompletedFuture(changedSpecs, (Throwable) null);
    }

    /** Decodes one Kafka record and converts it into a (verb, job spec) pair. */
    private Pair<SpecExecutorInstance.Verb, Spec> toSpecChange(KafkaConsumerRecord record)
            throws IOException, java.net.URISyntaxException {
        final AvroJobSpec avroSpec;
        if (record instanceof ByteArrayBasedKafkaRecord) {
            avroSpec = decodeRecord((ByteArrayBasedKafkaRecord) record);
        } else if (record instanceof DecodeableKafkaRecord) {
            avroSpec = (AvroJobSpec) ((DecodeableKafkaRecord) record).getValue();
        } else {
            throw new IllegalStateException("Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord or DecodeableKafkaRecord");
        }

        Properties properties = new Properties();
        properties.putAll(avroSpec.getProperties());

        JobSpec.Builder builder = JobSpec.builder(avroSpec.getUri());
        builder.withJobCatalogURI(avroSpec.getUri()).withVersion(avroSpec.getVersion()).withDescription(avroSpec.getDescription()).withConfigAsProperties(properties);
        if (!avroSpec.getTemplateUri().isEmpty()) {
            builder.withTemplate(new URI(avroSpec.getTemplateUri()));
        }

        SpecExecutorInstance.Verb verb = SpecExecutorInstance.Verb.valueOf((String) avroSpec.getMetadata().get("Verb"));
        return new ImmutablePair<SpecExecutorInstance.Verb, Spec>(verb, builder.build());
    }

    /** Refreshes the low and then the high watermark of every partition. */
    private void initializeWatermarks() {
        initializeLowWatermarks();
        initializeHighWatermarks();
    }

    /**
     * Sets each low watermark to the earliest Kafka offset on the first run, and to the previous
     * high watermark on later runs. Must run before the high watermarks are refreshed.
     *
     * @throws RuntimeException wrapping a {@link KafkaOffsetRetrievalFailureException}
     */
    private void initializeLowWatermarks() {
        try {
            int idx = 0;
            for (KafkaPartition partition : this._partitions) {
                if (this.isFirstRun) {
                    this._lowWatermark.set(idx, this._kafka08Consumer.getEarliestOffset(partition));
                } else {
                    this._lowWatermark.set(idx, this._highWatermark.get(idx));
                }
                idx++;
            }
            this.isFirstRun = false;
        } catch (KafkaOffsetRetrievalFailureException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sets each high watermark to the partition's latest Kafka offset.
     *
     * @throws RuntimeException wrapping a {@link KafkaOffsetRetrievalFailureException}
     */
    private void initializeHighWatermarks() {
        try {
            int idx = 0;
            for (KafkaPartition partition : this._partitions) {
                this._highWatermark.set(idx, this._kafka08Consumer.getLatestOffset(partition));
                idx++;
            }
        } catch (KafkaOffsetRetrievalFailureException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns true once the partition index has moved past the last partition. */
    private boolean allPartitionsFinished() {
        return this.currentPartitionIdx >= this._nextWatermark.size();
    }

    /**
     * Returns true if no partition is selected yet, or the current partition's next watermark has
     * reached its high watermark. Callers must make sure the index is in range first.
     */
    private boolean currentPartitionFinished() {
        if (this.currentPartitionIdx == -1) {
            return true;
        }
        long next = this._nextWatermark.get(this.currentPartitionIdx);
        long high = this._highWatermark.get(this.currentPartitionIdx);
        return next >= high;
    }

    /** Drops the current message buffer and advances to the following partition index. */
    private void moveToNextPartition() {
        this.messageIterator = null;
        this.currentPartitionIdx++;
    }

    /** Returns the partition at the current index. */
    private KafkaPartition getCurrentPartition() {
        return this._partitions.get(this.currentPartitionIdx);
    }

    /** Asks the client for the current partition's messages between its next and high watermark. */
    private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
        // NOTE(review): reading starts at _nextWatermark (initially 0), not at _lowWatermark.
        // Confirm the client copes when 0 is below the partition's earliest offset.
        return this._kafka08Consumer.consume(
            getCurrentPartition(),
            this._nextWatermark.get(this.currentPartitionIdx),
            this._highWatermark.get(this.currentPartitionIdx));
    }

    /**
     * Decodes the payload of a byte-array record into an {@link AvroJobSpec}.
     *
     * @param byteArrayBasedKafkaRecord record whose message bytes hold schema version information
     *     followed by the binary Avro body
     * @return the decoded spec
     * @throws IOException if the version information or the Avro body cannot be read
     */
    private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord byteArrayBasedKafkaRecord) throws IOException {
        ByteArrayInputStream payload = new ByteArrayInputStream(byteArrayBasedKafkaRecord.getMessageBytes());
        // Consume the version prefix first so the Avro decoder starts at the record body.
        this._versionWriter.readSchemaVersioningInformation(new DataInputStream(payload));
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, this._decoder);
        return this._reader.read(null, decoder);
    }

    /**
     * Closes the underlying Kafka client.
     *
     * @throws IOException if the client fails to close
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this._kafka08Consumer.close();
    }
}
