package org.apache.gobblin.source.extractor.extract.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.WatermarkStorage;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingExtractor.class */
public class KafkaSimpleStreamingExtractor<S, D> extends EventBasedExtractor<S, D> implements StreamingExtractor<S, D> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSimpleStreamingExtractor.class);
    private AtomicBoolean _isStarted;
    private final Consumer<S, D> _consumer;
    private final TopicPartition _partition;
    private Iterator<ConsumerRecord<S, D>> _records;
    AtomicLong _rowCount;
    protected final Optional<KafkaSchemaRegistry<String, S>> _schemaRegistry;
    protected AtomicBoolean _close;
    private final long fetchTimeOut;

    /* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingExtractor$KafkaWatermark.class */
    public static class KafkaWatermark implements CheckpointableWatermark {
        TopicPartition _topicPartition;
        LongWatermark _lwm;

        @VisibleForTesting
        public KafkaWatermark(TopicPartition topicPartition, LongWatermark longWatermark) {
            this._topicPartition = topicPartition;
            this._lwm = longWatermark;
        }

        public String getSource() {
            return this._topicPartition.toString();
        }

        public ComparableWatermark getWatermark() {
            return this._lwm;
        }

        public short calculatePercentCompletion(Watermark watermark, Watermark watermark2) {
            return (short) 0;
        }

        public JsonElement toJson() {
            return WatermarkSerializerHelper.convertWatermarkToJson(this);
        }

        public int compareTo(CheckpointableWatermark checkpointableWatermark) {
            Preconditions.checkArgument(checkpointableWatermark instanceof KafkaWatermark);
            KafkaWatermark kafkaWatermark = (KafkaWatermark) checkpointableWatermark;
            Preconditions.checkArgument(this._topicPartition.equals(kafkaWatermark._topicPartition));
            return this._lwm.compareTo(kafkaWatermark._lwm);
        }

        public boolean equals(Object obj) {
            return obj != null && (obj instanceof KafkaWatermark) && compareTo((CheckpointableWatermark) obj) == 0;
        }

        public int hashCode() {
            return (this._topicPartition.hashCode() * 31) + this._lwm.hashCode();
        }

        public TopicPartition getTopicPartition() {
            return this._topicPartition;
        }

        public LongWatermark getLwm() {
            return this._lwm;
        }

        public String toString() {
            return "KafkaSimpleStreamingExtractor.KafkaWatermark(_topicPartition=" + this._topicPartition + ", _lwm=" + this._lwm + ")";
        }
    }

    public void start(WatermarkStorage watermarkStorage) throws IOException {
        Preconditions.checkArgument(watermarkStorage != null, "Watermark Storage should not be null");
        KafkaWatermark kafkaWatermark = (KafkaWatermark) watermarkStorage.getCommittedWatermarks(KafkaWatermark.class, Collections.singletonList(this._partition.toString())).get(this._partition.toString());
        if (kafkaWatermark == null) {
            LOG.info("Offset is null - seeking to beginning of topic and partition for {} ", this._partition.toString());
            this._consumer.seekToBeginning(new TopicPartition[]{this._partition});
        } else {
            LOG.info("Offset found in consumer for partition {}. Seeking to one past what we found : {}", this._partition.toString(), Long.valueOf(kafkaWatermark.getLwm().getValue() + 1));
            this._consumer.seek(this._partition, kafkaWatermark.getLwm().getValue() + 1);
        }
        this._isStarted.set(true);
    }

    public KafkaSimpleStreamingExtractor(WorkUnitState workUnitState) {
        super(workUnitState);
        this._isStarted = new AtomicBoolean(false);
        this._rowCount = new AtomicLong(0L);
        this._close = new AtomicBoolean(false);
        this._consumer = KafkaSimpleStreamingSource.getKafkaConsumer(ConfigUtils.propertiesToConfig(workUnitState.getProperties()));
        this.closer.register(this._consumer);
        this._partition = new TopicPartition(KafkaSimpleStreamingSource.getTopicNameFromState(workUnitState), KafkaSimpleStreamingSource.getPartitionIdFromState(workUnitState));
        this._consumer.assign(Collections.singletonList(this._partition));
        this._schemaRegistry = workUnitState.contains("kafka.schema.registry.class") ? Optional.of(KafkaSchemaRegistry.get(workUnitState.getProperties())) : Optional.absent();
        this.fetchTimeOut = workUnitState.getPropAsLong("source.kafka.fetchTimeoutMillis", 1000L);
    }

    public S getSchema() throws IOException {
        try {
            return this._schemaRegistry.isPresent() ? (S) ((KafkaSchemaRegistry) this._schemaRegistry.get()).getLatestSchemaByTopic(this._partition.topic()) : (S) this._partition.topic();
        } catch (SchemaRegistryException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public List<Tag<?>> generateTags(State state) {
        List<Tag<?>> generateTags = super.generateTags(state);
        generateTags.add(new Tag<>("kafkaTopic", state.getProp(KafkaSimpleStreamingSource.TOPIC_WHITELIST)));
        return generateTags;
    }

    /* JADX WARN: Code restructure failed: missing block: B:23:0x003f, code lost:
    
        throw new java.nio.channels.ClosedChannelException();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public org.apache.gobblin.stream.RecordEnvelope<D> readRecordEnvelopeImpl() throws org.apache.gobblin.source.extractor.DataRecordException, java.io.IOException {
        /*
            r11 = this;
            r0 = r11
            java.util.concurrent.atomic.AtomicBoolean r0 = r0._isStarted
            boolean r0 = r0.get()
            if (r0 != 0) goto L14
            java.io.IOException r0 = new java.io.IOException
            r1 = r0
            java.lang.String r2 = "Streaming extractor has not been started."
            r1.<init>(r2)
            throw r0
        L14:
            r0 = r11
            java.util.Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<S, D>> r0 = r0._records
            if (r0 == 0) goto L27
            r0 = r11
            java.util.Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<S, D>> r0 = r0._records
            boolean r0 = r0.hasNext()
            if (r0 != 0) goto L61
        L27:
            r0 = r11
            org.apache.kafka.clients.consumer.Consumer<S, D> r0 = r0._consumer
            r1 = r0
            r12 = r1
            monitor-enter(r0)
            r0 = r11
            java.util.concurrent.atomic.AtomicBoolean r0 = r0._close     // Catch: java.lang.Throwable -> L59
            boolean r0 = r0.get()     // Catch: java.lang.Throwable -> L59
            if (r0 == 0) goto L40
            java.nio.channels.ClosedChannelException r0 = new java.nio.channels.ClosedChannelException     // Catch: java.lang.Throwable -> L59
            r1 = r0
            r1.<init>()     // Catch: java.lang.Throwable -> L59
            throw r0     // Catch: java.lang.Throwable -> L59
        L40:
            r0 = r11
            r1 = r11
            org.apache.kafka.clients.consumer.Consumer<S, D> r1 = r1._consumer     // Catch: java.lang.Throwable -> L59
            r2 = r11
            long r2 = r2.fetchTimeOut     // Catch: java.lang.Throwable -> L59
            org.apache.kafka.clients.consumer.ConsumerRecords r1 = r1.poll(r2)     // Catch: java.lang.Throwable -> L59
            java.util.Iterator r1 = r1.iterator()     // Catch: java.lang.Throwable -> L59
            r0._records = r1     // Catch: java.lang.Throwable -> L59
            r0 = r12
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L59
            goto L5e
        L59:
            r13 = move-exception
            r0 = r12
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L59
            r0 = r13
            throw r0
        L5e:
            goto L14
        L61:
            r0 = r11
            java.util.Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<S, D>> r0 = r0._records
            java.lang.Object r0 = r0.next()
            org.apache.kafka.clients.consumer.ConsumerRecord r0 = (org.apache.kafka.clients.consumer.ConsumerRecord) r0
            r12 = r0
            r0 = r11
            java.util.concurrent.atomic.AtomicLong r0 = r0._rowCount
            long r0 = r0.getAndIncrement()
            org.apache.gobblin.stream.RecordEnvelope r0 = new org.apache.gobblin.stream.RecordEnvelope
            r1 = r0
            r2 = r12
            java.lang.Object r2 = r2.value()
            org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleStreamingExtractor$KafkaWatermark r3 = new org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleStreamingExtractor$KafkaWatermark
            r4 = r3
            r5 = r11
            org.apache.kafka.common.TopicPartition r5 = r5._partition
            org.apache.gobblin.source.extractor.extract.LongWatermark r6 = new org.apache.gobblin.source.extractor.extract.LongWatermark
            r7 = r6
            r8 = r12
            long r8 = r8.offset()
            r7.<init>(r8)
            r4.<init>(r5, r6)
            r1.<init>(r2, r3)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleStreamingExtractor.readRecordEnvelopeImpl():org.apache.gobblin.stream.RecordEnvelope");
    }

    public long getExpectedRecordCount() {
        return this._rowCount.get();
    }

    public void close() throws IOException {
        this._close.set(true);
        this._consumer.wakeup();
        synchronized (this._consumer) {
            this.closer.close();
        }
    }

    @Deprecated
    public long getHighWatermark() {
        return 0L;
    }
}
