package org.apache.flink.connector.pulsar.source.reader.split;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.class */
public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplitReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedPartitionSplitReader.class);

    public PulsarOrderedPartitionSplitReader(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> pulsarDeserializationSchema) {
        super(pulsarClient, pulsarAdmin, sourceConfiguration, pulsarDeserializationSchema);
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    protected Message<byte[]> pollMessage(Duration duration) throws PulsarClientException {
        return this.pulsarConsumer.receive(Math.toIntExact(duration.toMillis()), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    protected void finishedPollMessage(Message<byte[]> message) {
        LOG.debug("Finished polling message {}", message);
        message.release();
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    protected void startConsumer(PulsarPartitionSplit pulsarPartitionSplit, Consumer<byte[]> consumer) {
        MessageId latestConsumedId = pulsarPartitionSplit.getLatestConsumedId();
        if (latestConsumedId != null) {
            StartCursor fromMessageId = StartCursor.fromMessageId(latestConsumedId, false);
            TopicPartition partition = pulsarPartitionSplit.getPartition();
            try {
                fromMessageId.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer);
            } catch (PulsarClientException e) {
                if (this.sourceConfiguration.getVerifyInitialOffsets() == CursorVerification.FAIL_ON_MISMATCH) {
                    throw new IllegalArgumentException((Throwable) e);
                }
                LOG.warn("Failed to reset cursor to {} on partition {}", new Object[]{latestConsumedId, partition, e});
            }
        }
    }

    public void notifyCheckpointComplete(TopicPartition topicPartition, MessageId messageId) {
        if (this.pulsarConsumer == null) {
            this.pulsarConsumer = createPulsarConsumer(topicPartition);
        }
        PulsarExceptionUtils.sneakyClient(() -> {
            this.pulsarConsumer.acknowledgeCumulative(messageId);
        });
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ void wakeUp() {
        super.wakeUp();
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ void handleSplitsChanges(SplitsChange splitsChange) {
        super.handleSplitsChanges(splitsChange);
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ RecordsWithSplitIds fetch() throws IOException {
        return super.fetch();
    }
}
