package org.apache.beam.sdk.io.kafka;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
@Experimental
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.class */
public class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceDescriptor> {
    private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
    private static final String TIMER_ID = "watch_timer";
    private static final String STATE_ID = "topic_partition_set";
    private final Duration checkDuration;
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn;
    private final SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
    private final Map<String, Object> kafkaConsumerConfig;
    private final Instant startReadTime;
    private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
    private final List<String> topics;

    @DoFn.TimerId(TIMER_ID)
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @DoFn.StateId(STATE_ID)
    private final StateSpec<BagState<TopicPartition>> bagStateSpec = StateSpecs.bag(new TopicPartitionCoder());

    /* JADX INFO: Access modifiers changed from: package-private */
    public WatchKafkaTopicPartitionDoFn(Duration duration, SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction, SerializableFunction<TopicPartition, Boolean> serializableFunction2, Map<String, Object> map, Instant instant, List<String> list) {
        this.checkDuration = duration == null ? DEFAULT_CHECK_DURATION : duration;
        this.kafkaConsumerFactoryFn = serializableFunction;
        this.checkStopReadingFn = serializableFunction2;
        this.kafkaConsumerConfig = map;
        this.startReadTime = instant;
        this.topics = list;
    }

    @VisibleForTesting
    Set<TopicPartition> getAllTopicPartitions() {
        HashSet hashSet = new HashSet();
        Consumer<byte[], byte[]> apply = this.kafkaConsumerFactoryFn.apply(this.kafkaConsumerConfig);
        Throwable th = null;
        try {
            if (this.topics == null || this.topics.isEmpty()) {
                for (Map.Entry<String, List<PartitionInfo>> entry : apply.listTopics().entrySet()) {
                    Iterator<PartitionInfo> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        hashSet.add(new TopicPartition(entry.getKey(), it.next().partition()));
                    }
                }
            } else {
                for (String str : this.topics) {
                    Iterator<PartitionInfo> it2 = apply.partitionsFor(str).iterator();
                    while (it2.hasNext()) {
                        hashSet.add(new TopicPartition(str, it2.next().partition()));
                    }
                }
            }
            return hashSet;
        } finally {
            if (apply != null) {
                if (0 != 0) {
                    try {
                        apply.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    apply.close();
                }
            }
        }
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.TimerId("watch_timer") Timer timer, @DoFn.StateId("topic_partition_set") BagState<TopicPartition> bagState, DoFn.OutputReceiver<KafkaSourceDescriptor> outputReceiver) {
        getAllTopicPartitions().forEach(topicPartition -> {
            if (this.checkStopReadingFn == null || !this.checkStopReadingFn.apply(topicPartition).booleanValue()) {
                Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString()).inc();
                bagState.add(topicPartition);
                outputReceiver.output(KafkaSourceDescriptor.of(topicPartition, null, this.startReadTime, null));
            }
        });
        timer.offset(this.checkDuration).setRelative();
    }

    @DoFn.OnTimer(TIMER_ID)
    public void onTimer(@DoFn.TimerId("watch_timer") Timer timer, @DoFn.StateId("topic_partition_set") BagState<TopicPartition> bagState, DoFn.OutputReceiver<KafkaSourceDescriptor> outputReceiver) {
        HashSet hashSet = new HashSet();
        bagState.read().forEach(topicPartition -> {
            hashSet.add(topicPartition);
        });
        bagState.clear();
        Set<TopicPartition> allTopicPartitions = getAllTopicPartitions();
        Sets.difference(allTopicPartitions, hashSet).forEach(topicPartition2 -> {
            if (this.checkStopReadingFn == null || !this.checkStopReadingFn.apply(topicPartition2).booleanValue()) {
                Metrics.counter(COUNTER_NAMESPACE, topicPartition2.toString()).inc();
                outputReceiver.output(KafkaSourceDescriptor.of(topicPartition2, null, this.startReadTime, null));
            }
        });
        allTopicPartitions.forEach(topicPartition3 -> {
            if (this.checkStopReadingFn == null || !this.checkStopReadingFn.apply(topicPartition3).booleanValue()) {
                bagState.add(topicPartition3);
            }
        });
        timer.set(Instant.now().plus(Duration.millis(this.checkDuration.getMillis())));
    }
}
