/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that reads messages for a single {@link TopicRange} through a Pulsar {@link Reader}
 * and hands every deserialized record to the owning {@link PulsarFetcher}.
 *
 * <p>The thread creates its reader in {@link #run()}, positions it relative to the start
 * message id taken from the topic state, and then polls until {@link #cancel()} is called,
 * the deserializer signals end of stream, or an error occurs. Errors raised inside
 * {@link #run()} are forwarded to the {@link ExceptionProxy} instead of being thrown.
 *
 * @param <T> type of the records produced by the deserializer
 */
public class ReaderThread<T> extends Thread {
    private static final Logger log = LoggerFactory.getLogger(ReaderThread.class);

    /** Fetcher that receives the records and provides the metadata reader. */
    protected final PulsarFetcher<T> owner;
    /** State of the topic range this thread reads; its offset is updated while positioning. */
    protected final PulsarTopicState<T> state;
    /** Client configuration used to look up the cached Pulsar client. */
    protected final ClientConfigurationData clientConf;
    /** Extra reader configuration loaded into the reader builder. */
    protected final Map<String, Object> readerConf;
    /** Timeout, in milliseconds, of a single {@code readNext} poll. */
    protected final int pollTimeoutMs;
    /** Sink for errors raised while reading. */
    protected final ExceptionProxy exceptionProxy;
    /** Topic (and key hash range) read by this thread, taken from {@link #state}. */
    protected final TopicRange topicRange;
    /** Message id the reader is created at, taken from {@link #state} at construction time. */
    protected final MessageId startMessageId;
    private boolean failOnDataLoss = true;
    private boolean useEarliestWhenDataLoss = false;
    /** Cleared to stop the read loops. */
    protected volatile boolean running = true;
    /** Set once {@link #close()} has started closing the reader. */
    protected volatile boolean closed = false;
    /** Converts Pulsar messages into records and detects end of stream. */
    protected final PulsarDeserializationSchema<T> deserializer;
    /** The Pulsar reader; {@code null} until {@link #createActualReader()} has succeeded. */
    protected volatile Reader<T> reader = null;

    /**
     * Creates a reader thread that fails on data loss and does not fall back to the earliest
     * message (the field defaults).
     *
     * @param owner          fetcher receiving the records
     * @param state          topic state supplying the topic range and the start offset
     * @param clientConf     Pulsar client configuration
     * @param readerConf     additional reader configuration
     * @param deserializer   schema used to deserialize messages
     * @param pollTimeoutMs  timeout of a single poll in milliseconds
     * @param exceptionProxy receiver of errors raised by this thread
     */
    public ReaderThread(PulsarFetcher<T> owner, PulsarTopicState state, ClientConfigurationData clientConf, Map<String, Object> readerConf, PulsarDeserializationSchema<T> deserializer, int pollTimeoutMs, ExceptionProxy exceptionProxy) {
        this.owner = owner;
        this.state = state;
        this.clientConf = clientConf;
        this.readerConf = readerConf;
        this.deserializer = deserializer;
        this.pollTimeoutMs = pollTimeoutMs;
        this.exceptionProxy = exceptionProxy;
        this.topicRange = state.getTopicRange();
        this.startMessageId = state.getOffset();
    }

    /**
     * Creates a reader thread with explicit data-loss handling.
     *
     * @param owner                   fetcher receiving the records
     * @param state                   topic state supplying the topic range and the start offset
     * @param clientConf              Pulsar client configuration
     * @param readerConf              additional reader configuration
     * @param deserializer            schema used to deserialize messages
     * @param pollTimeoutMs           timeout of a single poll in milliseconds
     * @param exceptionProxy          receiver of errors raised by this thread
     * @param failOnDataLoss          whether a start position mismatch is reported as an error
     * @param useEarliestWhenDataLoss when not failing and the start id lies beyond the last
     *                                message id, seek to the earliest message instead of the
     *                                last message id
     */
    public ReaderThread(PulsarFetcher<T> owner, PulsarTopicState state, ClientConfigurationData clientConf, Map<String, Object> readerConf, PulsarDeserializationSchema<T> deserializer, int pollTimeoutMs, ExceptionProxy exceptionProxy, boolean failOnDataLoss, boolean useEarliestWhenDataLoss) {
        this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy);
        this.failOnDataLoss = failOnDataLoss;
        this.useEarliestWhenDataLoss = useEarliestWhenDataLoss;
    }

    /**
     * Creates the reader, positions it, and emits messages until {@link #running} is cleared.
     * Any {@link Throwable} is reported to the exception proxy; the reader, if it was created,
     * is closed on the way out.
     */
    @Override
    public void run() {
        log.info("Starting to fetch from {} at {}, failOnDataLoss {}", this.topicRange, this.startMessageId, this.failOnDataLoss);
        try {
            this.createActualReader();
            this.skipFirstMessageIfNeeded();
            log.info("Starting to read {} with reader thread {}", this.topicRange, this.getName());
            while (this.running) {
                Message<T> message = this.reader.readNext(this.pollTimeoutMs, TimeUnit.MILLISECONDS);
                // A null message means the poll timed out; poll again so `running` is re-checked.
                if (message != null) {
                    this.emitRecord(message);
                }
            }
        }
        catch (Throwable e) {
            this.exceptionProxy.reportError(e);
        }
        finally {
            if (this.reader != null) {
                try {
                    this.close();
                }
                catch (Throwable e) {
                    // Pass the throwable itself so the stack trace is kept in the log.
                    log.error("Error while closing Pulsar reader", e);
                }
            }
        }
    }

    /**
     * Builds the Pulsar reader for {@link #topicRange}, starting at {@link #startMessageId}
     * (inclusive), and stores it in {@link #reader}. A key hash range is only configured when
     * the topic range is not the full range.
     *
     * @throws PulsarClientException if the client or the reader cannot be created
     */
    protected void createActualReader() throws PulsarClientException {
        ReaderBuilder<T> readerBuilder = CachedPulsarClient.getOrCreate(this.clientConf)
                .newReader(this.deserializer.getSchema())
                .topic(this.topicRange.getTopic())
                .startMessageId(this.startMessageId)
                .startMessageIdInclusive()
                .loadConf(this.readerConf);
        log.info("Create a reader at topic {} starting from message {} (inclusive) : config = {}", this.topicRange, this.startMessageId, this.readerConf);
        if (!this.topicRange.isFullRange()) {
            readerBuilder.keyHashRange(this.topicRange.getPulsarRange());
        }
        this.reader = readerBuilder.create();
    }

    /**
     * Consumes the message at the start position so that regular reading continues after it.
     *
     * <p>Nothing happens when the start id is {@code MessageId.earliest},
     * {@code MessageId.latest}, or has entry id {@code -1}. Otherwise:
     * <ol>
     *   <li>if the start id lies beyond the topic's last message id and nothing is available,
     *       either fail (failOnDataLoss) or seek the reader and reset the cursor to the
     *       earliest message / the last message id;</li>
     *   <li>read one message, store its id as the state's offset, and report data loss when it
     *       does not roughly equal the start id and failOnDataLoss is set;</li>
     *   <li>if the start id is a plain entry id but the message read belongs to a batch, seek
     *       to the entry following that batch.</li>
     * </ol>
     *
     * @throws PulsarClientException if reading from or seeking the reader fails
     * @throws RuntimeException      if the start id is beyond the last message id and
     *                               failOnDataLoss is set
     */
    protected void skipFirstMessageIfNeeded() throws PulsarClientException {
        boolean hasConcreteStart = !this.startMessageId.equals(MessageId.earliest)
                && !this.startMessageId.equals(MessageId.latest)
                && ((MessageIdImpl) this.startMessageId).getEntryId() != -1L;
        if (!hasConcreteStart) {
            return;
        }

        PulsarMetadataReader metaDataReader = this.owner.getMetaDataReader();
        MessageIdImpl lastMessageId = (MessageIdImpl) metaDataReader.getLastMessageId(this.reader.getTopic());
        // Evaluation order matters: hasMessageAvailable() is only consulted when the ids differ.
        boolean startBeyondEnd = !ReaderThread.messageIdRoughEquals(this.startMessageId, lastMessageId)
                && !this.reader.hasMessageAvailable()
                && ((MessageIdImpl) this.startMessageId).compareTo(lastMessageId) > 0;
        if (startBeyondEnd) {
            if (this.failOnDataLoss) {
                log.error("the start message id is beyond the last commit message id, with topic:{}", this.topicRange);
                throw new RuntimeException("start message id beyond the last commit");
            }
            if (this.useEarliestWhenDataLoss) {
                log.info("reset message to earliest");
                this.reader.seek(MessageId.earliest);
                metaDataReader.resetCursor(this.topicRange, MessageId.earliest);
            } else {
                log.info("reset message to valid offset {}", lastMessageId);
                this.reader.seek(lastMessageId);
                metaDataReader.resetCursor(this.topicRange, lastMessageId);
            }
        }

        // Poll until a message arrives or the thread is stopped.
        Message<T> currentMessage = null;
        while (currentMessage == null && this.running) {
            currentMessage = this.reader.readNext(this.pollTimeoutMs, TimeUnit.MILLISECONDS);
        }

        if (currentMessage == null) {
            // NOTE(review): the loop above only ends without a message once `running` is false,
            // so this is reached when the thread was stopped while waiting — confirm that
            // reporting data loss in that situation is intended.
            this.reportDataLoss(String.format("Cannot read data at offset %s from topic: %s", this.startMessageId.toString(), this.topicRange));
            return;
        }

        MessageId currentId = currentMessage.getMessageId();
        this.state.setOffset(currentId);
        if (!ReaderThread.messageIdRoughEquals(currentId, this.startMessageId) && this.failOnDataLoss) {
            this.reportDataLoss(String.format("Potential Data Loss in reading %s: intended to start at %s, actually we get %s", this.topicRange, this.startMessageId.toString(), currentId.toString()));
        }

        // BatchMessageIdImpl is itself a MessageIdImpl, so "plain" has to exclude it explicitly.
        boolean startIsPlainEntry = this.startMessageId instanceof MessageIdImpl
                && !(this.startMessageId instanceof BatchMessageIdImpl);
        if (startIsPlainEntry && currentId instanceof BatchMessageIdImpl) {
            // Continue from the entry after the batch that was just read (presumably so the
            // rest of that batch is not emitted again — TODO confirm).
            BatchMessageIdImpl batchId = (BatchMessageIdImpl) currentId;
            MessageIdImpl nextEntry = new MessageIdImpl(batchId.getLedgerId(), batchId.getEntryId() + 1L, batchId.getPartitionIndex());
            this.reader.seek(nextEntry);
        }
    }

    /**
     * Deserializes {@code message} and forwards the record, together with its message id and
     * event time, to the owner. When the deserializer reports end of stream the record is
     * dropped and {@link #running} is cleared instead.
     *
     * @param message message read from Pulsar
     * @throws IOException if deserialization fails
     */
    protected void emitRecord(Message<T> message) throws IOException {
        MessageId messageId = message.getMessageId();
        T record = this.deserializer.deserialize(message);
        if (this.deserializer.isEndOfStream(record)) {
            this.running = false;
            return;
        }
        this.owner.emitRecordsWithTimestamps(record, this.state, messageId, message.getEventTime());
    }

    /**
     * Stops the thread: clears {@link #running}, closes the reader if one exists (a failure to
     * close is logged, not thrown), and interrupts the thread.
     *
     * @throws IOException declared for callers; close failures are caught and logged here
     */
    public void cancel() throws IOException {
        this.running = false;
        if (this.reader != null) {
            try {
                this.close();
            }
            catch (IOException e) {
                log.error("failed to close reader. ", e);
            }
        }
        this.interrupt();
    }

    /**
     * Closes the reader once; later calls return immediately. Does nothing when no reader has
     * been created yet, and in that case does not mark the thread as closed, so a reader
     * created afterwards can still be closed.
     *
     * @throws IOException if closing the reader fails
     */
    public void close() throws IOException {
        Reader<T> current = this.reader;
        if (this.closed || current == null) {
            return;
        }
        this.closed = true;
        current.close();
        log.info("Reader closed");
    }

    /**
     * @return {@code true} until the thread has been cancelled, has hit end of stream, or has
     *         reported data loss
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Stops the read loop and reports an {@link IllegalStateException} describing the data
     * loss to the exception proxy.
     *
     * @param message description of what was expected and what was found
     */
    private void reportDataLoss(String message) {
        this.running = false;
        this.exceptionProxy.reportError(new IllegalStateException(message + ". Some data may have been lost because they are not available in Pulsar any more; either the\n data was aged out by Pulsar or the topic may have been deleted before all the data in the\n topic was processed. If you don't want your streaming query to fail on such cases, set the\n source option \"failOnDataLoss\" to \"false\"."));
    }

    /**
     * Compares two message ids, ignoring the batch index when exactly one side is a batch id.
     *
     * <p>Two batch ids, or two non-batch {@link MessageIdImpl}s, are compared with
     * {@code equals}. When one side is a batch id and the other a {@link MessageIdImpl}, the
     * batch id is reduced to its ledger id, entry id and partition index first.
     *
     * @param l first id, may be {@code null}
     * @param r second id, may be {@code null}
     * @return {@code true} if the ids match as described; {@code false} if either is
     *         {@code null}
     * @throws IllegalStateException if the ids are of a type combination not handled above
     */
    public static boolean messageIdRoughEquals(MessageId l, MessageId r) {
        if (l == null || r == null) {
            return false;
        }
        if (l instanceof BatchMessageIdImpl && r instanceof BatchMessageIdImpl) {
            return l.equals(r);
        }
        // From here on at most one side is a batch id.
        if (l instanceof MessageIdImpl && r instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl rb = (BatchMessageIdImpl) r;
            return l.equals(new MessageIdImpl(rb.getLedgerId(), rb.getEntryId(), rb.getPartitionIndex()));
        }
        if (r instanceof MessageIdImpl && l instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl lb = (BatchMessageIdImpl) l;
            return r.equals(new MessageIdImpl(lb.getLedgerId(), lb.getEntryId(), lb.getPartitionIndex()));
        }
        if (l instanceof MessageIdImpl && r instanceof MessageIdImpl) {
            return l.equals(r);
        }
        throw new IllegalStateException(String.format("comparing messageIds of type %s, %s", l.getClass().toString(), r.getClass().toString()));
    }
}

