package org.apache.inlong.sdk.sort.impl;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImpl.class */
public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
    private final Logger logger;
    private final ReentrantReadWriteLock mainLock;
    private final ConcurrentHashMap<String, MessageId> offsetCache;
    private volatile boolean closed;
    private Consumer<byte[]> consumer;
    private volatile boolean stopConsume;

    public InLongPulsarFetcherImpl(InLongTopic inLongTopic, ClientContext clientContext) {
        super(inLongTopic, clientContext);
        this.logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
        this.mainLock = new ReentrantReadWriteLock(true);
        this.offsetCache = new ConcurrentHashMap<>();
        this.closed = false;
        this.stopConsume = false;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void stopConsume(boolean z) {
        this.stopConsume = z;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean isConsumeStop() {
        return this.stopConsume;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public InLongTopic getInLongTopic() {
        return this.inLongTopic;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public long getConsumedDataSize() {
        return 0L;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public long getAckedOffset() {
        return 0L;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void seek(long j) throws Exception {
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void seek(MessageId messageId) throws PulsarClientException {
        if (this.consumer != null) {
            this.consumer.seek(messageId);
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public String getFetcherType() {
        return this.inLongTopic.getTopicType();
    }

    private void ackSucc(String str) {
        this.logger.info("ack succ:{}", str);
        this.offsetCache.remove(str);
        this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addAckSuccTimes(1);
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void ack(String str) throws Exception {
        if (StringUtils.isEmpty(str)) {
            return;
        }
        this.logger.debug("## ack {}", str);
        try {
            if (this.consumer == null) {
                this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addAckFailTimes(1);
                this.logger.error("consumer == null {}", str);
                return;
            }
            MessageId messageId = this.offsetCache.get(str);
            if (messageId != null) {
                this.consumer.acknowledgeAsync(messageId).thenAccept(r5 -> {
                    ackSucc(str);
                }).exceptionally(th -> {
                    this.logger.error("ack fail:{}", str);
                    this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addAckFailTimes(1);
                    return null;
                });
            } else {
                this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addAckFailTimes(1);
                this.logger.error("messageId == null {}", str);
            }
        } catch (Exception e) {
            this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addAckFailTimes(1);
            this.logger.error(e.getMessage(), e);
            throw e;
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean init(PulsarClient pulsarClient) {
        return createConsumer(pulsarClient);
    }

    private boolean createConsumer(PulsarClient pulsarClient) {
        try {
            this.consumer = pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{this.inLongTopic.getTopic()}).subscriptionName(this.context.getConfig().getSortTaskId()).subscriptionType(SubscriptionType.Shared).startMessageIdInclusive().ackTimeout(this.context.getConfig().getAckTimeoutSec(), TimeUnit.SECONDS).receiverQueueSize(this.context.getConfig().getPulsarReceiveQueueSize()).messageListener(new PulsarMessageListener(this, this.context, this.inLongTopic, this.offsetCache)).subscribe();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public void isValidState() {
        if (this.closed) {
            throw new IllegalStateException(this.inLongTopic + " closed.");
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void pause() {
        if (this.consumer != null) {
            this.consumer.pause();
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void resume() {
        if (this.consumer != null) {
            this.consumer.resume();
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean close() {
        this.mainLock.writeLock().lock();
        try {
            this.closed = true;
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
            this.logger.info("{} closed.", this.inLongTopic);
            return true;
        } finally {
            this.mainLock.writeLock().unlock();
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean isClosed() {
        return this.closed;
    }
}
