package org.apache.rocketmq.streams.common.topology.model;

import com.alibaba.fastjson.JSONArray;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
import org.apache.rocketmq.streams.common.optimization.fingerprint.PreFingerprint;
import org.apache.rocketmq.streams.common.utils.StringUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/topology/model/Pipeline.class */
/**
 * An ordered list of {@link AbstractStage}s through which a message is pushed.
 *
 * <p>Stages are executed in list order, starting from a caller-supplied index (0 by default).
 * Execution stops as soon as {@link #executeStage} returns {@code false} for a stage. Callers may
 * pass replacement stages; a replacement is used instead of a configured stage when both have the
 * same name (see {@link #chooseReplaceStage}).
 *
 * <p>The class also keeps a two-level registry of {@link PreFingerprint}s keyed by source stage
 * label and then by next stage label.
 *
 * @param <T> the message type flowing through the pipeline
 */
public class Pipeline<T extends IMessage> extends BasedConfigurable implements IStreamOperator<T, T> {
    public static final Log LOG = LogFactory.getLog(Pipeline.class);
    public static final String TYPE = "pipeline";
    protected transient String name;
    private String sourceIdentification;
    protected String msgSourceName;
    /** Stages in execution order. */
    protected List<AbstractStage> stages = new ArrayList<AbstractStage>();
    /** Registered fingerprints: source stage label -> (next stage label -> fingerprint). */
    protected transient Map<String, Map<String, PreFingerprint>> preFingerprintExecutor =
        new HashMap<String, Map<String, PreFingerprint>>();

    /** Creates an empty pipeline and sets its configurable type to {@link #TYPE}. */
    public Pipeline() {
        setType(TYPE);
    }

    /**
     * Runs the message through every stage, without replacement stages.
     *
     * @param t the message to process
     * @param abstractContext the execution context
     * @return the value returned by the three-argument overload
     */
    @Override // org.apache.rocketmq.streams.common.interfaces.IBaseStreamOperator
    public T doMessage(T t, AbstractContext abstractContext) {
        // The explicit array cast passes "no replacements" as a null array rather than as a
        // one-element array containing null.
        return doMessage(t, abstractContext, (AbstractStage[]) null);
    }

    /**
     * Runs the message through the pipeline and stores the result as the context's message.
     *
     * @param t the message to process
     * @param abstractContext the execution context
     * @param abstractStageArr optional replacement stages; may be {@code null}
     * @return the message returned by {@link #doMessageInner}
     */
    public T doMessage(T t, AbstractContext abstractContext, AbstractStage... abstractStageArr) {
        T result = doMessageInner(t, abstractContext, abstractStageArr);
        abstractContext.setMessage(result);
        return result;
    }

    /**
     * Runs the message through all stages starting at index 0.
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @param t the message to process
     * @param abstractContext the execution context
     * @param abstractStageArr optional replacement stages; may be {@code null}
     * @return the message returned by {@link #doMessageFromIndex}
     */
    public T doMessageInner(T t, AbstractContext abstractContext, AbstractStage... abstractStageArr) {
        return doMessageFromIndex(t, abstractContext, 0, abstractStageArr);
    }

    /**
     * Runs the message through the stages starting at position {@code i}.
     *
     * <p>The message is first set on the context. Each stage (or its replacement) is executed in
     * order. When a stage asks to stop, iteration ends immediately and
     * {@link MessageGlobleTrace#finishPipeline} is called only if that stage is an async node.
     * When every stage has run (including the case where {@code i} is past the last stage),
     * {@code finishPipeline} is always called.
     *
     * @param t the message to process
     * @param abstractContext the execution context
     * @param i index of the first stage to execute
     * @param abstractStageArr optional replacement stages; may be {@code null}
     * @return the same message instance {@code t}
     */
    public T doMessageFromIndex(T t, AbstractContext abstractContext, int i, AbstractStage... abstractStageArr) {
        abstractContext.setMessage(t);
        for (int index = i; index < this.stages.size(); index++) {
            AbstractStage stage = chooseReplaceStage(this.stages.get(index), abstractStageArr);
            if (!executeStage(stage, t, abstractContext)) {
                if (stage.isAsyncNode()) {
                    MessageGlobleTrace.finishPipeline(t);
                }
                return t;
            }
        }
        MessageGlobleTrace.finishPipeline(t);
        return t;
    }

    /**
     * Registers a fingerprint under its source stage label and next stage label. A {@code null}
     * argument is ignored; an existing entry for the same label pair is overwritten.
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @param preFingerprint the fingerprint to register, or {@code null}
     */
    public void registPreFingerprint(PreFingerprint preFingerprint) {
        if (preFingerprint == null) {
            return;
        }
        String sourceLabel = preFingerprint.getSourceStageLable();
        Map<String, PreFingerprint> byNextStage = this.preFingerprintExecutor.get(sourceLabel);
        if (byNextStage == null) {
            byNextStage = new HashMap<String, PreFingerprint>();
            this.preFingerprintExecutor.put(sourceLabel, byNextStage);
        }
        byNextStage.put(preFingerprint.getNextStageLable(), preFingerprint);
    }

    /**
     * Looks up a registered fingerprint.
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @param str the source stage label
     * @param str2 the next stage label
     * @return the registered fingerprint, or {@code null} when none is registered for the pair
     */
    public PreFingerprint getPreFingerprint(String str, String str2) {
        Map<String, PreFingerprint> byNextStage = this.preFingerprintExecutor.get(str);
        return byNextStage == null ? null : byNextStage.get(str2);
    }

    /**
     * Executes one stage for the given message.
     *
     * <p>Three cases are handled:
     * <ul>
     *   <li>system messages are dispatched to the matching stage callback;</li>
     *   <li>when the context is in split mode and the stage does not close split mode, the stage
     *       is executed once per split message;</li>
     *   <li>otherwise the stage is executed once for {@code t}, after closing split mode if the
     *       stage requests it.</li>
     * </ul>
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @param abstractStage the stage to execute
     * @param t the message to process
     * @param abstractContext the execution context
     * @return {@code true} when the pipeline should continue with the next stage
     * @throws RuntimeException if {@code t} carries a system message of an unsupported type
     */
    public boolean executeStage(AbstractStage abstractStage, T t, AbstractContext abstractContext) {
        if (t.getHeader().isSystemMessage()) {
            return executeSystemMessage(abstractStage, t, abstractContext);
        }
        abstractContext.resetIsContinue();
        if (abstractContext.isSplitModel() && !abstractStage.isCloseSplitMode()) {
            return executeSplitMessages(abstractStage, t, abstractContext);
        }
        if (abstractStage.isCloseSplitMode()) {
            // Optionally fold the bodies of the current split messages into the parent message
            // before split mode is closed.
            if (StringUtil.isNotEmpty(abstractStage.getSplitDataFieldName())) {
                t.getMessageBody().put(abstractStage.getSplitDataFieldName(), createJsonArray(abstractContext.getSplitMessages()));
            }
            abstractContext.closeSplitMode(t);
        }
        boolean shouldContinue = invokeStage(t, abstractStage, abstractContext);
        MessageGlobleTrace.clear(t);
        return shouldContinue;
    }

    /**
     * Dispatches the system message carried by {@code t} to the matching stage callback.
     *
     * @return {@code true} when the system message is {@code null} or the stage is not an async
     *         node; otherwise the context is told to break and {@code false} is returned
     */
    private boolean executeSystemMessage(AbstractStage stage, T t, AbstractContext context) {
        ISystemMessage systemMessage = t.getSystemMessage();
        if (systemMessage instanceof CheckPointMessage) {
            stage.checkpoint(t, context, (CheckPointMessage) systemMessage);
        } else if (systemMessage instanceof NewSplitMessage) {
            stage.addNewSplit(t, context, (NewSplitMessage) systemMessage);
        } else if (systemMessage instanceof RemoveSplitMessage) {
            stage.removeSplit(t, context, (RemoveSplitMessage) systemMessage);
        } else if (systemMessage instanceof BatchFinishMessage) {
            stage.batchMessageFinish(t, context, (BatchFinishMessage) systemMessage);
        } else if (systemMessage == null) {
            // Flagged as a system message but nothing attached: nothing to dispatch.
            return true;
        } else {
            throw new RuntimeException("can not support this system message " + systemMessage.getClass().getName());
        }
        if (!stage.isAsyncNode()) {
            return true;
        }
        context.breakExecute();
        return false;
    }

    /**
     * Executes the stage once for every split message currently held by the context, then
     * installs the surviving messages as the context's new split list and re-opens split mode.
     *
     * @return {@code true} when at least one message survived; otherwise the context is told to
     *         break and {@code false} is returned
     */
    private boolean executeSplitMessages(AbstractStage stage, T t, AbstractContext context) {
        List<T> pending = context.getSplitMessages();
        List<T> survivors = new ArrayList<T>();
        int layerOffset = 0;
        for (T part : pending) {
            context.closeSplitMode(part);
            part.getHeader().setMsgRouteFromLable(t.getHeader().getMsgRouteFromLable());
            part.getHeader().addLayerOffset(layerOffset);
            layerOffset++;
            if (!invokeStage(part, stage, context)) {
                context.removeSpliteMessage(part);
                // One rejected part must not stop the remaining parts.
                context.cancelBreak();
            } else if (context.isSplitModel()) {
                // The stage split this part again: keep the nested parts instead.
                survivors.addAll(context.getSplitMessages());
            } else {
                survivors.add(part);
            }
        }
        MessageGlobleTrace.clear(t);
        context.setSplitMessages(survivors);
        context.openSplitModel();
        if (!survivors.isEmpty()) {
            return true;
        }
        context.breakExecute();
        return false;
    }

    /**
     * Reports whether this pipeline is an asynchronous node.
     *
     * @return always {@code false}
     */
    public boolean isAsynNode() {
        return false;
    }

    /** Collects the message body of every element of {@code list} into a new JSON array. */
    private JSONArray createJsonArray(List<T> list) {
        JSONArray bodies = new JSONArray();
        for (T message : list) {
            bodies.add(message.getMessageBody());
        }
        return bodies;
    }

    /**
     * Picks the stage to execute: the first non-null replacement whose name equals the name of
     * {@code abstractStage}, or {@code abstractStage} itself when there is no such replacement.
     *
     * @param abstractStage the configured stage
     * @param abstractStageArr candidate replacements; may be {@code null} and may contain
     *        {@code null} elements
     * @return the stage to execute
     */
    protected AbstractStage chooseReplaceStage(AbstractStage abstractStage, AbstractStage... abstractStageArr) {
        if (abstractStageArr == null) {
            return abstractStage;
        }
        for (AbstractStage candidate : abstractStageArr) {
            if (candidate != null && candidate.getName().equals(abstractStage.getName())) {
                return candidate;
            }
        }
        return abstractStage;
    }

    /**
     * Invokes the stage on the message.
     *
     * @return {@code true} when the stage returned a non-null result and the context still
     *         reports that execution should continue
     */
    private boolean invokeStage(T t, AbstractStage abstractStage, AbstractContext abstractContext) {
        return abstractStage.doMessage(t, abstractContext) != null && abstractContext.isContinue();
    }

    /**
     * Appends a stage to the end of the pipeline.
     *
     * @param abstractStage the stage to append
     */
    public void addStage(AbstractStage abstractStage) {
        this.stages.add(abstractStage);
    }

    /**
     * Sets the label of the given stage.
     *
     * @param abstractStage the stage to label
     * @param str the label
     */
    public void setStageLable(AbstractStage abstractStage, String str) {
        abstractStage.setLabel(str);
    }

    /**
     * Replaces the stage list. The list is stored as-is, not copied.
     *
     * @param list the new stage list
     */
    public void setStages(List<AbstractStage> list) {
        this.stages = list;
    }

    /**
     * Logs the destruction at info level and clears the stage list. A {@code null} stage list
     * (possible after {@code setStages(null)}) is tolerated and reported as 0 stages.
     */
    @Override // org.apache.rocketmq.streams.common.configurable.AbstractConfigurable, org.apache.rocketmq.streams.common.configurable.IConfigurable
    public void destroy() {
        List<AbstractStage> current = this.stages;
        if (LOG.isInfoEnabled()) {
            LOG.info(getName() + " is destroy, release pipline " + (current == null ? 0 : current.size()));
        }
        if (current != null) {
            current.clear();
        }
    }

    /** @return the pipeline name */
    public String getName() {
        return this.name;
    }

    /** @param str the pipeline name */
    public void setName(String str) {
        this.name = str;
    }

    /** @return the live stage list (not a copy) */
    public List<AbstractStage> getStages() {
        return this.stages;
    }

    /** @return the message source name */
    public String getMsgSourceName() {
        return this.msgSourceName;
    }

    /** @return the source identification */
    public String getSourceIdentification() {
        return this.sourceIdentification;
    }

    /** @param str the source identification */
    public void setSourceIdentification(String str) {
        this.sourceIdentification = str;
    }

    /** @return the live fingerprint registry (not a copy) */
    public Map<String, Map<String, PreFingerprint>> getPreFingerprintExecutor() {
        return this.preFingerprintExecutor;
    }

    /** @param str the message source name */
    public void setMsgSourceName(String str) {
        this.msgSourceName = str;
    }

    // The decompiled bridge method "Object doMessage(IMessage, AbstractContext)" is intentionally
    // not declared here: it has the same erasure as doMessage(T, AbstractContext), and the
    // compiler generates it automatically.
}
