package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.class */
public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);
    protected final SourceReader.Context context;
    protected final PulsarClientConfig clientConfig;
    protected final PulsarConsumerConfig consumerConfig;
    protected final StartCursor startCursor;
    protected final DeserializationSchema<T> deserialization;
    protected final int pollTimeout;
    protected final long pollInterval;
    protected final int batchSize;
    protected PulsarClient pulsarClient;
    private boolean noMoreSplitsAssignment = false;
    protected final Map<String, PulsarPartitionSplit> splitStates = new HashMap();
    protected final Map<String, PulsarSplitReaderThread> splitReaders = new HashMap();
    protected final SortedMap<Long, Map<String, MessageId>> pendingCursorsToCommit = Collections.synchronizedSortedMap(new TreeMap());
    protected final Map<String, MessageId> pendingCursorsToFinish = Collections.synchronizedSortedMap(new TreeMap());
    protected final Set<String> finishedSplits = new TreeSet();
    protected final Handover<RecordWithSplitId> handover = new Handover<>();

    public PulsarSourceReader(SourceReader.Context context, PulsarClientConfig pulsarClientConfig, PulsarConsumerConfig pulsarConsumerConfig, StartCursor startCursor, DeserializationSchema<T> deserializationSchema, int i, long j, int i2) {
        this.context = context;
        this.clientConfig = pulsarClientConfig;
        this.consumerConfig = pulsarConsumerConfig;
        this.startCursor = startCursor;
        this.deserialization = deserializationSchema;
        this.pollTimeout = i;
        this.pollInterval = j;
        this.batchSize = i2;
    }

    public void open() {
        this.pulsarClient = PulsarConfigUtil.createClient(this.clientConfig);
    }

    public void close() throws IOException {
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
        this.splitReaders.values().forEach(pulsarSplitReaderThread -> {
            try {
                pulsarSplitReaderThread.close();
            } catch (IOException e) {
                throw new PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to close the split reader thread.", e);
            }
        });
    }

    public void pollNext(Collector<T> collector) throws Exception {
        for (int i = 0; i < this.batchSize; i++) {
            Optional pollNext = this.handover.pollNext();
            if (pollNext.isPresent()) {
                String splitId = ((RecordWithSplitId) pollNext.get()).getSplitId();
                Message<byte[]> message = ((RecordWithSplitId) pollNext.get()).getMessage();
                synchronized (collector.getCheckpointLock()) {
                    this.splitStates.get(splitId).setLatestConsumedId(message.getMessageId());
                    this.deserialization.deserialize(message.getData(), collector);
                }
            }
            if (this.noMoreSplitsAssignment && this.finishedSplits.size() == this.splitStates.size()) {
                this.context.signalNoMoreElement();
                return;
            }
        }
    }

    public List<PulsarPartitionSplit> snapshotState(long j) throws Exception {
        List<PulsarPartitionSplit> list = (List) this.splitStates.values().stream().map((v0) -> {
            return v0.copy();
        }).collect(Collectors.toList());
        int size = list.size();
        Map<String, MessageId> computeIfAbsent = this.pendingCursorsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
            return new HashMap(size);
        });
        for (PulsarPartitionSplit pulsarPartitionSplit : list) {
            MessageId latestConsumedId = pulsarPartitionSplit.getLatestConsumedId();
            if (latestConsumedId != null) {
                computeIfAbsent.put(pulsarPartitionSplit.splitId(), latestConsumedId);
            }
        }
        return list;
    }

    public void addSplits(List<PulsarPartitionSplit> list) {
        list.forEach(pulsarPartitionSplit -> {
            this.splitStates.put(pulsarPartitionSplit.splitId(), pulsarPartitionSplit);
            PulsarSplitReaderThread createPulsarSplitReaderThread = createPulsarSplitReaderThread(pulsarPartitionSplit);
            try {
                createPulsarSplitReaderThread.open();
                this.splitReaders.put(pulsarPartitionSplit.splitId(), createPulsarSplitReaderThread);
                createPulsarSplitReaderThread.start();
            } catch (PulsarClientException e) {
                throw new PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to start the split reader thread.", e);
            }
        });
    }

    protected PulsarSplitReaderThread createPulsarSplitReaderThread(PulsarPartitionSplit pulsarPartitionSplit) {
        return new PulsarSplitReaderThread(this, pulsarPartitionSplit, this.pulsarClient, this.consumerConfig, this.pollTimeout, this.pollInterval, this.startCursor, this.handover);
    }

    public void handleNoMoreElements(String str, MessageId messageId) {
        LOG.info("Reader received the split {} NoMoreElements event.", str);
        this.pendingCursorsToFinish.put(str, messageId);
    }

    public void handleNoMoreSplits() {
        LOG.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        LOG.debug("Committing cursors for checkpoint {}", Long.valueOf(j));
        Map<String, MessageId> remove = this.pendingCursorsToCommit.remove(Long.valueOf(j));
        if (remove == null) {
            LOG.debug("Cursors for checkpoint {} either do not exist or have already been committed.", Long.valueOf(j));
        } else {
            remove.forEach((str, messageId) -> {
                if (this.finishedSplits.contains(str)) {
                    return;
                }
                this.splitReaders.get(str).committingCursor(messageId);
                if (this.pendingCursorsToFinish.containsKey(str) && this.pendingCursorsToFinish.get(str).compareTo(messageId) == 0) {
                    this.finishedSplits.add(str);
                    try {
                        this.splitReaders.get(str).close();
                    } catch (IOException e) {
                        throw new PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to close the split reader thread.", e);
                    }
                }
            });
        }
    }
}
