package org.apache.rocketmq.streams.common.topology.task;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.optimization.IHomologousOptimization;
import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintMetric;
import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.topology.model.PipelineSourceJoiner;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/topology/task/StreamsTask.class */
public class StreamsTask extends BasedConfigurable implements IStreamOperator<IMessage, AbstractContext>, IAfterConfigurableRefreshListener {
    private static final Log LOG = LogFactory.getLog(StreamsTask.class);

    /** Configurable type identifier of this class. */
    public static final String TYPE = "stream_task";

    /**
     * Fingerprint field expression. When empty, the per-message fingerprint cache is bypassed
     * (see {@code getFilterKey}, {@code getFilterValue} and {@code addNoFireMessage}).
     */
    @ENVDependence
    protected String logFingerprint;

    /** Fingerprint cache; (re)created when a refresh changes the pipeline set. */
    protected transient FingerprintCache homologousRulesCache;

    /** Lazily loaded in {@code doMessage} with a null-check / lock / null-check sequence, hence volatile. */
    protected transient volatile IHomologousOptimization homologousOptimization;

    /** Pool used to run pipelines in parallel; when null, pipelines run on the calling thread. */
    protected transient ExecutorService executorService;

    /** Capacity handed to the {@link FingerprintCache} constructor. */
    protected int homologousRulesCaseSize = 2000000;

    /** Passed to {@code IHomologousOptimization.optimizate} as its second argument. */
    protected int homologousExpressionCaseSize = 2000000;

    /** Passed to {@code IHomologousOptimization.optimizate} as its third argument. */
    protected int preFingerprintCaseSize = 2000000;

    /** Thread pool size; a pool is only created when this is greater than zero. */
    protected int parallelTasks = 4;

    /** Pipelines every incoming message is dispatched to. */
    protected transient List<ChainPipeline> pipelines = new ArrayList<>();

    /** Configure names of the pipelines, kept alongside {@link #pipelines}. */
    protected List<String> pipelineNames = new ArrayList<>();

    /** Number of messages seen; incremented by {@code calculateQPS}. */
    private final transient AtomicLong COUNT = new AtomicLong(0);

    /** Number of pipeline executions for which a finished branch was reported. */
    private final transient AtomicLong FIRE_RULE_COUNT = new AtomicLong(0);

    /** Wall-clock time of the first {@code calculateQPS} call; initialised under a lock, hence volatile. */
    protected transient volatile Long firstReceiveTime = null;

    /** When true, {@code doMessage} does not wait for parallel pipeline tasks to finish. */
    protected transient boolean isContainsKeyBy = false;

    /** Whether the optimization loader should still be attempted; read outside the lock, hence volatile. */
    protected transient volatile Boolean isOpenOptimization = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/streams/common/topology/task/StreamsTask$HomologousTask.class */
    /**
     * Runs a single pipeline against a single message, either on the task's thread pool or inline.
     *
     * <p>After the pipeline has run, the result of {@code MessageGlobleTrace.existFinishBranch} decides
     * the bookkeeping: {@code true} increments the enclosing task's fire counter, {@code false} sets this
     * pipeline's bit in the shared bit set (when one was supplied), and {@code null} does nothing.
     * An optional latch is always counted down exactly once, whatever the outcome.
     */
    public class HomologousTask implements Runnable {
        protected IMessage message;
        protected AbstractContext context;
        protected ChainPipeline pipeline;
        protected BitSetCache.BitSet bitSet;
        protected int index;
        protected String msgKey;
        protected CountDownLatch countDownLatch;

        /**
         * @param iMessage        message handed to the pipeline
         * @param abstractContext context handed to the pipeline
         * @param chainPipeline   pipeline to execute
         * @param bitSet          per-message bit set shared between the tasks of one message; may be null
         * @param i               position of {@code chainPipeline} in the task's pipeline list (bit index)
         * @param str             fingerprint key of the message; stored but not used by this class
         */
        public HomologousTask(IMessage iMessage, AbstractContext abstractContext, ChainPipeline chainPipeline, BitSetCache.BitSet bitSet, int i, String str) {
            this.message = iMessage;
            this.context = abstractContext;
            this.pipeline = chainPipeline;
            this.bitSet = bitSet;
            this.index = i;
            this.msgKey = str;
        }

        /**
         * Executes the pipeline. Exceptions are logged and not propagated; errors propagate after the
         * latch has been released.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                // The message is passed as-is: casting it to ChainPipeline can never succeed.
                this.pipeline.doMessage(this.message, this.context);
                Boolean existFinishBranch = MessageGlobleTrace.existFinishBranch(this.message);
                if (existFinishBranch != null) {
                    if (existFinishBranch.booleanValue()) {
                        StreamsTask.this.FIRE_RULE_COUNT.incrementAndGet();
                    } else if (this.bitSet != null) {
                        // NOTE(review): the bit set is shared by all tasks of one message; confirm
                        // BitSetCache.BitSet#set is safe under concurrent writers.
                        this.bitSet.set(this.index);
                    }
                }
            } catch (Exception e) {
                StreamsTask.LOG.error("pipeline execute error " + this.pipeline.getConfigureName(), e);
            } finally {
                // Always release the waiter, otherwise doMessage would block on the latch forever.
                if (this.countDownLatch != null) {
                    this.countDownLatch.countDown();
                }
            }
        }

        /** @return the latch counted down when this task finishes, or null when nobody waits */
        public CountDownLatch getCountDownLatch() {
            return this.countDownLatch;
        }

        /** @param countDownLatch latch to count down when this task finishes; may be null */
        public void setCountDownLatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
    }

    /**
     * Starts the channel of every pipeline and keeps polling once per second for pipelines that
     * appear later, starting each configure name at most once.
     *
     * <p>This method blocks the calling thread. It returns only when that thread is interrupted;
     * the interrupt flag is restored before returning so callers can observe it.
     */
    public void start() {
        HashMap<String, Boolean> startedPipelines = new HashMap<>();
        while (true) {
            for (ChainPipeline chainPipeline : this.pipelines) {
                String pipelineName = chainPipeline.getConfigureName();
                if (!startedPipelines.containsKey(pipelineName)) {
                    startPipeline(chainPipeline);
                    startedPipelines.put(pipelineName, true);
                }
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Swallowing the interrupt would make this loop impossible to stop.
                Thread.currentThread().interrupt();
                LOG.warn("start loop of " + getConfigureName() + " interrupted, stop watching for new pipelines", e);
                return;
            }
        }
    }

    /**
     * Dispatches one message to every pipeline of this task.
     *
     * <p>Steps:
     * <ol>
     *   <li>lazily load the homologous optimization (once, unless switched off by the
     *       {@code homologous.optimization.switch} property) and let it pre-calculate on the message;</li>
     *   <li>when a log fingerprint is configured, look up the cached bit set for the message and skip
     *       every pipeline whose bit is set;</li>
     *   <li>run the remaining pipelines, on the thread pool when one exists, otherwise inline; each
     *       pipeline gets a deep copy of the message unless there is only one pipeline;</li>
     *   <li>on a cache miss, store the bit set that the pipeline tasks filled in.</li>
     * </ol>
     *
     * @param iMessage        incoming message
     * @param abstractContext context of the incoming message; its homologous and quick-filter results
     *                        are copied into each per-pipeline context
     * @return {@code abstractContext} when there are no pipelines, otherwise {@code null}
     */
    @Override // org.apache.rocketmq.streams.common.interfaces.IBaseStreamOperator
    public AbstractContext doMessage(IMessage iMessage, AbstractContext abstractContext) {
        // Work on one snapshot: a refresh may replace this.pipelines while the message is in flight,
        // and the bit indexes must match the list that is actually iterated.
        List<ChainPipeline> currentPipelines = this.pipelines;
        if (CollectionUtil.isEmpty(currentPipelines)) {
            return abstractContext;
        }
        loadHomologousOptimizationIfNeeded();

        double qps = calculateQPS();
        if (StringUtil.isEmpty(this.logFingerprint) && this.COUNT.get() % 10000 == 0) {
            System.out.println(getConfigureName() + " qps is " + qps + "。the count is " + this.COUNT.get());
        }
        IHomologousOptimization optimization = this.homologousOptimization;
        if (optimization != null) {
            optimization.calculate(iMessage, abstractContext);
        }

        boolean singlePipeline = currentPipelines.size() == 1;
        String filterKey = getFilterKey(iMessage);
        BitSetCache.BitSet filterValue = getFilterValue(filterKey);
        ExecutorService executor = this.executorService;
        boolean filterFromCache = true;
        CountDownLatch countDownLatch = null;
        if (filterValue == null && StringUtil.isNotEmpty(this.logFingerprint)) {
            // Cache miss: run every pipeline and record the outcome in a fresh bit set.
            filterValue = new BitSetCache.BitSet(currentPipelines.size());
            filterFromCache = false;
            // Only wait on a latch when tasks really run on the pool. Inline tasks never receive the
            // latch, so awaiting it without an executor would block forever.
            if (executor != null && !this.isContainsKeyBy && currentPipelines.size() > 1) {
                countDownLatch = new CountDownLatch(currentPipelines.size());
            }
        }

        int index = 0;
        for (ChainPipeline chainPipeline : currentPipelines) {
            boolean skip = filterFromCache && filterValue != null && filterValue.get(index);
            if (!skip) {
                IMessage pipelineMessage = singlePipeline ? iMessage : iMessage.deepCopy();
                Context context = new Context(pipelineMessage);
                context.setHomologousResult(abstractContext.getHomologousResult());
                context.setQuickFilterResult(abstractContext.getQuickFilterResult());
                HomologousTask homologousTask = new HomologousTask(pipelineMessage, context, chainPipeline, filterValue, index, filterKey);
                if (executor != null) {
                    if (countDownLatch != null) {
                        homologousTask.setCountDownLatch(countDownLatch);
                    }
                    executor.execute(homologousTask);
                } else {
                    homologousTask.run();
                }
            }
            index++;
        }

        if (countDownLatch != null) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; unfinished tasks simply leave their bit unset.
                Thread.currentThread().interrupt();
                LOG.warn("interrupted while waiting for the pipelines of " + getConfigureName(), e);
            }
        }
        if (!filterFromCache) {
            addNoFireMessage(filterKey, filterValue);
        }
        if (StringUtil.isNotEmpty(this.logFingerprint)) {
            printQPSWithFingerprint(qps);
        }
        return null;
    }

    /**
     * Loads the first {@link IHomologousOptimization} service provider, lets it optimize the current
     * pipelines and only then publishes it, so no thread can observe a half-initialised optimizer.
     * Does nothing once an optimizer is loaded or the switch property turned the feature off.
     */
    private void loadHomologousOptimizationIfNeeded() {
        if (this.homologousOptimization != null || !this.isOpenOptimization.booleanValue()) {
            return;
        }
        synchronized (this) {
            if (this.homologousOptimization != null) {
                return;
            }
            String switchValue = ComponentCreator.getProperties().getProperty("homologous.optimization.switch");
            // An absent or empty property means "enabled".
            boolean enabled = !StringUtil.isNotEmpty(switchValue) || Boolean.parseBoolean(switchValue);
            this.isOpenOptimization = Boolean.valueOf(enabled);
            if (!enabled) {
                return;
            }
            Iterator<IHomologousOptimization> providers = ServiceLoader.load(IHomologousOptimization.class).iterator();
            if (providers.hasNext()) {
                IHomologousOptimization optimization = providers.next();
                optimization.optimizate(this.pipelines, this.homologousExpressionCaseSize, this.preFingerprintCaseSize);
                this.homologousOptimization = optimization;
            }
        }
    }

    /**
     * Re-resolves the pipelines assigned to this task after the configurable service refreshed.
     *
     * <p>Pipelines are collected from every {@code TaskAssigner} whose task name equals this task's
     * configure name, plus those contributed by {@link #loadSubPipelines}. When the resulting
     * non-empty list differs from the current one, the list is swapped, the fingerprint cache is
     * recreated and pipelines that dropped out are destroyed. Nothing happens when no task assigner
     * list is available.
     *
     * <p>The thread pool is created once, on the first refresh with {@code parallelTasks > 0};
     * recreating it on every refresh would leak the threads of the previous pool.
     *
     * @param iConfigurableService service used to look up task assigners and pipelines
     */
    @Override // org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener
    public void doProcessAfterRefreshConfigurable(IConfigurableService iConfigurableService) {
        List<TaskAssigner> taskAssigners = iConfigurableService.queryConfigurableByType(TaskAssigner.TYPE);
        if (taskAssigners == null) {
            return;
        }

        List<ChainPipeline> assignedPipelines = new ArrayList<>();
        for (TaskAssigner taskAssigner : taskAssigners) {
            if (!getConfigureName().equals(taskAssigner.getTaskName())) {
                continue;
            }
            List<String> assignedNames = taskAssigner.getPipelineNames();
            if (assignedNames == null) {
                continue;
            }
            for (String assignedName : assignedNames) {
                ChainPipeline chainPipeline = (ChainPipeline) iConfigurableService.queryConfigurable(Pipeline.TYPE, assignedName);
                if (chainPipeline != null) {
                    assignedPipelines.add(chainPipeline);
                }
            }
        }
        loadSubPipelines(assignedPipelines, iConfigurableService);

        boolean changed = false;
        List<ChainPipeline> removedPipelines = new ArrayList<>();
        // An empty result never replaces the current pipelines.
        if (!assignedPipelines.isEmpty()) {
            List<ChainPipeline> currentPipelines = this.pipelines;
            changed = !currentPipelines.containsAll(assignedPipelines);
            for (ChainPipeline currentPipeline : currentPipelines) {
                if (!assignedPipelines.contains(currentPipeline)) {
                    changed = true;
                    removedPipelines.add(currentPipeline);
                }
            }
        }

        if (changed) {
            this.pipelines = assignedPipelines;
            // Bit positions in cached fingerprints refer to the old pipeline order, so start afresh.
            this.homologousRulesCache = new FingerprintCache(this.homologousRulesCaseSize);
            for (ChainPipeline removedPipeline : removedPipelines) {
                removedPipeline.destroy();
            }
        }

        if (this.parallelTasks > 0 && this.executorService == null) {
            this.executorService = ThreadPoolFactory.createThreadPool(this.parallelTasks);
        }
    }

    /**
     * Appends to {@code list} every pipeline referenced by a {@code PipelineSourceJoiner} whose source
     * pipeline name equals this task's configure name. Joiners pointing at unknown pipelines are skipped.
     *
     * @param list                 target list, modified in place
     * @param iConfigurableService service used to look up joiners and pipelines
     */
    protected void loadSubPipelines(List<ChainPipeline> list, IConfigurableService iConfigurableService) {
        List<PipelineSourceJoiner> joiners = iConfigurableService.queryConfigurableByType(PipelineSourceJoiner.TYPE);
        if (joiners != null) {
            String ownName = getConfigureName();
            for (PipelineSourceJoiner joiner : joiners) {
                if (!ownName.equals(joiner.getSourcePipelineName())) {
                    continue;
                }
                ChainPipeline joined = (ChainPipeline) iConfigurableService.queryConfigurable(Pipeline.TYPE, joiner.getPipelineName());
                if (joined != null) {
                    list.add(joined);
                }
            }
        }
    }

    /**
     * Stores {@code bitSet} in the fingerprint cache under key {@code str}; a no-op when no log
     * fingerprint is configured.
     *
     * @param str    fingerprint key of the message
     * @param bitSet bit set to cache for that key
     */
    protected void addNoFireMessage(String str, BitSetCache.BitSet bitSet) {
        if (!StringUtil.isEmpty(this.logFingerprint)) {
            this.homologousRulesCache.addLogFingerprint(getOrCreateFingerNameSpace(), str, bitSet);
        }
    }

    /**
     * Looks up the cached bit set for a fingerprint key.
     *
     * @param str fingerprint key of the message
     * @return the value the cache holds for the key, or {@code null} when no log fingerprint is configured
     */
    protected BitSetCache.BitSet getFilterValue(String str) {
        BitSetCache.BitSet cached = null;
        if (!StringUtil.isEmpty(this.logFingerprint)) {
            cached = this.homologousRulesCache.getLogFingerprint(getOrCreateFingerNameSpace(), str);
        }
        return cached;
    }

    /**
     * Builds the fingerprint key of a message from the configured log fingerprint.
     *
     * @param iMessage message to fingerprint
     * @return the key produced by {@code FingerprintCache.creatFingerpringKey}, or {@code null} when no
     *         log fingerprint is configured
     */
    protected String getFilterKey(IMessage iMessage) {
        String key = null;
        if (!StringUtil.isEmpty(this.logFingerprint)) {
            key = FingerprintCache.creatFingerpringKey(iMessage, getOrCreateFingerNameSpace(), this.logFingerprint);
        }
        return key;
    }

    /**
     * @return the namespace used for this task's entries in the fingerprint cache, which is the
     *         task's configure name
     */
    protected String getOrCreateFingerNameSpace() {
        String nameSpace = getConfigureName();
        return nameSpace;
    }

    /**
     * Counts one more message and returns the average throughput since the first call.
     *
     * <p>The elapsed time is measured in whole seconds and clamped to at least one second. The
     * division is done in floating point; dividing the two {@code long} values directly would
     * truncate the rate to a whole number before it is widened to {@code double}.
     *
     * @return messages per second since the first call, including the message counted by this call
     */
    protected double calculateQPS() {
        Long startTime = this.firstReceiveTime;
        if (startTime == null) {
            synchronized (this) {
                if (this.firstReceiveTime == null) {
                    this.firstReceiveTime = Long.valueOf(System.currentTimeMillis());
                }
                startTime = this.firstReceiveTime;
            }
        }
        long elapsedSeconds = (System.currentTimeMillis() - startTime.longValue()) / 1000;
        // Clamp to one second: covers the first second and a wall clock that stepped backwards.
        if (elapsedSeconds < 1) {
            elapsedSeconds = 1;
        }
        return (double) this.COUNT.incrementAndGet() / elapsedSeconds;
    }

    /**
     * Prints throughput and fingerprint-cache statistics to standard output on every 1000th message.
     *
     * <p>The fire rate is the fire counter divided by the message counter, computed in floating point
     * (a {@code long / long} division would yield 0 for any rate below 1 and throw on a zero count).
     *
     * @param d the QPS value to report
     */
    protected void printQPSWithFingerprint(double d) {
        FingerprintMetric metric = this.homologousRulesCache.getOrCreateMetric(getOrCreateFingerNameSpace());
        long messageCount = this.COUNT.get();
        if (messageCount % 1000 != 0) {
            return;
        }
        double hitCacheRate = metric.getHitCacheRate();
        double fireRate = messageCount == 0 ? 0.0d : (double) this.FIRE_RULE_COUNT.get() / messageCount;
        System.out.println("qps is " + d + ",the count is " + messageCount + " the cache hit  rate " + hitCacheRate + " the cache size is " + metric.getCacheSize() + "，the fire rule rate is " + fireRate);
    }

    /**
     * Starts the channel of the given pipeline on a new thread, so the caller is not blocked by it.
     *
     * @param chainPipeline pipeline whose channel is started
     */
    protected void startPipeline(final ChainPipeline chainPipeline) {
        Runnable channelStarter = new Runnable() {
            @Override
            public void run() {
                chainPipeline.startChannel();
            }
        };
        Thread starterThread = new Thread(channelStarter);
        starterThread.start();
    }

    /** @return the configured thread pool size */
    public int getParallelTasks() {
        return parallelTasks;
    }

    /** @param i thread pool size; a pool is only created for values greater than zero */
    public void setParallelTasks(int i) {
        parallelTasks = i;
    }

    /** @return the live list of pipelines messages are dispatched to (not a copy) */
    public List<ChainPipeline> getPipelines() {
        return pipelines;
    }

    /**
     * Replaces the pipelines of this task and rebuilds {@code pipelineNames} to match them.
     *
     * <p>The name list is rebuilt from scratch: appending to the previous list would leave stale and
     * duplicate names behind when this setter is called more than once. A {@code null} argument is
     * treated as an empty list.
     *
     * @param list new pipelines; may be null
     */
    public void setPipelines(List<ChainPipeline> list) {
        List<ChainPipeline> newPipelines = list != null ? list : new ArrayList<ChainPipeline>();
        List<String> newNames = new ArrayList<>(newPipelines.size());
        for (ChainPipeline chainPipeline : newPipelines) {
            newNames.add(chainPipeline.getConfigureName());
        }
        this.pipelines = newPipelines;
        this.pipelineNames = newNames;
    }

    /** @return the configured log fingerprint expression, possibly null or empty */
    public String getLogFingerprint() {
        return logFingerprint;
    }

    /** @param str log fingerprint expression; null or empty bypasses the fingerprint cache */
    public void setLogFingerprint(String str) {
        logFingerprint = str;
    }

    /** @return the size passed to the fingerprint cache constructor */
    public int getHomologousRulesCaseSize() {
        return homologousRulesCaseSize;
    }

    /** @param i size passed to the fingerprint cache constructor the next time the cache is created */
    public void setHomologousRulesCaseSize(int i) {
        homologousRulesCaseSize = i;
    }

    /** @return the expression case size handed to the homologous optimization */
    public int getHomologousExpressionCaseSize() {
        return homologousExpressionCaseSize;
    }

    /** @param i expression case size handed to the homologous optimization when it is loaded */
    public void setHomologousExpressionCaseSize(int i) {
        homologousExpressionCaseSize = i;
    }

    /** @return the pre-fingerprint case size handed to the homologous optimization */
    public int getPreFingerprintCaseSize() {
        return preFingerprintCaseSize;
    }

    /** @param i pre-fingerprint case size handed to the homologous optimization when it is loaded */
    public void setPreFingerprintCaseSize(int i) {
        preFingerprintCaseSize = i;
    }

    /** @return the live list of pipeline configure names (not a copy) */
    public List<String> getPipelineNames() {
        return pipelineNames;
    }

    /** @param list pipeline configure names; stored as given, without copying */
    public void setPipelineNames(List<String> list) {
        pipelineNames = list;
    }
}
