/*
 * Decompiled with CFR 0.152.
 */
package net.ibizsys.dataflow.flink;

import java.util.List;
import net.ibizsys.dataflow.core.PSDataFlowSystemEngineBase;
import net.ibizsys.dataflow.flink.IFlinkPSDataFlowSystemEngine;
import net.ibizsys.dataflow.flink.ISourceProvider;
import net.ibizsys.dataflow.flink.dataentity.datasync.IFlinkPSDEDataSyncEngine;
import net.ibizsys.dataflow.flink.eai.IFlinkPSSysDataSyncAgentEngine;
import net.ibizsys.model.IPSModelObject;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.dataentity.IPSDataEntity;
import net.ibizsys.model.dataentity.datasync.IPSDEDataSync;
import net.ibizsys.model.res.IPSSysDataSyncAgent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.util.ObjectUtils;

public class FlinkPSDataFlowSystemEngine
extends PSDataFlowSystemEngineBase
implements IFlinkPSDataFlowSystemEngine {
    private StreamExecutionEnvironment streamExecutionEnvironment = null;

    protected void onInit() throws Exception {
        if (this.getStreamExecutionEnvironment(true) == null) {
            this.prepareStreamExecutionEnvironment();
        }
        super.onInit();
        List psSysDataSyncAgentList = this.getPSSystem().getAllPSSysDataSyncAgents();
        if (!ObjectUtils.isEmpty((Object)psSysDataSyncAgentList)) {
            for (IPSSysDataSyncAgent iPSSysDataSyncAgent : psSysDataSyncAgentList) {
                IFlinkPSSysDataSyncAgentEngine iPSSysDataSyncAgentEngine;
                if (!PSModelEnums.DataSyncAgentType.KAFKA.value.equalsIgnoreCase(iPSSysDataSyncAgent.getAgentType()) || !PSModelEnums.DataSyncAgentDir.IN.value.equalsIgnoreCase(iPSSysDataSyncAgent.getSyncDir()) || !((iPSSysDataSyncAgentEngine = (IFlinkPSSysDataSyncAgentEngine)this.getPSModelEngineHolder().getPSModelEngine((IPSModelObject)iPSSysDataSyncAgent, IFlinkPSSysDataSyncAgentEngine.class)) instanceof ISourceProvider)) continue;
                Source<?, ?, ?> source = iPSSysDataSyncAgentEngine.getSource();
                DataStreamSource s = this.getStreamExecutionEnvironment().fromSource(source, WatermarkStrategy.noWatermarks(), iPSSysDataSyncAgent.getName());
                boolean sinkFlag = false;
                List psDataEntityList = this.getPSSystem().getAllPSDataEntities();
                if (!ObjectUtils.isEmpty((Object)psDataEntityList)) {
                    for (IPSDataEntity iPSDataEntity : psDataEntityList) {
                        List psDEDataSyncList = iPSDataEntity.getAllPSDEDataSyncs();
                        if (ObjectUtils.isEmpty((Object)psDEDataSyncList)) continue;
                        for (IPSDEDataSync iPSDEDataSync : psDEDataSyncList) {
                            if (!PSModelEnums.DataSyncDir.IN.value.equalsIgnoreCase(iPSDEDataSync.getSyncDir()) || !iPSDEDataSync.getInPSSysDataSyncAgentMust().getId().equalsIgnoreCase(iPSSysDataSyncAgent.getId())) continue;
                            IFlinkPSDEDataSyncEngine iPSDEDataSyncEngine = (IFlinkPSDEDataSyncEngine)this.getPSModelEngineHolder().getPSModelEngine((IPSModelObject)iPSDEDataSync, IFlinkPSDEDataSyncEngine.class);
                            if (iPSDEDataSyncEngine.isEnableSinkFunction()) {
                                s.addSink(iPSDEDataSyncEngine.getSinkFunction());
                                sinkFlag = true;
                                continue;
                            }
                            throw new Exception(String.format("\u5b9e\u4f53\u6570\u636e\u540c\u6b65[%1$s]\u5f15\u64ce\u672a\u63d0\u4f9bSink\u529f\u80fd", iPSDEDataSync.getName()));
                        }
                    }
                }
                if (sinkFlag) continue;
                s.print();
            }
        }
    }

    protected void prepareStreamExecutionEnvironment() throws Exception {
        this.setStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
    }

    protected StreamExecutionEnvironment getStreamExecutionEnvironment() throws Exception {
        return this.getStreamExecutionEnvironment(false);
    }

    protected void setStreamExecutionEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
        this.streamExecutionEnvironment = streamExecutionEnvironment;
    }

    protected StreamExecutionEnvironment getStreamExecutionEnvironment(boolean tryMode) throws Exception {
        if (this.streamExecutionEnvironment != null || tryMode) {
            return this.streamExecutionEnvironment;
        }
        throw new Exception(String.format("\u672a\u6307\u5b9a\u6d41\u6267\u884c\u73af\u5883", new Object[0]));
    }
}

