package org.apache.inlong.sdk.sort.impl.pulsar;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.class */
public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
    private final Logger logger;
    private final ReentrantReadWriteLock mainLock;
    private final ConcurrentHashMap<String, MessageId> offsetCache;
    private Consumer<byte[]> consumer;
    private volatile Thread fetchThread;

    /* loaded from: input_file:org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl$Fetcher.class */
    public class Fetcher implements Runnable {
        public Fetcher() {
        }

        private void handleAndCallbackMsg(List<MessageRecord> list) {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addCallbackTimes(1L);
                InLongPulsarFetcherImpl.this.context.getConfig().getCallback().onFinishedBatch(list);
                InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addCallbackTimeCost(System.currentTimeMillis() - currentTimeMillis).addCallbackDoneTimes(1L);
            } catch (Exception e) {
                InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addCallbackErrorTimes(1L);
                e.printStackTrace();
            }
        }

        private String getOffset(MessageId messageId) {
            return Base64.getEncoder().encodeToString(messageId.toByteArray());
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    try {
                        if (InLongPulsarFetcherImpl.this.context.getConfig().isStopConsume() || InLongPulsarFetcherImpl.this.isStopConsume) {
                            TimeUnit.MILLISECONDS.sleep(50L);
                            if (0 != 0) {
                                InLongPulsarFetcherImpl.this.context.releaseRequestPermit();
                            }
                        } else {
                            if (InLongPulsarFetcherImpl.this.sleepTime > 0) {
                                TimeUnit.MILLISECONDS.sleep(InLongPulsarFetcherImpl.this.sleepTime);
                            }
                            InLongPulsarFetcherImpl.this.context.acquireRequestPermit();
                            InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addMsgCount(1L).addFetchTimes(1L);
                            long currentTimeMillis = System.currentTimeMillis();
                            Messages<Message> batchReceive = InLongPulsarFetcherImpl.this.consumer.batchReceive();
                            InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addFetchTimeCost(System.currentTimeMillis() - currentTimeMillis);
                            if (null == batchReceive || batchReceive.size() == 0) {
                                InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addEmptyFetchTimes(1L);
                                InLongPulsarFetcherImpl.access$4208(InLongPulsarFetcherImpl.this);
                                if (InLongPulsarFetcherImpl.this.emptyFetchTimes >= InLongPulsarFetcherImpl.this.context.getConfig().getEmptyPollTimes()) {
                                    InLongPulsarFetcherImpl.access$4502(InLongPulsarFetcherImpl.this, Math.min(InLongPulsarFetcherImpl.access$4702(InLongPulsarFetcherImpl.this, InLongPulsarFetcherImpl.this.sleepTime + InLongPulsarFetcherImpl.this.context.getConfig().getEmptyPollSleepStepMs()), InLongPulsarFetcherImpl.this.context.getConfig().getMaxEmptyPollSleepMs()));
                                    InLongPulsarFetcherImpl.this.emptyFetchTimes = 0;
                                }
                            } else {
                                ArrayList arrayList = new ArrayList();
                                for (Message message : batchReceive) {
                                    String offset = getOffset(message.getMessageId());
                                    InLongPulsarFetcherImpl.this.offsetCache.put(offset, message.getMessageId());
                                    arrayList.add(new MessageRecord(InLongPulsarFetcherImpl.this.inLongTopic.getTopicKey(), Collections.singletonList(new InLongMessage(message.getData(), message.getProperties())), offset, System.currentTimeMillis()));
                                    InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addConsumeSize(message.getData().length);
                                }
                                InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addMsgCount(arrayList.size());
                                handleAndCallbackMsg(arrayList);
                                InLongPulsarFetcherImpl.access$3702(InLongPulsarFetcherImpl.this, 0L);
                            }
                            if (1 != 0) {
                                InLongPulsarFetcherImpl.this.context.releaseRequestPermit();
                            }
                        }
                    } catch (Exception e) {
                        InLongPulsarFetcherImpl.this.context.getStatManager().getStatistics(InLongPulsarFetcherImpl.this.context.getConfig().getSortTaskId(), InLongPulsarFetcherImpl.this.inLongTopic.getInLongCluster().getClusterId(), InLongPulsarFetcherImpl.this.inLongTopic.getTopic()).addFetchErrorTimes(1L);
                        InLongPulsarFetcherImpl.this.logger.error(e.getMessage(), e);
                        if (0 != 0) {
                            InLongPulsarFetcherImpl.this.context.releaseRequestPermit();
                        }
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        InLongPulsarFetcherImpl.this.context.releaseRequestPermit();
                    }
                    throw th;
                }
            }
        }
    }

    /**
     * Creates a fetcher for one topic. No Pulsar consumer is created here; that happens in
     * {@link #init(Object)}.
     *
     * @param inLongTopic the topic this fetcher reads from
     * @param clientContext the shared client context passed to the superclass
     */
    public InLongPulsarFetcherImpl(InLongTopic inLongTopic, ClientContext clientContext) {
        super(inLongTopic, clientContext);
        // Maps Base64 offset keys to message ids that have been delivered but not yet acked.
        this.offsetCache = new ConcurrentHashMap<>();
        // Fair lock; close() takes its write side.
        this.mainLock = new ReentrantReadWriteLock(true);
        this.logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
    }

    /**
     * Sets the per-fetcher stop flag that the fetch loop checks before each poll.
     *
     * @param z {@code true} to make the fetch loop idle, {@code false} to let it poll again
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void stopConsume(boolean z) {
        isStopConsume = z;
    }

    /**
     * @return the current value of the per-fetcher stop flag set by {@link #stopConsume(boolean)}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean isConsumeStop() {
        return isStopConsume;
    }

    /**
     * @return the topic this fetcher was constructed for
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public InLongTopic getInLongTopic() {
        return inLongTopic;
    }

    /**
     * Not tracked by this implementation.
     *
     * @return always {@code 0}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public long getConsumedDataSize() {
        final long notTracked = 0L;
        return notTracked;
    }

    /**
     * Not tracked by this implementation.
     *
     * @return always {@code 0}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public long getAckedOffset() {
        final long notTracked = 0L;
        return notTracked;
    }

    /**
     * Completes a successful acknowledgement: drops the offset key from the cache and counts
     * the success.
     *
     * @param str the Base64 offset key that was acknowledged
     */
    private void ackSucc(String str) {
        offsetCache.remove(str);
        context.getStatManager()
                .getStatistics(
                        context.getConfig().getSortTaskId(),
                        inLongTopic.getInLongCluster().getClusterId(),
                        inLongTopic.getTopic())
                .addAckSuccTimes(1L);
    }

    /**
     * Acknowledges the message identified by an offset key previously handed out in a
     * {@link MessageRecord}.
     *
     * <p>An empty or {@code null} key is ignored. If the consumer has not been created or the key
     * is not in the offset cache, the failure is counted and logged and the method returns
     * normally. The acknowledgement itself is asynchronous: success removes the key from the
     * cache, failure is counted and logged together with its cause.
     *
     * @param str the Base64 offset key of the message to acknowledge
     * @throws Exception if issuing the acknowledgement fails synchronously; the failure is
     *         counted and logged before being rethrown
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void ack(String str) throws Exception {
        if (StringUtils.isEmpty(str)) {
            return;
        }
        try {
            if (this.consumer == null) {
                recordAckFailure();
                this.logger.error("consumer == null");
                return;
            }
            MessageId messageId = this.offsetCache.get(str);
            if (messageId == null) {
                recordAckFailure();
                this.logger.error("messageId == null");
                return;
            }
            this.consumer.acknowledgeAsync(messageId)
                    .thenAccept(ignored -> ackSucc(str))
                    .exceptionally(th -> {
                        // Pass the throwable as the last argument so the cause reaches the log.
                        this.logger.error("ack fail:{}", str, th);
                        recordAckFailure();
                        return null;
                    });
        } catch (Exception e) {
            recordAckFailure();
            this.logger.error(e.getMessage(), e);
            throw e;
        }
    }

    /** Adds one failed acknowledgement to this topic's statistics. */
    private void recordAckFailure() {
        this.context.getStatManager()
                .getStatistics(
                        this.context.getConfig().getSortTaskId(),
                        this.inLongTopic.getInLongCluster().getClusterId(),
                        this.inLongTopic.getTopic())
                .addAckFailTimes(1L);
    }

    /**
     * Initializes the fetcher by subscribing a consumer and starting the fetch thread.
     *
     * @param obj the client to subscribe with; it is cast to {@link PulsarClient}
     * @return {@code true} if the consumer was created and the fetch thread started
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean init(Object obj) {
        PulsarClient client = (PulsarClient) obj;
        return createConsumer(client);
    }

    /**
     * Subscribes a shared byte-array consumer on this fetcher's topic, using the sort task id as
     * the subscription name, and then starts the fetch thread.
     *
     * @param pulsarClient the client used to build the consumer
     * @return {@code true} on success; {@code false} if any step threw, in which case the
     *         exception is logged
     */
    private boolean createConsumer(PulsarClient pulsarClient) {
        try {
            this.consumer = pulsarClient.newConsumer(Schema.BYTES)
                    .topic(new String[]{this.inLongTopic.getTopic()})
                    .subscriptionName(this.context.getConfig().getSortTaskId())
                    .subscriptionType(SubscriptionType.Shared)
                    .startMessageIdInclusive()
                    .ackTimeout(10L, TimeUnit.SECONDS)
                    .receiverQueueSize(this.context.getConfig().getPulsarReceiveQueueSize())
                    .subscribe();

            // The thread name carries its creation timestamp.
            String threadName = "sort_sdk_fetch_thread_"
                    + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
            Thread worker = new Thread(new Fetcher(), threadName);
            this.fetchThread = worker;
            worker.start();
            return true;
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
        return false;
    }

    /**
     * Asserts that this fetcher has not been closed.
     *
     * @throws IllegalStateException if {@link #close()} has already marked the fetcher closed
     */
    public void isValidState() {
        if (!this.closed) {
            return;
        }
        throw new IllegalStateException(this.inLongTopic + " closed.");
    }

    /**
     * Pauses the underlying consumer; does nothing if no consumer has been created yet.
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void pause() {
        if (this.consumer == null) {
            return;
        }
        this.consumer.pause();
    }

    /**
     * Resumes the underlying consumer; does nothing if no consumer has been created yet.
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void resume() {
        if (this.consumer == null) {
            return;
        }
        this.consumer.resume();
    }

    /**
     * Marks the fetcher closed, closes the Pulsar consumer, and interrupts the fetch thread.
     *
     * <p>The whole operation runs under the write lock. A failure to close the consumer is
     * logged and does not prevent the fetch thread from being interrupted.
     *
     * @return always {@code true}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean close() {
        this.mainLock.writeLock().lock();
        try {
            this.closed = true;
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } catch (PulsarClientException e) {
                this.logger.error("failed to close consumer of {}", this.inLongTopic, e);
            } finally {
                // Always signal the fetch thread, even when closing the consumer failed;
                // otherwise it would never be told to stop.
                if (this.fetchThread != null) {
                    this.fetchThread.interrupt();
                }
            }
            this.logger.info("closed {}", this.inLongTopic);
            return true;
        } finally {
            this.mainLock.writeLock().unlock();
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Accessor that lets the nested {@code Fetcher} write the enclosing fetcher's
     * {@code sleepTime}.
     *
     * <p>Reconstructed by hand: the decompiler could not decode this method and emitted a stub
     * that always threw {@link UnsupportedOperationException}. Its pseudo-code showed a single
     * store to {@code sleepTime} followed by a return.
     *
     * @param fetcher the instance whose {@code sleepTime} is written
     * @param value the new sleep time
     * @return the value that was stored
     */
    static /* synthetic */ long access$3702(InLongPulsarFetcherImpl fetcher, long value) {
        fetcher.sleepTime = value;
        return value;
    }

    /**
     * Accessor that post-increments the enclosing fetcher's {@code emptyFetchTimes}.
     *
     * @param inLongPulsarFetcherImpl the instance whose counter is incremented
     * @return the counter value before the increment
     */
    static /* synthetic */ int access$4208(InLongPulsarFetcherImpl inLongPulsarFetcherImpl) {
        return inLongPulsarFetcherImpl.emptyFetchTimes++;
    }

    /**
     * Accessor that lets the nested {@code Fetcher} write the enclosing fetcher's
     * {@code sleepTime}.
     *
     * <p>Reconstructed by hand: the decompiler could not decode this method and emitted a stub
     * that always threw {@link UnsupportedOperationException}. Its pseudo-code showed a single
     * store to {@code sleepTime} followed by a return.
     *
     * @param fetcher the instance whose {@code sleepTime} is written
     * @param value the new sleep time
     * @return the value that was stored
     */
    static /* synthetic */ long access$4502(InLongPulsarFetcherImpl fetcher, long value) {
        fetcher.sleepTime = value;
        return value;
    }

    /**
     * Accessor that lets the nested {@code Fetcher} write the enclosing fetcher's
     * {@code sleepTime}.
     *
     * <p>Reconstructed by hand: the decompiler could not decode this method and emitted a stub
     * that always threw {@link UnsupportedOperationException}. Its pseudo-code showed a single
     * store to {@code sleepTime} followed by a return.
     *
     * @param fetcher the instance whose {@code sleepTime} is written
     * @param value the new sleep time
     * @return the value that was stored
     */
    static /* synthetic */ long access$4702(InLongPulsarFetcherImpl fetcher, long value) {
        fetcher.sleepTime = value;
        return value;
    }
}
