package org.apache.rocketmq.streams.common.topology;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.streams.common.cache.compress.impl.LongValueKV;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.monitor.IMonitor;
import org.apache.rocketmq.streams.common.monitor.group.MonitorCommander;
import org.apache.rocketmq.streams.common.optimization.IHomologousOptimization;
import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
import org.apache.rocketmq.streams.common.optimization.fingerprint.PreFingerprint;
import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.utils.DipperThreadLocalUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/topology/ChainPipeline.class */
public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IAfterConfigurableRefreshListener, Serializable {
    private static final long serialVersionUID = -5189371682717444347L;
    private transient LongValueKV duplicateCache;
    private transient List<String> duplicateFields;
    private transient int duplicateCacheExpirationTime;
    private transient IHomologousOptimization homologousOptimization;
    protected Integer pipelineStatus;
    protected transient ISource source;
    protected List<String> channelNextStageLabel;
    private String channelName;
    protected MetaData channelMetaData;
    private final transient int duplicateCacheSize = 1000000;
    private transient int homologousExpressionCacheSize = 2000000;
    private transient int preFingerprintCacheSize = 2000000;
    protected boolean isAutoStart = false;
    protected transient Map<String, AbstractStage> stageMap = new HashMap();
    private boolean isPublish = false;
    protected transient AtomicBoolean hasStart = new AtomicBoolean(false);

    public Integer getPipelineStatus() {
        return this.pipelineStatus;
    }

    public void setPipelineStatus(Integer num) {
        this.pipelineStatus = num;
    }

    public void startChannel() {
        String createPipelineMonitorName = createPipelineMonitorName();
        if (!isInitSuccess()) {
            LOG.error("channel init failure, so can not start channel");
            return;
        }
        if (this.hasStart.compareAndSet(false, true)) {
            IMonitor iMonitor = (IMonitor) DipperThreadLocalUtil.get();
            if (iMonitor == null) {
                iMonitor = IMonitor.createMonitor(this);
            }
            String property = ComponentCreator.getProperties().getProperty("homologous.optimization.switch");
            boolean z = true;
            if (StringUtil.isNotEmpty(property)) {
                z = Boolean.valueOf(property).booleanValue();
            }
            if (this.homologousOptimization == null && z) {
                Iterator it = ServiceLoader.load(IHomologousOptimization.class).iterator();
                String property2 = ComponentCreator.getProperties().getProperty("homologous.expression.cache.size");
                if (StringUtil.isNotEmpty(property2)) {
                    this.homologousExpressionCacheSize = Integer.valueOf(property2).intValue();
                }
                String property3 = ComponentCreator.getProperties().getProperty("homologous.pre.fingerprint.cache.size");
                if (StringUtil.isNotEmpty(property3)) {
                    this.preFingerprintCacheSize = Integer.valueOf(property3).intValue();
                }
                if (it.hasNext()) {
                    this.homologousOptimization = (IHomologousOptimization) it.next();
                    this.homologousOptimization.optimizate(Lists.newArrayList(new ChainPipeline[]{this}), this.homologousExpressionCacheSize, this.preFingerprintCacheSize);
                }
            }
            try {
                AtomicLong atomicLong = new AtomicLong(0L);
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                Boolean valueOf2 = Boolean.valueOf(ComponentCreator.getPropertyBooleanValue("pipeline.qps.print"));
                this.source.start((iMessage, abstractContext) -> {
                    IMonitor startMonitor = abstractContext.startMonitor(createPipelineMonitorName);
                    startMonitor.setType(IMonitor.TYPE_DATAPROCESS);
                    iMessage.getHeader().setPiplineName(getConfigureName());
                    if (this.homologousOptimization != null) {
                        this.homologousOptimization.calculate(iMessage, abstractContext);
                    }
                    IMessage iMessage = (IMessage) this.doMessage(iMessage, abstractContext);
                    startMonitor.endMonitor();
                    if (valueOf2.booleanValue()) {
                        long incrementAndGet = atomicLong.incrementAndGet();
                        long currentTimeMillis = (System.currentTimeMillis() - valueOf.longValue()) / 1000;
                        if (currentTimeMillis == 0) {
                            currentTimeMillis = 1;
                        }
                        if (incrementAndGet % 1000 == 0) {
                            System.out.println("qps is " + (incrementAndGet / currentTimeMillis) + ",the count is " + atomicLong.get());
                        }
                    }
                    MonitorCommander.getInstance().finishMonitor(startMonitor.getName(), startMonitor);
                    return iMessage;
                });
            } catch (Exception e) {
                e.printStackTrace();
                setInitSuccess(false);
                iMonitor.occureError(e, iMonitor.getName() + " pipeline startup error", e.getMessage());
            }
        }
    }

    private String createDuplicateKey(IMessage iMessage) {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String> it = this.duplicateFields.iterator();
        while (it.hasNext()) {
            newArrayList.add(iMessage.getMessageBody().getString(it.next()));
        }
        return StringUtil.createMD5Str(String.join("", newArrayList));
    }

    private String createPipelineMonitorName() {
        return MapKeyUtil.createKeyBySign(".", getType(), getNameSpace(), getConfigureName());
    }

    @Override // org.apache.rocketmq.streams.common.topology.model.Pipeline
    protected T doMessageInner(T t, AbstractContext abstractContext, AbstractStage... abstractStageArr) {
        if (this.duplicateCache != null && this.duplicateFields != null && !this.duplicateFields.isEmpty() && !t.getHeader().isSystemMessage()) {
            String createDuplicateKey = createDuplicateKey(t);
            Long l = this.duplicateCache.get(createDuplicateKey);
            Long valueOf = Long.valueOf(System.currentTimeMillis());
            if (l != null && valueOf.longValue() - l.longValue() < this.duplicateCacheExpirationTime) {
                abstractContext.breakExecute();
                return t;
            }
            this.duplicateCache.put(createDuplicateKey, valueOf);
            if (this.duplicateCache.getSize() > 1000000) {
                getClass();
                this.duplicateCache = new LongValueKV(1000000);
            }
        }
        if (!t.getHeader().isSystemMessage()) {
            MessageGlobleTrace.joinMessage(t);
        }
        if (!isTopology()) {
            return (T) super.doMessageInner(t, abstractContext, abstractStageArr);
        }
        abstractContext.setMessage(t);
        doNextStages(abstractContext, getMsgSourceName(), this.channelName, this.channelNextStageLabel, null);
        return t;
    }

    protected boolean isTopology(List<String> list) {
        return (list == null || list.size() == 0) ? false : true;
    }

    public boolean isTopology() {
        return isTopology(this.channelNextStageLabel);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void doNextStages(AbstractContext abstractContext, String str, String str2, List<String> list, String str3) {
        if (isTopology(list)) {
            int size = list.size();
            for (String str4 : list) {
                AbstractContext abstractContext2 = abstractContext;
                if (size > 1) {
                    abstractContext2 = abstractContext.copy();
                }
                IMessage message = abstractContext2.getMessage();
                AbstractStage abstractStage = this.stageMap.get(str4);
                if (abstractStage == null) {
                    if (this.stages != null && this.stages.size() > 0) {
                        synchronized (this) {
                            abstractStage = this.stageMap.get(str4);
                            if (abstractStage == null) {
                                createStageMap();
                                abstractStage = this.stageMap.get(str4);
                            }
                        }
                    }
                    if (abstractStage == null) {
                        LOG.warn("expect stage named " + str4 + ", but the stage is not exist");
                    }
                }
                AbstractStage abstractStage2 = abstractStage;
                if (!filterByPreFingerprint(message, abstractContext2, str2, str4)) {
                    if (StringUtil.isNotEmpty(str)) {
                        message.getHeader().setMsgRouteFromLable(str);
                    }
                    if (executeStage(abstractStage2, message, abstractContext2)) {
                        if (abstractStage2 instanceof ChainStage) {
                            String msgSourceName = ((ChainStage) abstractStage2).getMsgSourceName();
                            if (StringUtil.isNotEmpty(msgSourceName)) {
                                str = msgSourceName;
                            }
                        }
                        List<String> doRoute = abstractStage2.doRoute(message);
                        if (doRoute != null && doRoute.size() != 0) {
                            doNextStages(abstractContext2, str, abstractStage2.getLabel(), doRoute, abstractStage2.getOwnerSqlNodeTableName());
                        } else if (!message.getHeader().isSystemMessage()) {
                            MessageGlobleTrace.finishPipeline(message);
                        }
                    } else if (abstractStage2.isAsyncNode() && !message.getHeader().isSystemMessage()) {
                        MessageGlobleTrace.finishPipeline(message);
                    }
                }
            }
        }
    }

    protected boolean isNewSQLNode(AbstractStage abstractStage, String str) {
        return str == null || !abstractStage.getOwnerSqlNodeTableName().equals(str);
    }

    @Override // org.apache.rocketmq.streams.common.topology.model.Pipeline
    protected boolean executeStage(AbstractStage abstractStage, T t, AbstractContext abstractContext) {
        IMonitor createChildrenMonitor = abstractContext.createChildrenMonitor(createPipelineMonitorName(), abstractStage);
        try {
            boolean executeStage = super.executeStage(abstractStage, t, abstractContext);
            createChildrenMonitor.setResult(Boolean.valueOf(executeStage));
            createChildrenMonitor.endMonitor();
            if (createChildrenMonitor.isSlow()) {
                createChildrenMonitor.setSampleData(abstractContext).put("stage_info", createStageInfo(abstractStage));
            }
            return executeStage;
        } catch (Exception e) {
            e.printStackTrace();
            createChildrenMonitor.occureError(e, "execute stage error " + abstractStage.getConfigureName(), e.getMessage());
            createChildrenMonitor.setSampleData(abstractContext).put("stage_info", createStageInfo(abstractStage));
            return false;
        }
    }

    protected boolean filterByPreFingerprint(IMessage iMessage, AbstractContext abstractContext, String str, String str2) {
        PreFingerprint preFingerprint = getPreFingerprint(str, str2);
        if (preFingerprint == null || !preFingerprint.filterByLogFingerprint(iMessage)) {
            return false;
        }
        abstractContext.breakExecute();
        return true;
    }

    protected JSONObject createStageInfo(AbstractStage abstractStage) {
        return null;
    }

    public ChainPipeline addChainStage(ChainStage chainStage) {
        addStage(chainStage);
        return this;
    }

    public ISource getSource() {
        return this.source;
    }

    public void setSource(ISource iSource) {
        this.source = iSource;
        if (getNameSpace() == null) {
            setNameSpace(iSource.getNameSpace());
        }
        this.channelName = iSource.getConfigureName();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener
    public void doProcessAfterRefreshConfigurable(IConfigurableService iConfigurableService) {
        createStageMap();
        ISource iSource = (ISource) iConfigurableService.queryConfigurable(ISource.TYPE, this.channelName);
        this.source = iSource;
        for (AbstractStage abstractStage : getStages()) {
            abstractStage.setPipeline(this);
            if (abstractStage instanceof IAfterConfigurableRefreshListener) {
                if (!abstractStage.isInitSuccess() && !isInitSuccess()) {
                    setInitSuccess(false);
                    return;
                }
                ((IAfterConfigurableRefreshListener) abstractStage).doProcessAfterRefreshConfigurable(iConfigurableService);
            }
        }
        if (iSource != this.source && this.source != null) {
            this.hasStart.set(false);
            this.source = iSource;
            startChannel();
        }
        if ((iSource instanceof AbstractConfigurable) && !((AbstractConfigurable) iSource).isInitSuccess() && isInitSuccess()) {
            setInitSuccess(false);
            return;
        }
        if ((this.isAutoStart || isPublish()) && isInitSuccess()) {
            startChannel();
        }
        String property = ComponentCreator.getProperties().getProperty(getConfigureName() + ".duplicate.fields.names");
        if (property != null && !property.isEmpty()) {
            this.duplicateFields = Lists.newArrayList();
            this.duplicateFields.addAll(Arrays.asList(property.split(IWindow.SCRIPT_SPLIT_CHAR)));
        }
        if (this.duplicateCache == null && this.duplicateFields != null) {
            getClass();
            this.duplicateCache = new LongValueKV(1000000);
        }
        String property2 = ComponentCreator.getProperties().getProperty(getConfigureName() + ".duplicate.expiration.time");
        if (property2 == null || property2.isEmpty()) {
            this.duplicateCacheExpirationTime = 86400000;
        } else {
            this.duplicateCacheExpirationTime = Integer.parseInt(property2);
        }
    }

    public Map<String, AbstractStage> createStageMap() {
        for (AbstractStage abstractStage : getStages()) {
            this.stageMap.put(abstractStage.getLabel(), abstractStage);
            abstractStage.setPipeline(this);
        }
        return this.stageMap;
    }

    public boolean isAutoStart() {
        return this.isAutoStart;
    }

    public void setAutoStart(boolean z) {
        this.isAutoStart = z;
    }

    public List<String> getChannelNextStageLabel() {
        return this.channelNextStageLabel;
    }

    public void setChannelNextStageLabel(List<String> list) {
        this.channelNextStageLabel = list;
    }

    public String toString() {
        String str = PrintUtil.LINE;
        StringBuilder sb = new StringBuilder();
        sb.append("###namespace=").append(getNameSpace()).append("###").append(str);
        if (this.source != null) {
            sb.append(this.source.toString()).append(str);
        }
        if (this.stages != null) {
            Iterator<AbstractStage> it = this.stages.iterator();
            while (it.hasNext()) {
                sb.append(it.next().toString());
            }
        }
        return sb.toString();
    }

    @Override // org.apache.rocketmq.streams.common.topology.model.Pipeline, org.apache.rocketmq.streams.common.configurable.AbstractConfigurable, org.apache.rocketmq.streams.common.configurable.IConfigurable
    public void destroy() {
        if (this.source != null) {
            this.source.destroy();
        }
        super.destroy();
    }

    public Map<String, AbstractStage> getStageMap() {
        return this.stageMap;
    }

    public Boolean getHasStart() {
        return Boolean.valueOf(this.hasStart.get());
    }

    public boolean isPublish() {
        return this.isPublish;
    }

    public void setPublish(boolean z) {
        this.isPublish = z;
    }

    public MetaData getChannelMetaData() {
        return this.channelMetaData;
    }

    public void setChannelMetaData(MetaData metaData) {
        this.channelMetaData = metaData;
    }

    public String getChannelName() {
        return this.channelName;
    }

    public void setChannelName(String str) {
        this.channelName = str;
    }
}
