package org.apache.flink.connector.pulsar.source.reader.split;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.class */
/**
 * Common base for Pulsar split readers that serve exactly one {@link PulsarPartitionSplit} through
 * a single Pulsar {@link Consumer}.
 *
 * <p>The reader accepts one {@link SplitsAddition} carrying one split; any further assignment is
 * rejected. Subclasses supply the polling, post-poll handling and consumer start-up logic through
 * the abstract hooks declared below.
 *
 * <p>NOTE(review): this file is decompiler output. The body of {@link #fetch()} could not be
 * recovered and is a stub that always throws.
 *
 * @param <OUT> the element type carried by the emitted {@link PulsarMessage}s.
 */
abstract class PulsarPartitionSplitReaderBase<OUT> implements SplitReader<PulsarMessage<OUT>, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);

    /** Client used to build the consumer for the assigned split. */
    protected final PulsarClient pulsarClient;

    /** Admin handle passed to {@link PulsarPartitionSplit#open} when a split is registered. */
    protected final PulsarAdmin pulsarAdmin;

    /** Source configuration; consulted for the consumer builder and the subscription type. */
    protected final SourceConfiguration sourceConfiguration;

    /** Deserialization schema made available to subclasses. */
    protected final PulsarDeserializationSchema<OUT> deserializationSchema;

    /** Wake-up flag: set by {@link #wakeUp()}, observed through {@link #isNotWakeup()}. */
    protected final AtomicBoolean wakeup = new AtomicBoolean(false);

    /** Consumer of the registered split; {@code null} until a split has been registered. */
    protected Consumer<byte[]> pulsarConsumer;

    /** The single split assigned to this reader; {@code null} until one has been registered. */
    protected PulsarPartitionSplit registeredSplit;

    /**
     * Stores the collaborators shared by all split reader implementations.
     *
     * @param pulsarClient client used to create consumers.
     * @param pulsarAdmin admin handle used to open splits.
     * @param sourceConfiguration configuration of the Pulsar source.
     * @param pulsarDeserializationSchema schema exposed to subclasses as {@link #deserializationSchema}.
     */
    public PulsarPartitionSplitReaderBase(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> pulsarDeserializationSchema) {
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.sourceConfiguration = sourceConfiguration;
        this.deserializationSchema = pulsarDeserializationSchema;
    }

    /**
     * Fetches records for the registered split.
     *
     * <p>NOTE(review): the decompiler failed to restructure this method (it reported a lost block
     * containing an {@code addFinishedSplit} call), so the real logic is not present in this file.
     * The stub below unconditionally throws; restore the body from the original sources or the
     * bytecode before relying on this class.
     *
     * @return never returns normally in this decompiled form.
     * @throws java.io.IOException declared by the original signature.
     * @throws UnsupportedOperationException always, because the body was not decompiled.
     */
    @Override
    public org.apache.flink.connector.base.source.reader.RecordsWithSplitIds<PulsarMessage<OUT>> fetch() throws java.io.IOException {
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch():org.apache.flink.connector.base.source.reader.RecordsWithSplitIds");
    }

    /**
     * Registers the single split this reader will consume.
     *
     * <p>A consumer is created for the split, the split is opened with the admin handle, and the
     * consumer is handed to {@link #startConsumer}. Only after all of that succeeds are the split
     * and consumer stored on this reader. If opening the split or starting the consumer fails, the
     * freshly created consumer is closed before the failure is rethrown, so it does not leak.
     *
     * @param splitsChange the change to apply; must be a {@link SplitsAddition} with exactly one split.
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}.
     * @throws IllegalStateException if a split has already been registered.
     * @throws IllegalArgumentException if the change does not contain exactly one split.
     */
    @Override
    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChange) {
        LOG.debug("Handle split changes {}", splitsChange);
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        if (this.registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }

        List<PulsarPartitionSplit> splits = splitsChange.splits();
        Preconditions.checkArgument(splits.size() == 1, "This pulsar split reader only support one split.");
        PulsarPartitionSplit newSplit = splits.get(0);

        Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
        try {
            newSplit.open(this.pulsarAdmin);
            startConsumer(newSplit, consumer);
        } catch (RuntimeException | Error failure) {
            // The consumer is not yet reachable through this.pulsarConsumer, so close() would
            // never release it; close it here and keep the original failure as the primary error.
            try {
                consumer.close();
            } catch (PulsarClientException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }

        LOG.info("Register split {} consumer for current reader.", newSplit);
        this.registeredSplit = newSplit;
        this.pulsarConsumer = consumer;
    }

    /** Raises the wake-up flag that subclasses observe through {@link #isNotWakeup()}. */
    @Override
    public void wakeUp() {
        this.wakeup.compareAndSet(false, true);
    }

    /** Closes the registered consumer, if any; a no-op when no split was ever registered. */
    @Override
    public void close() {
        if (this.pulsarConsumer != null) {
            PulsarExceptionUtils.sneakyClient(() -> {
                this.pulsarConsumer.close();
            });
        }
    }

    /**
     * Polls one message from the consumer.
     *
     * @param duration presumably the maximum time to wait for a message; confirm against the
     *     implementations.
     * @return the polled message, or {@code null} when none is available.
     * @throws ExecutionException declared for implementations.
     * @throws InterruptedException declared for implementations.
     * @throws PulsarClientException declared for implementations.
     */
    @Nullable
    protected abstract Message<byte[]> pollMessage(Duration duration) throws ExecutionException, InterruptedException, PulsarClientException;

    /**
     * Hook for implementations to post-process a polled message (presumably invoked by
     * {@link #fetch()} after a message was handled; the call site is not visible in this file).
     *
     * @param message the message that was polled.
     */
    protected abstract void finishedPollMessage(Message<byte[]> message);

    /**
     * Prepares the given consumer for the split; called while the split is being registered in
     * {@link #handleSplitsChanges}, before the consumer is stored on this reader.
     *
     * @param pulsarPartitionSplit the split being registered.
     * @param consumer the consumer created for that split.
     */
    protected abstract void startConsumer(PulsarPartitionSplit pulsarPartitionSplit, Consumer<byte[]> consumer);

    /**
     * Tells whether the wake-up flag is currently clear.
     *
     * @return {@code true} when {@link #wakeUp()} has not set the flag.
     */
    protected boolean isNotWakeup() {
        return !this.wakeup.get();
    }

    /**
     * Creates a consumer for the partition of the given split.
     *
     * @param pulsarPartitionSplit the split whose partition should be consumed.
     * @return a subscribed consumer for that partition.
     */
    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit pulsarPartitionSplit) {
        return createPulsarConsumer(pulsarPartitionSplit.getPartition());
    }

    /**
     * Creates and subscribes a byte-array consumer for the given topic partition.
     *
     * <p>For the {@code Key_Shared} subscription type the consumer is additionally restricted to
     * the partition's hash range through a sticky hash range policy.
     *
     * @param topicPartition the partition to consume.
     * @return the subscribed consumer.
     */
    public Consumer<byte[]> createPulsarConsumer(TopicPartition topicPartition) {
        ConsumerBuilder<byte[]> builder = PulsarSourceConfigUtils.createConsumerBuilder(this.pulsarClient, Schema.BYTES, this.sourceConfiguration);
        builder.topic(topicPartition.getFullTopicName());
        if (this.sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
            builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(topicPartition.getPulsarRange()));
        }
        return PulsarExceptionUtils.sneakyClient(builder::subscribe);
    }
}
