package org.apache.flink.connector.pulsar.source.reader.split;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessageCollector;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.class */
/**
 * Base class for split readers that consume exactly one {@link PulsarPartitionSplit} through a
 * single Pulsar {@link Consumer}.
 *
 * <p>The class owns the fetch loop, the registration of the split and the creation of the
 * consumer. Subclasses decide how a message is polled ({@link #pollMessage(Duration)}), what
 * happens after a message was handed to the deserializer ({@link #finishedPollMessage(Message)})
 * and how a freshly created consumer is positioned ({@link #startConsumer}).
 *
 * @param <OUT> the type of the records produced by the deserialization schema.
 */
public abstract class PulsarPartitionSplitReaderBase<OUT> implements SplitReader<PulsarMessage<OUT>, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);

    protected final PulsarClient pulsarClient;
    protected final PulsarAdmin pulsarAdmin;
    protected final Configuration configuration;
    protected final SourceConfiguration sourceConfiguration;
    protected final PulsarDeserializationSchema<OUT> deserializationSchema;

    /** Set by {@link #wakeUp()} and cleared at the start of every {@link #fetch()}. */
    protected final AtomicBoolean wakeup = new AtomicBoolean(false);

    /** Consumer of the registered split; {@code null} until a split has been assigned. */
    protected Consumer<byte[]> pulsarConsumer;

    /** The single split served by this reader; {@code null} until a split has been assigned. */
    protected PulsarPartitionSplit registeredSplit;

    /**
     * Stores the collaborators; no connection is opened and no consumer is created here.
     *
     * @param pulsarClient client used to build the consumer.
     * @param pulsarAdmin admin client passed to {@link PulsarPartitionSplit#open} when a split is
     *     assigned.
     * @param configuration configuration handed to the consumer builder factory.
     * @param sourceConfiguration source options (fetch limits, subscription type).
     * @param pulsarDeserializationSchema schema used to turn polled messages into records.
     */
    public PulsarPartitionSplitReaderBase(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, Configuration configuration, SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> pulsarDeserializationSchema) {
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.configuration = configuration;
        this.sourceConfiguration = sourceConfiguration;
        this.deserializationSchema = pulsarDeserializationSchema;
    }

    /**
     * Polls messages from the registered split and returns them as one batch.
     *
     * <p>The loop ends as soon as one of the following holds: the configured maximum number of
     * records was polled, the configured maximum fetch time elapsed, {@link #wakeUp()} was
     * called, the split's stop cursor reports that consumption should stop (the split is then
     * marked as finished in the returned batch), or polling failed.
     *
     * @return the records collected so far; an empty batch if no split is registered yet.
     * @throws IOException if polling or deserializing a message fails with an exception other
     *     than {@link InterruptedException}, {@link TimeoutException} or {@link
     *     ExecutionException}.
     */
    @Override
    public RecordsWithSplitIds<PulsarMessage<OUT>> fetch() throws IOException {
        RecordsBySplits.Builder<PulsarMessage<OUT>> builder = new RecordsBySplits.Builder<>();

        // Nothing to read until handleSplitsChanges() registered a split and its consumer.
        if (pulsarConsumer == null || registeredSplit == null) {
            return builder.build();
        }

        // Clear a pending wake-up request so that this fetch starts with a clean flag.
        wakeup.compareAndSet(true, false);

        StopCursor stopCursor = registeredSplit.getStopCursor();
        String splitId = registeredSplit.splitId();
        PulsarMessageCollector<OUT> collector = new PulsarMessageCollector<>(splitId, builder);
        Deadline deadline = Deadline.fromNow(sourceConfiguration.getMaxFetchTime());

        for (int messageNum = 0;
                messageNum < sourceConfiguration.getMaxFetchRecords()
                        && deadline.hasTimeLeft()
                        && isNotWakeup();
                messageNum++) {
            try {
                // timeLeftIfAny() throws TimeoutException once the deadline has passed.
                Duration timeout = deadline.timeLeftIfAny();
                // NOTE(review): the message is used without a null check; confirm that
                // pollMessage() implementations never return null.
                Message<byte[]> message = pollMessage(timeout);

                // The collector needs the raw message before the deserialized records arrive.
                collector.setMessage(message);
                deserializationSchema.deserialize(message, collector);

                finishedPollMessage(message);

                if (stopCursor.shouldStop(message)) {
                    builder.addFinishedSplit(splitId);
                    break;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and hand back what was collected so far.
                Thread.currentThread().interrupt();
                break;
            } catch (TimeoutException e) {
                // The fetch time budget is used up; return the current batch.
                break;
            } catch (ExecutionException e) {
                LOG.error("Error in polling message from pulsar consumer.", e);
                break;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        return builder.build();
    }

    /**
     * Registers the one split this reader is going to serve and starts a consumer for it.
     *
     * @param splitsChange must be a {@link SplitsAddition} containing exactly one split.
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}.
     * @throws IllegalStateException if a split has already been registered.
     * @throws IllegalArgumentException if the change does not contain exactly one split.
     */
    @Override
    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChange) {
        LOG.debug("Handle split changes {}", splitsChange);

        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        if (registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }

        List<PulsarPartitionSplit> splits = splitsChange.splits();
        Preconditions.checkArgument(splits.size() == 1, "This pulsar split reader only support one split.");
        PulsarPartitionSplit newSplit = splits.get(0);

        // Order matters: create the consumer, open the split, then let the subclass start it.
        Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
        newSplit.open(pulsarAdmin);
        startConsumer(newSplit, consumer);

        LOG.info("Register split {} consumer for current reader.", newSplit);
        // The fields are assigned last, so fetch() only sees a fully started consumer.
        this.registeredSplit = newSplit;
        this.pulsarConsumer = consumer;
    }

    /** Requests that a running {@link #fetch()} stops polling at its next loop check. */
    @Override
    public void wakeUp() {
        wakeup.compareAndSet(false, true);
    }

    /** Closes the consumer if one was created; does nothing otherwise. */
    @Override
    public void close() {
        if (pulsarConsumer != null) {
            PulsarExceptionUtils.sneakyClient(() -> {
                pulsarConsumer.close();
            });
        }
    }

    /**
     * Polls the next message from the consumer.
     *
     * @param duration the time left in the current fetch, as computed by {@link #fetch()}.
     * @return the polled message.
     */
    protected abstract Message<byte[]> pollMessage(Duration duration) throws ExecutionException, InterruptedException, TimeoutException;

    /**
     * Hook invoked by {@link #fetch()} after the message was passed to the deserialization schema.
     *
     * @param message the message that has just been processed.
     */
    protected abstract void finishedPollMessage(Message<byte[]> message);

    /**
     * Hook invoked by {@link #handleSplitsChanges} after the split was opened and before the
     * consumer is published to {@link #fetch()}.
     *
     * @param pulsarPartitionSplit the split being registered.
     * @param consumer the consumer created for that split.
     */
    protected abstract void startConsumer(PulsarPartitionSplit pulsarPartitionSplit, Consumer<byte[]> consumer);

    /** Returns {@code true} while no wake-up request is pending. */
    protected boolean isNotWakeup() {
        return !wakeup.get();
    }

    /**
     * Creates a consumer for the partition of the given split.
     *
     * @param pulsarPartitionSplit the split whose partition should be consumed.
     * @return the subscribed consumer.
     */
    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit pulsarPartitionSplit) {
        return createPulsarConsumer(pulsarPartitionSplit.getPartition());
    }

    /**
     * Builds and subscribes a byte-array consumer for the given topic partition.
     *
     * <p>For the {@code Key_Shared} subscription type the consumer is additionally restricted to
     * the partition's hash range through a sticky key-shared policy.
     *
     * @param topicPartition the partition to subscribe to.
     * @return the subscribed consumer.
     */
    public Consumer<byte[]> createPulsarConsumer(TopicPartition topicPartition) {
        ConsumerBuilder<byte[]> consumerBuilder = PulsarSourceConfigUtils.createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);

        consumerBuilder.topic(topicPartition.getFullTopicName());

        if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
            consumerBuilder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(topicPartition.getPulsarRange()));
        }

        return PulsarExceptionUtils.sneakyClient(consumerBuilder::subscribe);
    }
}
