package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.class */
/**
 * An {@link UnboundedSource} of {@link KafkaRecord}s described by a {@link KafkaIO.Read} spec.
 *
 * <p>Each instance carries the read spec it was built from plus an integer id. Instances produced
 * by {@link #split(int, PipelineOptions)} have an explicit, non-empty list of topic partitions
 * and an empty topic list; their id is the zero-based index of the split.
 *
 * @param <K> type of the Kafka record key
 * @param <V> type of the Kafka record value
 */
public class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);

    /** Read configuration this source was created from. */
    private final KafkaIO.Read<K, V> spec;

    /** Identifier of this source; for sources created by {@code split} it is the split index. */
    private final int id;

    /**
     * Creates a source for the given read spec.
     *
     * @param spec the read configuration
     * @param id identifier of this source
     */
    public KafkaUnboundedSource(KafkaIO.Read<K, V> spec, int id) {
        this.spec = spec;
        this.id = id;
    }

    /**
     * Divides the partitions of this source among at most {@code desiredNumSplits} new sources.
     *
     * <p>The steps are:
     *
     * <ol>
     *   <li>Start from the partitions listed in the spec; if there are none, ask a Kafka consumer
     *       for the partitions of every topic in the spec.
     *   <li>Sort the partitions by topic and then by partition number, so that the assignment
     *       depends only on the set of partitions and not on the order they were discovered in.
     *   <li>Assign the sorted partitions to splits round-robin.
     * </ol>
     *
     * @param desiredNumSplits requested number of splits; must be positive
     * @param options pipeline options (not used by this method)
     * @return one source per split, each with a non-empty partition list and an empty topic list
     * @throws IllegalArgumentException if {@code desiredNumSplits} is not positive
     * @throws IllegalStateException if no partitions could be found
     * @throws Exception if creating, querying or closing the Kafka consumer fails
     */
    public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        List<TopicPartition> partitions = new ArrayList<>(this.spec.getTopicPartitions());
        if (partitions.isEmpty()) {
            // try-with-resources closes the consumer on every path and attaches a failure of
            // close() to the primary exception as suppressed instead of replacing it.
            try (Consumer<?, ?> consumer =
                    this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig())) {
                for (String topic : this.spec.getTopics()) {
                    for (PartitionInfo info : consumer.partitionsFor(topic)) {
                        partitions.add(new TopicPartition(info.topic(), info.partition()));
                    }
                }
            }
        }

        partitions.sort(
                Comparator.comparing(TopicPartition::topic)
                        .thenComparing(Comparator.comparingInt(TopicPartition::partition)));

        Preconditions.checkArgument(desiredNumSplits > 0);
        Preconditions.checkState(partitions.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names");

        // Never create more splits than there are partitions, so no split is left empty.
        int numSplits = Math.min(desiredNumSplits, partitions.size());
        List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
        for (int i = 0; i < numSplits; i++) {
            assignments.add(new ArrayList<>());
        }
        // Round-robin: the partition at sorted position i goes to split (i mod numSplits).
        for (int i = 0; i < partitions.size(); i++) {
            assignments.get(i % numSplits).add(partitions.get(i));
        }

        List<KafkaUnboundedSource<K, V>> result = new ArrayList<>(numSplits);
        for (int i = 0; i < numSplits; i++) {
            List<TopicPartition> assignedToSplit = assignments.get(i);
            LOG.info("Partitions assigned to split {} (total {}): {}", i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
            // Each split gets explicit partitions; its topic list is cleared.
            result.add(
                    new KafkaUnboundedSource<>(
                            this.spec.toBuilder()
                                    .setTopics(Collections.emptyList())
                                    .setTopicPartitions(assignedToSplit)
                                    .build(),
                            i));
        }
        return result;
    }

    /**
     * Creates a reader for this source.
     *
     * <p>If the spec has no explicit topic partitions, a warning is logged and the reader is
     * created over the single source returned by {@code split(1, options)}.
     *
     * @param options pipeline options, forwarded to {@code split} when a split is needed
     * @param checkpointMark checkpoint mark handed to the reader
     * @return a new reader
     * @throws RuntimeException wrapping any exception thrown while generating the single split
     */
    public KafkaUnboundedReader<K, V> createReader(PipelineOptions options, KafkaCheckpointMark checkpointMark) {
        if (!this.spec.getTopicPartitions().isEmpty()) {
            return new KafkaUnboundedReader<>(this, checkpointMark);
        }
        LOG.warn("Looks like generateSplits() is not called. Generate single split.");
        try {
            return new KafkaUnboundedReader<>(split(1, options).get(0), checkpointMark);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the coder for checkpoint marks.
     *
     * @return an {@link AvroCoder} for {@link KafkaCheckpointMark}
     */
    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
        return AvroCoder.of(KafkaCheckpointMark.class);
    }

    /**
     * Reports whether records from this source need deduplication.
     *
     * @return always {@code false}
     */
    public boolean requiresDeduping() {
        return false;
    }

    /**
     * Returns the coder for the records produced by this source.
     *
     * @return a {@link KafkaRecordCoder} built from the spec's key and value coders
     */
    public Coder<KafkaRecord<K, V>> getOutputCoder() {
        return KafkaRecordCoder.of(this.spec.getKeyCoder(), this.spec.getValueCoder());
    }

    /**
     * Returns the read spec this source was created from.
     *
     * @return the read spec
     */
    public KafkaIO.Read<K, V> getSpec() {
        return this.spec;
    }

    /**
     * Returns the identifier given to this source at construction.
     *
     * @return the source id
     */
    public int getId() {
        return this.id;
    }
}
