package org.apache.beam.sdk.io.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@DoFn.UnboundedPerElement
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.class */
public class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    private final Map<String, Object> offsetConsumerConfig;
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
    private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
    private final SerializableFunction<Instant, WatermarkEstimator<Instant>> createWatermarkEstimatorFn;
    private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
    private transient ConsumerSpEL consumerSpEL = null;
    private transient Deserializer<K> keyDeserializerInstance = null;
    private transient Deserializer<V> valueDeserializerInstance = null;
    private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;

    @VisibleForTesting
    final DeserializerProvider keyDeserializerProvider;

    @VisibleForTesting
    final DeserializerProvider valueDeserializerProvider;

    @VisibleForTesting
    final Map<String, Object> consumerConfig;
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$AverageRecordSize.class */
    public static class AverageRecordSize {
        private KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg();
        private KafkaIOUtils.MovingAvg avgRecordGap = new KafkaIOUtils.MovingAvg();

        public void update(int i, long j) {
            this.avgRecordSize.update(i);
            this.avgRecordGap.update(j);
        }

        public double getTotalSize(double d) {
            return (this.avgRecordSize.get() * d) / (1.0d + this.avgRecordGap.get());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$KafkaLatestOffsetEstimator.class */
    public static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final Consumer<byte[], byte[]> offsetConsumer;
        private final TopicPartition topicPartition;
        private final ConsumerSpEL consumerSpEL = new ConsumerSpEL();
        private final Supplier<Long> memoizedBacklog;

        KafkaLatestOffsetEstimator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
            this.offsetConsumer = consumer;
            this.topicPartition = topicPartition;
            this.consumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                this.consumerSpEL.evaluateSeek2End(consumer, topicPartition);
                return Long.valueOf(consumer.position(topicPartition));
            }, 5L, TimeUnit.SECONDS);
        }

        protected void finalize() {
            try {
                Closeables.close(this.offsetConsumer, true);
            } catch (Exception e) {
                ReadFromKafkaDoFn.LOG.warn("Failed to close offset consumer for {}", this.topicPartition);
            }
        }

        public long estimate() {
            return ((Long) this.memoizedBacklog.get()).longValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReadFromKafkaDoFn(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
        this.consumerConfig = readSourceDescriptors.getConsumerConfig();
        this.offsetConsumerConfig = readSourceDescriptors.getOffsetConsumerConfig();
        this.keyDeserializerProvider = readSourceDescriptors.getKeyDeserializerProvider();
        this.valueDeserializerProvider = readSourceDescriptors.getValueDeserializerProvider();
        this.consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
        this.extractOutputTimestampFn = readSourceDescriptors.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = readSourceDescriptors.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = readSourceDescriptors.getTimestampPolicyFactory();
    }

    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor) {
        Consumer<?, ?> consumer = (Consumer) this.consumerFactoryFn.apply(KafkaIOUtils.getOffsetConsumerConfig("initialOffset", this.offsetConsumerConfig, overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor)));
        try {
            this.consumerSpEL.evaluateAssign(consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
            OffsetRange offsetRange = new OffsetRange(kafkaSourceDescriptor.getStartReadOffset() != null ? kafkaSourceDescriptor.getStartReadOffset().longValue() : kafkaSourceDescriptor.getStartReadTime() != null ? this.consumerSpEL.offsetForTime(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaSourceDescriptor.getStartReadTime()) : consumer.position(kafkaSourceDescriptor.getTopicPartition()), Long.MAX_VALUE);
            if (consumer != null) {
                $closeResource(null, consumer);
            }
            return offsetRange;
        } catch (Throwable th) {
            if (consumer != null) {
                $closeResource(null, consumer);
            }
            throw th;
        }
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return (WatermarkEstimator) this.createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(instant));
    }

    @DoFn.GetSize
    public double getSize(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) throws Exception {
        double workRemaining = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        return !this.avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition()) ? workRemaining : ((AverageRecordSize) this.avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition())).getTotalSize(workRemaining);
    }

    @DoFn.NewTracker
    public GrowableOffsetRangeTracker restrictionTracker(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) {
        return new GrowableOffsetRangeTracker(offsetRange.getFrom(), new KafkaLatestOffsetEstimator((Consumer) this.consumerFactoryFn.apply(KafkaIOUtils.getOffsetConsumerConfig("tracker-" + kafkaSourceDescriptor.getTopicPartition(), this.offsetConsumerConfig, overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor))), kafkaSourceDescriptor.getTopicPartition()));
    }

    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, RestrictionTracker<OffsetRange, Long> restrictionTracker, WatermarkEstimator watermarkEstimator, DoFn.OutputReceiver<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputReceiver) {
        Instant instant;
        Map<String, Object> overrideBootstrapServersConfig = overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TimestampPolicy<K, V> timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(kafkaSourceDescriptor.getTopicPartition(), Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }
        Consumer consumer = (Consumer) this.consumerFactoryFn.apply(overrideBootstrapServersConfig);
        try {
            this.consumerSpEL.evaluateAssign(consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
            long from = ((OffsetRange) restrictionTracker.currentRestriction()).getFrom();
            long j = from;
            consumer.seek(kafkaSourceDescriptor.getTopicPartition(), from);
            ConsumerRecords.empty();
            while (true) {
                ConsumerRecords poll = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                if (poll.isEmpty()) {
                    DoFn.ProcessContinuation resume = DoFn.ProcessContinuation.resume();
                    if (consumer != null) {
                        $closeResource(null, consumer);
                    }
                    return resume;
                }
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it.next();
                    if (!restrictionTracker.tryClaim(Long.valueOf(consumerRecord.offset()))) {
                        DoFn.ProcessContinuation stop = DoFn.ProcessContinuation.stop();
                        if (consumer != null) {
                            $closeResource(null, consumer);
                        }
                        return stop;
                    }
                    KafkaRecord<K, V> kafkaRecord = new KafkaRecord<>(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), this.consumerSpEL.getRecordTimestamp(consumerRecord), this.consumerSpEL.getRecordTimestampType(consumerRecord), ConsumerSpEL.hasHeaders() ? consumerRecord.headers() : null, this.consumerSpEL.deserializeKey(this.keyDeserializerInstance, consumerRecord), this.consumerSpEL.deserializeValue(this.valueDeserializerInstance, consumerRecord));
                    ((AverageRecordSize) this.avgRecordSize.getUnchecked(kafkaSourceDescriptor.getTopicPartition())).update((consumerRecord.key() == null ? 0 : ((byte[]) consumerRecord.key()).length) + (consumerRecord.value() == null ? 0 : ((byte[]) consumerRecord.value()).length), consumerRecord.offset() - j);
                    j = consumerRecord.offset() + 1;
                    if (timestampPolicy != null) {
                        Preconditions.checkState(watermarkEstimator instanceof ManualWatermarkEstimator);
                        KafkaUnboundedReader.TimestampPolicyContext timestampPolicyContext = new KafkaUnboundedReader.TimestampPolicyContext((long) ((RestrictionTracker.HasProgress) restrictionTracker).getProgress().getWorkRemaining(), Instant.now());
                        instant = timestampPolicy.getTimestampForRecord(timestampPolicyContext, kafkaRecord);
                        ((ManualWatermarkEstimator) watermarkEstimator).setWatermark(ensureTimestampWithinBounds(timestampPolicy.getWatermark(timestampPolicyContext)));
                    } else {
                        instant = (Instant) this.extractOutputTimestampFn.apply(kafkaRecord);
                    }
                    outputReceiver.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), instant);
                }
            }
        } catch (Throwable th) {
            if (consumer != null) {
                $closeResource(null, consumer);
            }
            throw th;
        }
    }

    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    @DoFn.Setup
    public void setup() throws Exception {
        this.avgRecordSize = CacheBuilder.newBuilder().maximumSize(1000L).build(new CacheLoader<TopicPartition, AverageRecordSize>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.1
            public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
                return new AverageRecordSize();
            }
        });
        this.consumerSpEL = new ConsumerSpEL();
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);
    }

    @DoFn.Teardown
    public void teardown() throws Exception {
        try {
            Closeables.close(this.keyDeserializerInstance, true);
            Closeables.close(this.valueDeserializerInstance, true);
        } catch (Exception e) {
            LOG.warn("Fail to close resource during finishing bundle.", e);
        }
    }

    private Map<String, Object> overrideBootstrapServersConfig(Map<String, Object> map, KafkaSourceDescriptor kafkaSourceDescriptor) {
        Preconditions.checkState(map.containsKey("bootstrap.servers") || kafkaSourceDescriptor.getBootStrapServers() != null);
        HashMap hashMap = new HashMap(map);
        if (kafkaSourceDescriptor.getBootStrapServers() != null && kafkaSourceDescriptor.getBootStrapServers().size() > 0) {
            hashMap.put("bootstrap.servers", String.join(",", kafkaSourceDescriptor.getBootStrapServers()));
        }
        return hashMap;
    }

    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
        } else if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            instant = BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
