package org.apache.rocketmq.streams.client.transform;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.streams.client.strategy.LogFingerprintStrategy;
import org.apache.rocketmq.streams.client.strategy.Strategy;
import org.apache.rocketmq.streams.client.transform.JoinStream;
import org.apache.rocketmq.streams.client.transform.window.WindowInfo;
import org.apache.rocketmq.streams.common.channel.impl.OutputPrintChannel;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSink;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.common.functions.FilterFunction;
import org.apache.rocketmq.streams.common.functions.FlatMapFunction;
import org.apache.rocketmq.streams.common.functions.ForEachFunction;
import org.apache.rocketmq.streams.common.functions.ForEachMessageFunction;
import org.apache.rocketmq.streams.common.functions.MapFunction;
import org.apache.rocketmq.streams.common.functions.SplitFunction;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.topology.model.Union;
import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
import org.apache.rocketmq.streams.common.topology.stages.udf.StageBuilder;
import org.apache.rocketmq.streams.common.topology.stages.udf.UDFChainStage;
import org.apache.rocketmq.streams.common.topology.stages.udf.UDFUnionChainStage;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
import org.apache.rocketmq.streams.db.sink.DBSink;
import org.apache.rocketmq.streams.db.sink.DynamicMultipleDBSink;
import org.apache.rocketmq.streams.db.sink.EnhanceDBSink;
import org.apache.rocketmq.streams.dim.model.AbstractDim;
import org.apache.rocketmq.streams.dim.model.DBDim;
import org.apache.rocketmq.streams.dim.model.FileDim;
import org.apache.rocketmq.streams.examples.aggregate.Constant;
import org.apache.rocketmq.streams.filter.operator.FilterOperator;
import org.apache.rocketmq.streams.mqtt.sink.PahoSink;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;
import org.apache.rocketmq.streams.sink.RocketMQSink;
import org.apache.rocketmq.streams.window.builder.WindowBuilder;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.window.operator.impl.OverWindow;
import org.apache.rocketmq.streams.window.operator.impl.ShuffleOverWindow;
import org.apache.rocketmq.streams.window.operator.join.JoinWindow;

/* loaded from: input_file:org/apache/rocketmq/streams/client/transform/DataStream.class */
public class DataStream implements Serializable {
    protected PipelineBuilder mainPipelineBuilder;
    protected Set<PipelineBuilder> otherPipelineBuilders;
    protected ChainStage<?> currentChainStage;

    public DataStream(String str, String str2) {
        this.mainPipelineBuilder = new PipelineBuilder(str, str2);
        this.otherPipelineBuilders = Sets.newHashSet();
    }

    public DataStream(PipelineBuilder pipelineBuilder, ChainStage<?> chainStage) {
        this.mainPipelineBuilder = pipelineBuilder;
        this.otherPipelineBuilders = Sets.newHashSet();
        this.currentChainStage = chainStage;
    }

    public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> set, ChainStage<?> chainStage) {
        this.mainPipelineBuilder = pipelineBuilder;
        this.otherPipelineBuilders = set;
        this.currentChainStage = chainStage;
    }

    public DataStream with(Strategy... strategyArr) {
        Properties properties = new Properties();
        for (Strategy strategy : strategyArr) {
            if (strategy instanceof LogFingerprintStrategy) {
                AbstractSource source = this.mainPipelineBuilder.getPipeline().getSource();
                if (source instanceof AbstractSource) {
                    AbstractSource abstractSource = source;
                    String[] logFingerprintFields = ((LogFingerprintStrategy) strategy).getLogFingerprintFields();
                    if (logFingerprintFields != null) {
                        ArrayList arrayList = new ArrayList();
                        Collections.addAll(arrayList, logFingerprintFields);
                        abstractSource.setLogFingerprintFields(arrayList);
                    }
                }
            }
            properties.putAll(strategy.getStrategyProperties());
        }
        ComponentCreator.createProperties(properties);
        return this;
    }

    public DataStream script(String str) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new ScriptOperator(str));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream filterByExpression(String str, String... strArr) {
        return filterByExpression(str, false, strArr);
    }

    public DataStream filterByExpression(String str, boolean z, String... strArr) {
        FilterChainStage createStage = this.mainPipelineBuilder.createStage(new FilterOperator(str));
        if (strArr != null && strArr.length > 0) {
            createStage.setFilterFieldNames(MapKeyUtil.createKeyBySign(",", strArr));
        }
        createStage.setOpenHyperscan(z);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public <T, O> DataStream map(final MapFunction<T, O> mapFunction) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new StageBuilder() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.1
            protected <T> T operate(IMessage iMessage, AbstractContext abstractContext) {
                try {
                    Object map = mapFunction.map(iMessage.getMessageValue());
                    if (map == iMessage.getMessageValue()) {
                        return null;
                    }
                    if (map instanceof JSONObject) {
                        iMessage.setMessageBody((JSONObject) map);
                        return null;
                    }
                    iMessage.setMessageBody(new UserDefinedMessage(map));
                    return null;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public <T, O> DataStream flatMap(final FlatMapFunction<T, O> flatMapFunction) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new StageBuilder() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.2
            protected <T> T operate(IMessage iMessage, AbstractContext abstractContext) {
                try {
                    List flatMap = flatMapFunction.flatMap(iMessage.getMessageValue());
                    if (flatMap == null || flatMap.size() == 0) {
                        abstractContext.breakExecute();
                    } else {
                        ArrayList arrayList = new ArrayList();
                        for (Object obj : flatMap) {
                            arrayList.add(obj instanceof JSONObject ? new Message((JSONObject) obj) : new Message(new UserDefinedMessage(obj)));
                        }
                        abstractContext.openSplitModel();
                        abstractContext.setSplitMessages(arrayList);
                    }
                    return null;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public <O> DataStream filter(FilterFunction<O> filterFunction) {
        return filter(filterFunction, null);
    }

    public <O> DataStream filter(final FilterFunction<O> filterFunction, String... strArr) {
        UDFChainStage createStage = this.mainPipelineBuilder.createStage(new StageBuilder() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.3
            protected <T> T operate(IMessage iMessage, AbstractContext abstractContext) {
                try {
                    if (!filterFunction.filter(iMessage.getMessageValue())) {
                        abstractContext.put("NEED_USE_FINGER_PRINT", true);
                        abstractContext.breakExecute();
                    }
                    return null;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
        if (strArr != null && strArr.length > 0) {
            createStage.setFilterFieldNames(MapKeyUtil.createKeyBySign(",", strArr));
        }
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public WindowStream window(WindowInfo windowInfo) {
        AbstractWindow createWindow = windowInfo.createWindow();
        ChainStage createStage = this.mainPipelineBuilder.createStage(createWindow);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new WindowStream(createWindow, this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream distinct(String... strArr) {
        return distinct(-1, strArr);
    }

    public DataStream distinct(int i, String... strArr) {
        OverWindow overWindow = new OverWindow();
        overWindow.setReservedOne(true);
        if (i == -1) {
            i = 3600;
        }
        overWindow.setSizeInterval(i);
        overWindow.setSlideInterval(i);
        overWindow.setTimeUnitAdjust(1);
        overWindow.setGroupByFieldName(MapKeyUtil.createKeyBySign(";", strArr));
        for (String str : strArr) {
            overWindow.getSelectMap().put(str, str);
        }
        ChainStage createStage = this.mainPipelineBuilder.createStage(overWindow);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public OverWindowStream topN(String str, int i, String... strArr) {
        ShuffleOverWindow shuffleOverWindow = new ShuffleOverWindow();
        shuffleOverWindow.setTopN(i);
        shuffleOverWindow.setRowNumerName(str);
        ChainStage createStage = this.mainPipelineBuilder.createStage(shuffleOverWindow);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        OverWindowStream overWindowStream = new OverWindowStream(shuffleOverWindow, this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
        overWindowStream.groupBy(strArr);
        return overWindowStream;
    }

    public DataStream addStage(IStageBuilder iStageBuilder) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(iStageBuilder);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public JoinStream join(DataStream dataStream) {
        return join(dataStream, JoinStream.JoinType.INNER_JOIN);
    }

    public JoinStream leftJoin(DataStream dataStream) {
        return join(dataStream, JoinStream.JoinType.LEFT_JOIN);
    }

    protected JoinStream join(DataStream dataStream, JoinStream.JoinType joinType) {
        JoinWindow createDefaultJoinWindow = WindowBuilder.createDefaultJoinWindow();
        ChainStage<?> createStage = this.mainPipelineBuilder.createStage(new ScriptOperator("setHeader(msgRouteFromLable,'left')"));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        this.currentChainStage = createStage;
        ChainStage createStage2 = this.mainPipelineBuilder.createStage(createDefaultJoinWindow);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage2);
        dataStream.script("setHeader(msgRouteFromLable,'right')").addStage(createDefaultJoinWindow);
        addOtherDataStream(dataStream);
        return new JoinStream(createDefaultJoinWindow, this.mainPipelineBuilder, this.otherPipelineBuilders, (ChainStage<?>) createStage2, joinType);
    }

    public DataStream union(DataStream dataStream) {
        Union union = new Union();
        UDFUnionChainStage createStage = this.mainPipelineBuilder.createStage(union);
        createStage.setMainStream(true);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        dataStream.addStage(union);
        addOtherDataStream(dataStream);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public SplitStream split(final SplitFunction splitFunction) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new StageBuilder() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.4
            protected <T> T operate(IMessage iMessage, AbstractContext abstractContext) {
                iMessage.getHeader().addRouteLable(new String[]{splitFunction.split(iMessage.getMessageValue())});
                return null;
            }
        });
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new SplitStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    @Deprecated
    public JoinStream join(String str, String str2, String str3, String str4, long j) {
        return join(str, str2, str3, str4, null, j);
    }

    @Deprecated
    public JoinStream join(String str, String str2, String str3, String str4, String str5, long j) {
        DBDim dBDim = new DBDim();
        dBDim.setUrl(str);
        dBDim.setUserName(str2);
        dBDim.setPassword(str3);
        dBDim.setSql(str4);
        dBDim.setPollingTimeMinute(Long.valueOf(j));
        dBDim.setJdbcdriver(str5);
        this.mainPipelineBuilder.addConfigurables(dBDim);
        return new JoinStream((AbstractDim) dBDim, this.mainPipelineBuilder, this.otherPipelineBuilders, this.currentChainStage, (Boolean) true);
    }

    public JoinStream dimJoin(String str, String str2, String str3, String str4, Long l) {
        return dimJoin(str, str2, str3, str4, "com.mysql.jdbc.Driver", l);
    }

    public JoinStream dimJoin(String str, String str2, String str3, String str4, String str5, Long l) {
        return dimJoin(str, str2, str3, str4, str5, l, JoinStream.JoinType.INNER_JOIN);
    }

    public JoinStream dimJoin(String str, Long l) {
        return dimJoin(str, l, JoinStream.JoinType.INNER_JOIN);
    }

    public JoinStream dimLeftJoin(String str, String str2, String str3, String str4, Long l) {
        return dimLeftJoin(str, str2, str3, str4, "com.mysql.jdbc.Driver", l);
    }

    public JoinStream dimLeftJoin(String str, String str2, String str3, String str4, String str5, Long l) {
        return dimJoin(str, str2, str3, str4, str5, l, JoinStream.JoinType.LEFT_JOIN);
    }

    public JoinStream dimLeftJoin(String str, Long l) {
        return dimJoin(str, l, JoinStream.JoinType.LEFT_JOIN);
    }

    protected JoinStream dimJoin(String str, Long l, JoinStream.JoinType joinType) {
        FileDim fileDim = new FileDim();
        fileDim.setFilePath(str);
        fileDim.setPollingTimeMinute(l);
        this.mainPipelineBuilder.addConfigurables(fileDim);
        return new JoinStream(fileDim, this.mainPipelineBuilder, this.otherPipelineBuilders, this.currentChainStage, true, joinType);
    }

    protected JoinStream dimJoin(String str, String str2, String str3, String str4, String str5, Long l, JoinStream.JoinType joinType) {
        DBDim dBDim = new DBDim();
        dBDim.setUrl(str);
        dBDim.setUserName(str2);
        dBDim.setPassword(str3);
        dBDim.setSql(str4);
        dBDim.setPollingTimeMinute(l);
        dBDim.setJdbcdriver(str5);
        this.mainPipelineBuilder.addConfigurables(dBDim);
        return new JoinStream(dBDim, this.mainPipelineBuilder, this.otherPipelineBuilders, this.currentChainStage, true, joinType);
    }

    @Deprecated
    public JoinStream join(String str, long j) {
        FileDim fileDim = new FileDim();
        fileDim.setFilePath(str);
        fileDim.setPollingTimeMinute(Long.valueOf(j));
        this.mainPipelineBuilder.addConfigurables(fileDim);
        return new JoinStream((AbstractDim) fileDim, this.mainPipelineBuilder, this.otherPipelineBuilders, this.currentChainStage, (Boolean) true);
    }

    @Deprecated
    public JoinStream innerJoin(String str, long j) {
        FileDim fileDim = new FileDim();
        fileDim.setFilePath(str);
        fileDim.setPollingTimeMinute(Long.valueOf(j));
        this.mainPipelineBuilder.addConfigurables(fileDim);
        return new JoinStream((AbstractDim) fileDim, this.mainPipelineBuilder, this.otherPipelineBuilders, this.currentChainStage, (Boolean) true);
    }

    public <O> DataStream forEach(final ForEachFunction<O> forEachFunction) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new StageBuilder() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.5
            protected <T> T operate(IMessage iMessage, AbstractContext abstractContext) {
                forEachFunction.foreach(iMessage.getMessageValue());
                return null;
            }
        });
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public <O> DataStream forEachMessage(final ForEachMessageFunction forEachMessageFunction) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new StageBuilder() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.6
            protected <T> T operate(IMessage iMessage, AbstractContext abstractContext) {
                forEachMessageFunction.foreach(iMessage, abstractContext);
                return null;
            }
        });
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream selectFields(String... strArr) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new ScriptOperator("retain(" + MapKeyUtil.createKeyBySign(",", strArr) + ")"));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    protected void addOtherDataStream(DataStream dataStream) {
        if (!dataStream.mainPipelineBuilder.equals(this.mainPipelineBuilder)) {
            this.otherPipelineBuilders.add(dataStream.mainPipelineBuilder);
        }
        this.otherPipelineBuilders.addAll(dataStream.otherPipelineBuilders);
    }

    public DataStream toFile(String str, int i, boolean z) {
        FileSink fileSink = new FileSink(str, z);
        if (i > 0) {
            fileSink.setBatchSize(i);
        }
        ChainStage createStage = this.mainPipelineBuilder.createStage(fileSink);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toFile(String str, boolean z) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new FileSink(str, z));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toFile(String str) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new FileSink(str));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toPrint() {
        return toPrint(-1);
    }

    public DataStream toPrint(int i) {
        OutputPrintChannel outputPrintChannel = new OutputPrintChannel();
        if (i > 0) {
            outputPrintChannel.setBatchSize(i);
        }
        ChainStage createStage = this.mainPipelineBuilder.createStage(outputPrintChannel);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toDB(String str, String str2, String str3, String str4) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new DBSink(str, str2, str3, str4));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toDB(String str, String str2, String str3, String str4, String str5) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new DBSink(str, str2, str3, str4));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toDB(String str, String str2, String str3, String str4, String str5, Boolean bool) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new DBSink(str, str2, str3, str4));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toRocketmq(String str, String str2) {
        return toRocketmq(str, Constant.TAGS, null, -1, str2, null, false);
    }

    public DataStream toRocketmq(String str, String str2, String str3) {
        return toRocketmq(str, Constant.TAGS, str2, -1, str3, null, false);
    }

    public DataStream toRocketmq(String str, String str2, String str3, String str4) {
        return toRocketmq(str, str2, str3, -1, str4, null, false);
    }

    public DataStream toRocketmq(String str, String str2, String str3, int i, String str4, String str5, boolean z) {
        RocketMQSink rocketMQSink = new RocketMQSink();
        if (StringUtils.isNotBlank(str)) {
            rocketMQSink.setTopic(str);
        }
        if (StringUtils.isNotBlank(str2)) {
            rocketMQSink.setTags(str2);
        }
        if (StringUtils.isNotBlank(str3)) {
            rocketMQSink.setGroupName(str3);
        }
        if (StringUtils.isNotBlank(str4)) {
            rocketMQSink.setNamesrvAddr(str4);
        }
        if (StringUtils.isNotBlank(str5)) {
            rocketMQSink.setClusterName(str5);
        }
        if (i > 0) {
            rocketMQSink.setBatchSize(i);
        }
        rocketMQSink.setOrder(z);
        ChainStage createStage = this.mainPipelineBuilder.createStage(rocketMQSink);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toEnhanceDBSink(String str, String str2, String str3, String str4) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new EnhanceDBSink(str, str2, str3, str4));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toMultiDB(String str, String str2, String str3, String str4, String str5) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new DynamicMultipleDBSink(str, str2, str3, str4, str5));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toMqtt(String str, String str2, String str3) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new PahoSink(str, str2, str3));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream toMqtt(String str, String str2, String str3, String str4, String str5) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(new PahoSink(str, str2, str3, str4, str5));
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public DataStream to(ISink<?> iSink) {
        ChainStage createStage = this.mainPipelineBuilder.createStage(iSink);
        this.mainPipelineBuilder.setTopologyStages(this.currentChainStage, createStage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, createStage);
    }

    public void start() {
        start(false);
    }

    public void asyncStart() {
        start(true);
    }

    public void start(boolean z) {
        if (this.mainPipelineBuilder == null) {
            return;
        }
        ConfigurableComponent component = ComponentCreator.getComponent(this.mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, new String[]{"dipper.configurable.service.type:memory"});
        final ChainPipeline build = this.mainPipelineBuilder.build(component.getService());
        if (this.otherPipelineBuilders != null) {
            new Thread(new Runnable() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.7
                @Override // java.lang.Runnable
                public void run() {
                    build.startChannel();
                }
            }).start();
            Iterator<PipelineBuilder> it = this.otherPipelineBuilders.iterator();
            while (it.hasNext()) {
                final ChainPipeline build2 = it.next().build(component.getService());
                new Thread(new Runnable() { // from class: org.apache.rocketmq.streams.client.transform.DataStream.8
                    @Override // java.lang.Runnable
                    public void run() {
                        build2.startChannel();
                    }
                }).start();
            }
        } else {
            build.startChannel();
        }
        if (z) {
            return;
        }
        while (true) {
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
