package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.class */
public class KafkaUnboundedReader<K, V> extends UnboundedSource.UnboundedReader<KafkaRecord<K, V>> {

    /** Metrics namespace under which the checkpoint-commit counters of this reader are registered. */
    @VisibleForTesting
    static final String METRIC_NAMESPACE = "KafkaIOReader";

    /** Name of the counter incremented every time a finalized checkpoint is handed over for commit. */
    @VisibleForTesting
    static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued";

    /** Name of the counter incremented when a pending checkpoint is replaced before it was committed. */
    private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC = "checkpointMarkCommitsSkipped";

    /** The source this reader was created from; also supplies the read spec. */
    private final KafkaUnboundedSource<K, V> source;

    /** Human-readable reader name ("Reader-" + source id); used in logs and returned by toString(). */
    private final String name;

    /** Consumer created in start(); polled by consumerPollLoop() and used to commit offsets. */
    private Consumer<byte[], byte[]> consumer;

    /** One state object per assigned partition, in the order of the spec's topic partitions. */
    private final List<PartitionState<K, V>> partitionStates;

    /** Record most recently produced by advance(); null until the first record is read. */
    private KafkaRecord<K, V> curRecord;

    /** Timestamp assigned to curRecord by the partition's timestamp policy. */
    private Instant curTimestamp;

    private final Counter elementsReadBySplit;
    private final Counter bytesReadBySplit;
    private final Gauge backlogBytesOfSplit;
    private final Gauge backlogElementsOfSplit;

    /** Second consumer created in start(); used only by updateLatestOffsets() to find end offsets. */
    private Consumer<byte[], byte[]> offsetConsumer;

    /** Period, in seconds, between two runs of updateLatestOffsets(). */
    private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;

    /** Sentinel meaning "offset / backlog not known yet". */
    private static final long UNINITIALIZED_OFFSET = -1;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);

    /** Timeout passed to Consumer.poll() in the poll loop. */
    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);

    /** How long nextBatch() waits for the poll thread to hand over a batch. */
    private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);

    /** How long the poll thread waits for the reader to accept a batch before retrying. */
    private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);

    /** Watermark reported when a watermark function is configured but no record has been read yet. */
    private static final Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    /** Iterator over the partition states that still have records in the current batch. */
    private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

    /** Deserializers obtained in start() and closed in close(). */
    private Deserializer<K> keyDeserializerInstance = null;
    private Deserializer<V> valueDeserializerInstance = null;

    private final Counter elementsRead = SourceMetrics.elementsRead();
    private final Counter bytesRead = SourceMetrics.bytesRead();
    private final Counter checkpointMarkCommitsEnqueued = Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
    private final Counter checkpointMarkCommitsSkipped = Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC);

    /** Single thread that runs the initial-offset setup tasks and then consumerPollLoop(). */
    private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();

    /** Exception that terminated the poll loop, if any; surfaced to the caller by nextBatch(). */
    private final AtomicReference<Exception> consumerPollException = new AtomicReference<>();

    /** Hand-off point between the poll thread (producer) and nextBatch() (consumer). */
    private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue = new SynchronousQueue<>();

    /** Latest finalized checkpoint waiting to be committed by the poll thread; null when none is pending. */
    private final AtomicReference<KafkaCheckpointMark> finalizedCheckpointMark = new AtomicReference<>();

    /** Set to true by close(); tells the background loops to stop. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Single scheduled thread that periodically runs updateLatestOffsets(). */
    private final ScheduledExecutorService offsetFetcherThread = Executors.newSingleThreadScheduledExecutor();

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaUnboundedReader$PartitionState.class */
    /**
     * Mutable bookkeeping for a single assigned topic partition: the next offset to read, the
     * not-yet-consumed records of the current batch, moving averages of record size and offset
     * gap, and the most recently fetched end offset of the partition.
     *
     * <p>The methods that read or write {@code latestOffset} are declared {@code synchronized};
     * {@code setLatestOffset} is invoked from the reader's periodic offset update.
     */
    public static class PartitionState<K, V> {
        private final TopicPartition topicPartition;
        /** Offset of the next record expected from this partition. */
        private long nextOffset;
        private final TimestampPolicy<K, V> timestampPolicy;
        /** Remaining records of the current batch for this partition. */
        private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
        private KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg();
        private KafkaIOUtils.MovingAvg avgOffsetGap = new KafkaIOUtils.MovingAvg();
        /** Last fetched end offset of the partition, or UNINITIALIZED_OFFSET if never fetched. */
        private long latestOffset = UNINITIALIZED_OFFSET;
        private Instant latestOffsetFetchTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

        PartitionState(TopicPartition topicPartition, long nextOffset, TimestampPolicy<K, V> timestampPolicy) {
            this.topicPartition = topicPartition;
            this.nextOffset = nextOffset;
            this.timestampPolicy = timestampPolicy;
        }

        /**
         * Records that the record at {@code offset} was consumed.
         *
         * @param offset offset of the consumed record; the next expected offset becomes offset + 1
         * @param size size of the record in bytes, fed into the record-size average
         * @param offsetGap gap between the expected and the actual offset, fed into the gap average
         */
        void recordConsumed(long offset, int size, long offsetGap) {
            nextOffset = offset + 1;
            avgRecordSize.update(size);
            avgOffsetGap.update(offsetGap);
        }

        /** Stores the freshly fetched end offset together with the time the fetch was started. */
        synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
            this.latestOffset = latestOffset;
            this.latestOffsetFetchTime = fetchTime;
            LOG.debug(
                "{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
                this, topicPartition, latestOffset, nextOffset, avgRecordSize);
        }

        /**
         * Estimated backlog in bytes: backlog message count times the average record size.
         *
         * @return the estimate, or UNINITIALIZED_OFFSET when the message backlog is unknown
         */
        synchronized long approxBacklogInBytes() {
            long backlogMessages = backlogMessageCount();
            if (backlogMessages == UNINITIALIZED_OFFSET) {
                return UNINITIALIZED_OFFSET;
            }
            return (long) (backlogMessages * avgRecordSize.get());
        }

        /**
         * Estimated number of unread messages, correcting the raw offset distance by the average
         * offset gap between consecutive records.
         *
         * @return a non-negative estimate, or UNINITIALIZED_OFFSET while either offset is unknown
         */
        synchronized long backlogMessageCount() {
            if (latestOffset < 0 || nextOffset < 0) {
                return UNINITIALIZED_OFFSET;
            }
            double remaining = (latestOffset - nextOffset) / (1.0d + avgOffsetGap.get());
            return Math.max(0L, (long) Math.ceil(remaining));
        }

        /** Snapshots the current backlog and its fetch time for use by the timestamp policy. */
        synchronized TimestampPolicyContext mkTimestampPolicyContext() {
            return new TimestampPolicyContext(backlogMessageCount(), latestOffsetFetchTime);
        }

        /** Asks the timestamp policy for the current watermark, remembers it and returns it. */
        Instant updateAndGetWatermark() {
            lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
            return lastWatermark;
        }

        /**
         * Accessor that lets the enclosing class assign {@code nextOffset}.
         *
         * <p>The decompiler could not decode this method and emitted a stub that always threw
         * {@link UnsupportedOperationException}. The body is reconstructed from the partial
         * output ({@code r0.nextOffset = r1}); returning the assigned value is the usual shape of
         * a compiler-generated field-setter accessor.
         *
         * @param partitionState the state whose next offset is assigned
         * @param value the new next offset
         * @return {@code value}
         */
        static /* synthetic */ long access$102(PartitionState<?, ?> partitionState, long value) {
            partitionState.nextOffset = value;
            return value;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaUnboundedReader$TimestampPolicyContext.class */
    /**
     * Immutable {@link TimestampPolicy.PartitionContext} carrying a message backlog value and
     * the instant at which that backlog was checked.
     */
    public static class TimestampPolicyContext extends TimestampPolicy.PartitionContext {
        private final long messageBacklog;
        private final Instant backlogCheckTime;

        /**
         * @param messageBacklog value later returned by {@link #getMessageBacklog()}
         * @param backlogCheckTime value later returned by {@link #getBacklogCheckTime()}
         */
        public TimestampPolicyContext(long messageBacklog, Instant backlogCheckTime) {
            this.messageBacklog = messageBacklog;
            this.backlogCheckTime = backlogCheckTime;
        }

        /** Returns the message backlog given at construction. */
        @Override // org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext
        public long getMessageBacklog() {
            return messageBacklog;
        }

        /** Returns the backlog check time given at construction. */
        @Override // org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext
        public Instant getBacklogCheckTime() {
            return backlogCheckTime;
        }
    }

    /**
     * Creates the consumers and deserializers, positions every partition at its initial offset,
     * starts the background poll loop and the periodic end-offset fetcher, and then tries to
     * read the first record.
     *
     * @return the result of the first {@link #advance()} call
     * @throws IOException if a partition cannot be initialized within the resolved API timeout,
     *     if the calling thread is interrupted while waiting, or if initialization fails
     */
    public boolean start() throws IOException {
        KafkaIO.Read<K, V> spec = this.source.getSpec();
        this.consumer = (Consumer) spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
        ConsumerSpEL.evaluateAssign(this.consumer, spec.getTopicPartitions());
        this.keyDeserializerInstance = spec.getKeyDeserializerProvider().getDeserializer(spec.getConsumerConfig(), true);
        this.valueDeserializerInstance = spec.getValueDeserializerProvider().getDeserializer(spec.getConsumerConfig(), false);

        // The timeout only depends on the consumer config, so resolve it once.
        long initTimeoutMillis = resolveDefaultApiTimeout(spec).getMillis();
        for (PartitionState<K, V> partitionState : this.partitionStates) {
            try {
                // Run on the poll thread so that a hanging Kafka call can be bounded by a timeout.
                this.consumerPollThread.submit(() -> {
                    setupInitialOffset(partitionState);
                }).get(initTimeoutMillis, TimeUnit.MILLISECONDS);
                LOG.info("{}: reading from {} starting at offset {}", this.name, partitionState.topicPartition, partitionState.nextOffset);
            } catch (TimeoutException e) {
                // Unblock the consumer call that is still pending on the poll thread.
                this.consumer.wakeup();
                String message = String.format("%s: Timeout while initializing partition '%s'. Kafka client may not be able to connect to servers.", this, partitionState.topicPartition);
                LOG.error("{}", message);
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new IOException(message, e);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before translating to the declared exception type.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        this.consumerPollThread.submit(this::consumerPollLoop);
        this.offsetConsumer = (Consumer) spec.getConsumerFactoryFn().apply(KafkaIOUtils.getOffsetConsumerConfig(this.name, spec.getOffsetConsumerConfig(), spec.getConsumerConfig()));
        ConsumerSpEL.evaluateAssign(this.offsetConsumer, spec.getTopicPartitions());
        // Fetch once synchronously so that a backlog estimate exists before the first read.
        updateLatestOffsets();
        this.offsetFetcherThread.scheduleAtFixedRate(this::updateLatestOffsets, 0L, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
        return advance();
    }

    /**
     * Moves to the next record, fetching a new batch from the poll thread when the current one
     * is exhausted. Partitions are visited in turn through {@code curBatch}; a partition is
     * dropped from the cycle once it has no records left in the batch.
     *
     * @return {@code true} if a new current record is available, {@code false} if no batch was
     *     available within the dequeue timeout
     * @throws IOException if the poll thread has failed (propagated from {@code nextBatch()})
     */
    public boolean advance() throws IOException {
        while (true) {
            if (!this.curBatch.hasNext()) {
                nextBatch();
                if (!this.curBatch.hasNext()) {
                    return false;
                }
                continue;
            }

            PartitionState<K, V> pState = this.curBatch.next();
            if (!pState.recordIter.hasNext()) {
                // Partition exhausted for this batch: drop the reference and leave the cycle.
                pState.recordIter = Collections.emptyIterator();
                this.curBatch.remove();
                continue;
            }

            this.elementsRead.inc();
            this.elementsReadBySplit.inc();

            ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
            long expected = pState.nextOffset;
            long offset = rawRecord.offset();
            if (offset < expected) {
                LOG.warn("{}: ignoring already consumed offset {} for {}", this, offset, pState.topicPartition);
                continue;
            }

            long offsetGap = offset - expected;
            if (this.curRecord == null) {
                LOG.info("{}: first record offset {}", this.name, offset);
                // The gap before the very first record is not counted in the gap average.
                offsetGap = 0;
            }

            KafkaRecord<K, V> record = new KafkaRecord<>(
                rawRecord.topic(),
                rawRecord.partition(),
                rawRecord.offset(),
                ConsumerSpEL.getRecordTimestamp(rawRecord),
                ConsumerSpEL.getRecordTimestampType(rawRecord),
                ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
                ConsumerSpEL.deserializeKey(this.keyDeserializerInstance, rawRecord),
                ConsumerSpEL.deserializeValue(this.valueDeserializerInstance, rawRecord));
            this.curTimestamp = pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
            this.curRecord = record;

            // Size is measured on the raw (serialized) key and value bytes.
            int keySize = rawRecord.key() == null ? 0 : rawRecord.key().length;
            int valueSize = rawRecord.value() == null ? 0 : rawRecord.value().length;
            int recordSize = keySize + valueSize;
            pState.recordConsumed(offset, recordSize, offsetGap);
            this.bytesRead.inc(recordSize);
            this.bytesReadBySplit.inc(recordSize);
            return true;
        }
    }

    /**
     * Returns the current watermark.
     *
     * <p>When the spec has a watermark function, it is applied to the current record, or the
     * initial watermark is returned if no record has been read yet. Otherwise the minimum of
     * the per-partition watermarks is returned.
     */
    public Instant getWatermark() {
        if (source.getSpec().getWatermarkFn() != null) {
            if (curRecord == null) {
                LOG.debug("{}: getWatermark() : no records have been read yet.", name);
                return initialWatermark;
            }
            return (Instant) source.getSpec().getWatermarkFn().apply(curRecord);
        }
        return partitionStates.stream()
            .map(PartitionState::updateAndGetWatermark)
            .min(Comparator.naturalOrder())
            .get();
    }

    /**
     * Reports the current backlog gauges and builds a checkpoint holding, for every partition,
     * its topic, partition number, next offset and last watermark in milliseconds. The reader
     * itself is attached to the checkpoint only when committing offsets on finalize is enabled.
     */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        reportBacklog();
        List<KafkaCheckpointMark.PartitionMark> marks = new ArrayList<>(partitionStates.size());
        for (PartitionState<K, V> state : partitionStates) {
            marks.add(
                new KafkaCheckpointMark.PartitionMark(
                    state.topicPartition.topic(),
                    state.topicPartition.partition(),
                    state.nextOffset,
                    state.lastWatermark.getMillis()));
        }
        return new KafkaCheckpointMark(marks, source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty());
    }

    /** Returns the source this reader was constructed with. */
    public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
        return source;
    }

    /**
     * Returns the record most recently produced by {@link #advance()}; {@code null} while no
     * record has been read yet.
     */
    public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
        return curRecord;
    }

    /**
     * Returns the timestamp that was assigned to the current record in {@link #advance()};
     * {@code null} while no record has been read yet.
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return curTimestamp;
    }

    /**
     * Sums the approximate backlog in bytes over all partitions.
     *
     * @return the total, or {@code UNINITIALIZED_OFFSET} as soon as any partition's backlog is
     *     still unknown
     */
    public long getSplitBacklogBytes() {
        long totalBytes = 0;
        for (PartitionState<K, V> state : partitionStates) {
            long partitionBytes = state.approxBacklogInBytes();
            if (partitionBytes == UNINITIALIZED_OFFSET) {
                return UNINITIALIZED_OFFSET;
            }
            totalBytes += partitionBytes;
        }
        return totalBytes;
    }

    /** Returns the reader name, i.e. "Reader-" followed by the source id. */
    public String toString() {
        return name;
    }

    /**
     * Creates a reader for the partitions assigned to {@code kafkaUnboundedSource}.
     *
     * @param kafkaUnboundedSource the source being read; supplies the spec and the id used for
     *     the reader name and the per-split metrics
     * @param kafkaCheckpointMark checkpoint to resume from, or {@code null} to start without
     *     known offsets; when given, its partitions must match the assigned partitions one by
     *     one and in the same order
     * @throws IllegalStateException if the checkpoint does not match the assigned partitions
     */
    public KafkaUnboundedReader(KafkaUnboundedSource<K, V> kafkaUnboundedSource, KafkaCheckpointMark kafkaCheckpointMark) {
        this.source = kafkaUnboundedSource;
        this.name = "Reader-" + kafkaUnboundedSource.getId();

        List<TopicPartition> partitions = kafkaUnboundedSource.getSpec().getTopicPartitions();
        List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());
        if (kafkaCheckpointMark != null) {
            Preconditions.checkState(kafkaCheckpointMark.getPartitions().size() == partitions.size(), "checkPointMark and assignedPartitions should match");
        }

        // Plain index increment: the step must not depend on any unrelated constant.
        for (int i = 0; i < partitions.size(); i++) {
            TopicPartition assigned = partitions.get(i);
            long nextOffset = UNINITIALIZED_OFFSET;
            Optional<Instant> previousWatermark = Optional.empty();
            if (kafkaCheckpointMark != null) {
                KafkaCheckpointMark.PartitionMark mark = kafkaCheckpointMark.getPartitions().get(i);
                TopicPartition checkpointed = new TopicPartition(mark.getTopic(), mark.getPartition());
                Preconditions.checkState(checkpointed.equals(assigned), "checkpointed partition %s and assigned partition %s don't match", checkpointed, assigned);
                nextOffset = mark.getNextOffset();
                previousWatermark = Optional.of(new Instant(mark.getWatermarkMillis()));
            }
            states.add(
                new PartitionState<>(
                    assigned,
                    nextOffset,
                    kafkaUnboundedSource.getSpec().getTimestampPolicyFactory().createTimestampPolicy(assigned, previousWatermark)));
        }
        this.partitionStates = ImmutableList.copyOf(states);

        String splitId = String.valueOf(kafkaUnboundedSource.getId());
        this.elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
        this.bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
        this.backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
        this.backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
    }

    /**
     * Body of the background poll thread. Until the reader is closed it alternates between
     * polling Kafka for a batch and offering a non-empty batch to the reader through
     * {@code availableRecordsQueue}; after each step it commits a finalized checkpoint if one
     * is pending.
     *
     * <p>Any other exception is logged, stored in {@code consumerPollException} so that
     * {@code nextBatch()} can report it, and rethrown.
     */
    private void consumerPollLoop() {
        try {
            ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
            while (!this.closed.get()) {
                try {
                    if (records.isEmpty()) {
                        records = this.consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                    } else if (this.availableRecordsQueue.offer(records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
                        // The reader took the batch; poll Kafka again on the next iteration.
                        records = ConsumerRecords.empty();
                    }
                    KafkaCheckpointMark pendingCheckpoint = this.finalizedCheckpointMark.getAndSet(null);
                    if (pendingCheckpoint != null) {
                        commitCheckpointMark(pendingCheckpoint);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("{}: consumer thread is interrupted", this, e);
                    // Do not swallow the interrupt: restore the flag and stop. Continuing would
                    // either hide the request or spin, since offer() throws again once the flag is set.
                    Thread.currentThread().interrupt();
                    break;
                } catch (WakeupException ignored) {
                    // Intentionally ignored: wakeup() only aborts the blocking poll; the loop
                    // condition re-checks the closed flag.
                }
            }
            LOG.info("{}: Returning from consumer pool loop", this);
        } catch (Exception e) {
            LOG.error("{}: Exception while reading from Kafka", this, e);
            this.consumerPollException.set(e);
            throw e;
        }
    }

    /**
     * Synchronously commits the next offsets of a finalized checkpoint through the consumer.
     * Partitions whose next offset is still {@code UNINITIALIZED_OFFSET} are left out.
     */
    private void commitCheckpointMark(KafkaCheckpointMark kafkaCheckpointMark) {
        LOG.debug("{}: Committing finalized checkpoint {}", this, kafkaCheckpointMark);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
            kafkaCheckpointMark.getPartitions().stream()
                .filter(mark -> mark.getNextOffset() != UNINITIALIZED_OFFSET)
                .collect(
                    Collectors.toMap(
                        mark -> new TopicPartition(mark.getTopic(), mark.getPartition()),
                        mark -> new OffsetAndMetadata(mark.getNextOffset())));
        consumer.commitSync(offsetsToCommit);
    }

    /**
     * Hands a finalized checkpoint to the poll thread for committing. Only the latest
     * checkpoint is kept: if an earlier one was still pending it is replaced and counted as
     * skipped. Every call counts as enqueued.
     */
    public void finalizeCheckpointMarkAsync(KafkaCheckpointMark kafkaCheckpointMark) {
        KafkaCheckpointMark superseded = finalizedCheckpointMark.getAndSet(kafkaCheckpointMark);
        if (superseded != null) {
            checkpointMarkCommitsSkipped.inc();
        }
        checkpointMarkCommitsEnqueued.inc();
    }

    /**
     * Replaces the current batch with the next one offered by the poll thread, waiting at most
     * the dequeue timeout. If nothing arrives the current batch stays empty.
     *
     * @throws IOException if nothing arrived and the poll thread has recorded a failure
     */
    private void nextBatch() throws IOException {
        curBatch = Collections.emptyIterator();

        ConsumerRecords<byte[], byte[]> records;
        try {
            records = availableRecordsQueue.poll(RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("{}: Unexpected", this, e);
            return;
        }

        if (records == null) {
            if (consumerPollException.get() != null) {
                throw new IOException("Exception while reading from Kafka", consumerPollException.get());
            }
            return;
        }

        // Point every partition at its slice of the new batch, then cycle over all partitions.
        for (PartitionState<K, V> state : partitionStates) {
            state.recordIter = records.records(state.topicPartition).iterator();
        }
        curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
    }

    /**
     * Positions the consumer for one partition and records the offset reading starts from.
     *
     * <ul>
     *   <li>If the state already has an offset, the consumer seeks to it.
     *   <li>Otherwise, if the spec has no start read time, the consumer's current position for
     *       the partition becomes the next offset.
     *   <li>Otherwise the offset for the start read time is looked up, stored, and sought to.
     * </ul>
     *
     * <p>The offset is assigned directly to the field; the enclosing class may access it, and
     * this avoids the synthetic {@code access$102} accessor that the decompiler failed to decode.
     */
    private void setupInitialOffset(PartitionState<K, V> partitionState) {
        KafkaIO.Read<K, V> spec = this.source.getSpec();
        if (partitionState.nextOffset != UNINITIALIZED_OFFSET) {
            this.consumer.seek(partitionState.topicPartition, partitionState.nextOffset);
        } else if (spec.getStartReadTime() == null) {
            partitionState.nextOffset = this.consumer.position(partitionState.topicPartition);
        } else {
            partitionState.nextOffset = ConsumerSpEL.offsetForTime(this.consumer, partitionState.topicPartition, spec.getStartReadTime());
            this.consumer.seek(partitionState.topicPartition, partitionState.nextOffset);
        }
    }

    /**
     * Refreshes the end offset of every partition using the offset consumer: seeks to the end
     * of the partition and stores the resulting position together with the time taken just
     * before the seek. A failure for one partition is logged and the remaining partitions are
     * still processed, unless the reader is closed, in which case the loop stops.
     */
    private void updateLatestOffsets() {
        for (PartitionState<K, V> state : partitionStates) {
            try {
                Instant fetchTime = Instant.now();
                ConsumerSpEL.evaluateSeek2End(offsetConsumer, state.topicPartition);
                long endOffset = offsetConsumer.position(state.topicPartition);
                state.setLatestOffset(endOffset, fetchTime);
            } catch (Exception e) {
                if (!closed.get()) {
                    LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.", this, state.topicPartition, e);
                } else {
                    break;
                }
            }
        }
        LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
    }

    /**
     * Publishes the split's backlog in bytes and in messages to the corresponding gauges. Any
     * negative value is reported as -1.
     */
    private void reportBacklog() {
        long backlogBytes = getSplitBacklogBytes();
        backlogBytesOfSplit.set(backlogBytes < 0 ? -1 : backlogBytes);

        long backlogMessages = getSplitBacklogMessageCount();
        backlogElementsOfSplit.set(backlogMessages < 0 ? -1 : backlogMessages);
    }

    /**
     * Sums the estimated backlog message count over all partitions.
     *
     * @return the total, or {@code UNINITIALIZED_OFFSET} as soon as any partition's backlog is
     *     still unknown
     */
    private long getSplitBacklogMessageCount() {
        long totalMessages = 0;
        for (PartitionState<K, V> state : partitionStates) {
            long partitionMessages = state.backlogMessageCount();
            if (partitionMessages == UNINITIALIZED_OFFSET) {
                return UNINITIALIZED_OFFSET;
            }
            totalMessages += partitionMessages;
        }
        return totalMessages;
    }

    /**
     * Stops the background threads and releases the deserializers and consumers.
     *
     * <p>After marking the reader closed and shutting down both executors, it repeatedly wakes
     * up the consumers, drains one pending batch from the hand-off queue and waits for both
     * executors to terminate, until they have. Closing the deserializers and consumers
     * swallows any {@link IOException}.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    public void close() throws IOException {
        closed.set(true);
        consumerPollThread.shutdown();
        offsetFetcherThread.shutdown();

        boolean terminated = false;
        do {
            if (consumer != null) {
                consumer.wakeup();
            }
            if (offsetConsumer != null) {
                offsetConsumer.wakeup();
            }
            // Take a batch the poll thread may be blocked offering, so it can observe 'closed'.
            availableRecordsQueue.poll();
            try {
                terminated = consumerPollThread.awaitTermination(10L, TimeUnit.SECONDS) && offsetFetcherThread.awaitTermination(10L, TimeUnit.SECONDS);
                if (!terminated) {
                    LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        } while (!terminated);

        Closeables.close(keyDeserializerInstance, true);
        Closeables.close(valueDeserializerInstance, true);
        Closeables.close(offsetConsumer, true);
        Closeables.close(consumer, true);
    }

    /**
     * Resolves the timeout used while initializing partitions from the consumer config.
     *
     * @param read the read spec whose consumer config is inspected
     * @return the value of {@code default.api.timeout.ms} if present; otherwise twice the value
     *     of {@code request.timeout.ms} if present; otherwise 60 seconds
     */
    @VisibleForTesting
    static Duration resolveDefaultApiTimeout(KafkaIO.Read<?, ?> read) {
        Duration defaultApiTimeout = tryParseDurationFromMillis(read.getConsumerConfig().get("default.api.timeout.ms"));
        if (defaultApiTimeout != null) {
            return defaultApiTimeout;
        }
        Duration requestTimeout = tryParseDurationFromMillis(read.getConsumerConfig().get("request.timeout.ms"));
        if (requestTimeout != null) {
            return Duration.millis(2 * requestTimeout.getMillis());
        }
        return Duration.standardSeconds(60L);
    }

    /**
     * Converts a config value holding milliseconds into a {@link Duration}.
     *
     * @param obj an {@link Integer}, any object whose {@code toString()} is a decimal integer,
     *     or {@code null}
     * @return the duration, or {@code null} when {@code obj} is {@code null}
     * @throws NumberFormatException if a non-Integer value does not parse as an int
     */
    private static Duration tryParseDurationFromMillis(Object obj) {
        if (obj == null) {
            return null;
        }
        if (obj instanceof Integer) {
            return Duration.millis(((Integer) obj).intValue());
        }
        return Duration.millis(Integer.parseInt(obj.toString()));
    }

    /*
     * Bridge method that simply delegates to getCurrentSource().
     * The decompiler renamed it (see the marker below) because Java source cannot declare two
     * methods that differ only in return type.
     */
    /* renamed from: getCurrentSource */
    public /* bridge */ /* synthetic */ Source m18getCurrentSource() {
        return getCurrentSource();
    }

    /*
     * Bridge method that simply delegates to getCurrent().
     * The decompiler renamed it (see the marker below) because Java source cannot declare two
     * methods that differ only in return type.
     */
    /* renamed from: getCurrent */
    public /* bridge */ /* synthetic */ Object m19getCurrent() throws NoSuchElementException {
        return getCurrent();
    }

    // Empty static initializer: it contains no statements and performs no work.
    static {
    }
}
