package org.apache.rocketmq.streams.core.running;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.DataProcessThrowable;
import org.apache.rocketmq.streams.core.exception.RStreamsException;
import org.apache.rocketmq.streams.core.function.supplier.SourceSupplier;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.state.RocketMQStore;
import org.apache.rocketmq.streams.core.state.RocksDBStore;
import org.apache.rocketmq.streams.core.state.StateStore;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.RocketMQUtil;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.TimeType;
import org.apache.rocketmq.streams.core.window.fire.IdleWindowScaner;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/running/WorkerThread.class */
public class WorkerThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(WorkerThread.class.getName());
    private final TopologyBuilder topologyBuilder;
    private final PlanetaryEngine<?, ?> planetaryEngine;
    private final Properties properties;
    private final String jobId;
    private final ScheduledExecutorService executor;

    /* loaded from: input_file:org/apache/rocketmq/streams/core/running/WorkerThread$PlanetaryEngine.class */
    class PlanetaryEngine<K, V> {
        private final DefaultLitePullConsumer unionConsumer;
        private final DefaultMQProducer producer;
        private final DefaultMQAdminExt mqAdmin;
        private final StateStore stateStore;
        private final MessageQueueListenerWrapper wrapper;
        private final IdleWindowScaner idleWindowScaner;
        private volatile boolean stop = false;
        private long lastCommit = 0;
        private int commitInterval = 10000;
        private final HashSet<MessageQueue> mq2Commit = new HashSet<>();

        public PlanetaryEngine(DefaultLitePullConsumer defaultLitePullConsumer, DefaultMQProducer defaultMQProducer, StateStore stateStore, DefaultMQAdminExt defaultMQAdminExt, MessageQueueListenerWrapper messageQueueListenerWrapper) {
            this.unionConsumer = defaultLitePullConsumer;
            this.producer = defaultMQProducer;
            this.mqAdmin = defaultMQAdminExt;
            this.stateStore = stateStore;
            this.wrapper = messageQueueListenerWrapper;
            this.wrapper.setRecoverHandler((set, set2) -> {
                try {
                    this.stateStore.recover(set, set2);
                    return null;
                } catch (Throwable th) {
                    WorkerThread.logger.error("recover error.", th);
                    return th;
                }
            });
            this.idleWindowScaner = new IdleWindowScaner((Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.IDLE_TIME_TO_FIRE_WINDOW, 2000), WorkerThread.this.executor);
            WorkerThread.this.executor.scheduleAtFixedRate(() -> {
                try {
                    doCommit(this.mq2Commit);
                } catch (Throwable th) {
                    WorkerThread.logger.error("commit offset and state error.", th);
                }
            }, 10L, 10L, TimeUnit.SECONDS);
        }

        void start() throws Throwable {
            createShuffleTopic();
            this.unionConsumer.start();
            this.producer.start();
            this.stateStore.init();
        }

        void runInLoop() throws Throwable {
            Object orDefault;
            Boolean bool;
            MessageExt messageExt;
            byte[] body;
            while (!this.stop) {
                try {
                    Iterator it = this.unionConsumer.poll(10L).iterator();
                    while (it.hasNext() && (body = (messageExt = (MessageExt) it.next()).getBody()) != null && body.length != 0) {
                        String userProperty = messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
                        String userProperty2 = messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);
                        String topic = messageExt.getTopic();
                        int queueId = messageExt.getQueueId();
                        String brokerName = messageExt.getBrokerName();
                        MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
                        this.mq2Commit.add(messageQueue);
                        WorkerThread.logger.debug("source topic queue:[{}]", messageQueue);
                        String buildKey = Utils.buildKey(brokerName, topic, queueId);
                        SourceSupplier.SourceProcessor<K, V> sourceProcessor = (SourceSupplier.SourceProcessor) this.wrapper.selectProcessor(buildKey);
                        StreamContextImpl streamContextImpl = new StreamContextImpl(WorkerThread.this.properties, this.producer, this.mqAdmin, this.stateStore, buildKey, this.idleWindowScaner);
                        sourceProcessor.preProcess(streamContextImpl);
                        Pair<K, V> deserialize = sourceProcessor.deserialize(userProperty, userProperty2, body);
                        Data<K, V> data = new Data<>(deserialize.getKey(), deserialize.getValue(), Long.valueOf(prepareTime(messageExt, sourceProcessor)), new Properties());
                        streamContextImpl.setKey(deserialize.getKey());
                        if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                            WorkerThread.logger.debug("shuffle data: [{}]", data);
                        } else {
                            WorkerThread.logger.debug("source data: [{}]", data);
                        }
                        try {
                            streamContextImpl.forward(data);
                        } catch (Throwable th) {
                            WorkerThread.logger.error("process error.", th);
                            throw new DataProcessThrowable(th);
                            break;
                        }
                    }
                } finally {
                    if (orDefault == bool) {
                    }
                }
            }
        }

        void doCommit(HashSet<MessageQueue> hashSet) throws Throwable {
            if (System.currentTimeMillis() - this.lastCommit <= this.commitInterval || hashSet.size() == 0) {
                return;
            }
            this.stateStore.persist(hashSet);
            this.unionConsumer.commit(hashSet, true);
            Iterator<MessageQueue> it = hashSet.iterator();
            while (it.hasNext()) {
                WorkerThread.logger.debug("committed messageQueue: [{}]", it.next());
            }
            this.lastCommit = System.currentTimeMillis();
            hashSet.clear();
        }

        long prepareTime(MessageExt messageExt, SourceSupplier.SourceProcessor<K, V> sourceProcessor) {
            TimeType timeType = (TimeType) WorkerThread.this.properties.get(StreamConfig.TIME_TYPE);
            String userProperty = messageExt.getUserProperty(Constant.SOURCE_TIMESTAMP);
            return !StringUtils.isEmpty(userProperty) ? Long.parseLong(userProperty) : sourceProcessor.getTimestamp(messageExt, timeType);
        }

        void createShuffleTopic() throws Throwable {
            Set<String> sourceTopic = WorkerThread.this.topologyBuilder.getSourceTopic();
            ArrayList arrayList = new ArrayList();
            for (String str : sourceTopic) {
                if (str.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                    arrayList.add(str);
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                RocketMQUtil.createStaticTopic(this.mqAdmin, (String) it.next(), StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM.intValue());
            }
        }

        public synchronized void stop() {
            if (this.stop) {
                return;
            }
            this.stop = true;
            try {
                this.unionConsumer.shutdown();
                this.stateStore.close();
                this.idleWindowScaner.close();
                this.producer.shutdown();
                this.mqAdmin.shutdown();
                WorkerThread.logger.info("shutdown engine success, thread:{}, jobId:{}", WorkerThread.this.getName(), WorkerThread.this.jobId);
            } catch (Throwable th) {
                WorkerThread.logger.error("error when stop engin.", th);
                throw new RStreamsException(th);
            }
        }
    }

    public WorkerThread(String str, TopologyBuilder topologyBuilder, Properties properties, ScheduledExecutorService scheduledExecutorService) throws MQClientException {
        super(str);
        this.topologyBuilder = topologyBuilder;
        this.properties = properties;
        this.jobId = topologyBuilder.getJobId();
        this.executor = scheduledExecutorService;
        String join = String.join("_", this.jobId, StreamConfig.ROCKETMQ_STREAMS_CONSUMER_GROUP);
        RocketMQClient rocketMQClient = new RocketMQClient(properties.getProperty("rocketmq.namesrv.addr"));
        DefaultLitePullConsumer pullConsumer = rocketMQClient.pullConsumer(join, topologyBuilder.getSourceTopic());
        MessageQueueListenerWrapper messageQueueListenerWrapper = new MessageQueueListenerWrapper(pullConsumer.getMessageQueueListener(), topologyBuilder);
        pullConsumer.setMessageQueueListener(messageQueueListenerWrapper);
        DefaultMQProducer producer = rocketMQClient.producer(join);
        DefaultMQAdminExt mQAdmin = rocketMQClient.getMQAdmin();
        this.planetaryEngine = new PlanetaryEngine<>(pullConsumer, producer, new RocketMQStore(producer, new RocksDBStore(str), mQAdmin, this.properties), mQAdmin, messageQueueListenerWrapper);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        RStreamsException rStreamsException;
        try {
            try {
                this.planetaryEngine.start();
                logger.info("worker thread=[{}], start task success, jobId:{}", getName(), this.jobId);
                this.planetaryEngine.runInLoop();
                this.planetaryEngine.stop();
            } finally {
            }
        } catch (Throwable th) {
            this.planetaryEngine.stop();
            throw th;
        }
    }

    public void shutdown() {
        this.planetaryEngine.stop();
    }
}
