/*
 * Decompiled with CFR 0.152.
 */
package net.ibizsys.dataflow.flink.eai;

import net.ibizsys.dataflow.core.eai.PSSysDataSyncAgentEngineBase;
import net.ibizsys.dataflow.flink.eai.IFlinkPSSysDataSyncAgentEngine;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.engine.IPSModelEngine;
import net.ibizsys.model.engine.IPSModelEngineHolder;
import net.ibizsys.model.engine.PSModelEngineException;
import net.ibizsys.model.res.IPSSysDataSyncAgent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class FlinkPSSysDataSyncAgentEngine
extends PSSysDataSyncAgentEngineBase<IPSSysDataSyncAgent>
implements IFlinkPSSysDataSyncAgentEngine<IPSSysDataSyncAgent> {
    private Source<?, ?, ?> source = null;
    private Sink<?> sink = null;

    public void init(IPSModelEngineHolder iPSModelEngineContext, String id, IPSSysDataSyncAgent iPSModelObject) throws Exception {
        super.init(iPSModelEngineContext, id, iPSModelObject);
        if ((PSModelEnums.DataSyncAgentDir.IN.value.equalsIgnoreCase(this.getPSSysDataSyncAgent().getSyncDir()) || PSModelEnums.DataSyncAgentDir.INOUT.value.equalsIgnoreCase(this.getPSSysDataSyncAgent().getSyncDir())) && this.getSource(true) == null) {
            this.prepareSource();
            if (this.getSource(true) == null) {
                throw new Exception("\u672a\u63d0\u4f9bSource\u5bf9\u8c61");
            }
        }
        if ((PSModelEnums.DataSyncAgentDir.OUT.value.equalsIgnoreCase(this.getPSSysDataSyncAgent().getSyncDir()) || PSModelEnums.DataSyncAgentDir.INOUT.value.equalsIgnoreCase(this.getPSSysDataSyncAgent().getSyncDir())) && this.getSink(true) == null) {
            this.prepareSink();
            if (this.getSink(true) == null) {
                throw new Exception(String.format("\u672a\u63d0\u4f9bSink\u5bf9\u8c61", new Object[0]));
            }
        }
    }

    protected void onInit() throws Exception {
        super.onInit();
    }

    @Override
    public Source<?, ?, ?> getSource() {
        return this.getSource(false);
    }

    public Source<?, ?, ?> getSource(boolean tryMode) {
        if (this.source != null || tryMode) {
            return this.source;
        }
        throw new PSModelEngineException((IPSModelEngine)this, String.format("\u672a\u63d0\u4f9bSource\u5bf9\u8c61", new Object[0]));
    }

    protected void setSource(Source<?, ?, ?> source) {
        this.source = source;
    }

    protected void prepareSource() throws Exception {
        if (PSModelEnums.DataSyncAgentType.KAFKA.value.equalsIgnoreCase(this.getAgentType())) {
            this.setSource((Source<?, ?, ?>)KafkaSource.builder().setBootstrapServers(this.getServiceUrl()).setTopics(new String[]{this.getDefaultTopic()}).setGroupId(this.getGroupId()).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer((DeserializationSchema)new SimpleStringSchema()).build());
            return;
        }
    }

    @Override
    public Sink<?> getSink() {
        return this.getSink(false);
    }

    public Sink<?> getSink(boolean tryMode) {
        if (this.sink != null || tryMode) {
            return this.sink;
        }
        throw new PSModelEngineException((IPSModelEngine)this, String.format("\u672a\u63d0\u4f9bSink\u5bf9\u8c61", new Object[0]));
    }

    protected void setSink(Sink<?> sink) {
        this.sink = sink;
    }

    protected void prepareSink() throws Exception {
        if (PSModelEnums.DataSyncAgentType.KAFKA.value.equalsIgnoreCase(this.getAgentType())) {
            return;
        }
    }

    @Override
    public boolean isEnableSource() {
        return this.getSource(true) != null;
    }

    @Override
    public boolean isEnableSink() {
        return this.getSink(true) != null;
    }
}

