package org.apache.seatunnel.flink.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.seatunnel.apis.base.env.Execution;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/flink/stream/FlinkStreamExecution.class */
public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkStreamTransform, FlinkStreamSink, FlinkEnvironment> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) FlinkStreamExecution.class);
    private Config config;
    private final FlinkEnvironment flinkEnvironment;

    public FlinkStreamExecution(FlinkEnvironment flinkEnvironment) {
        this.flinkEnvironment = flinkEnvironment;
    }

    @Override // org.apache.seatunnel.apis.base.env.Execution
    public void start(List<FlinkStreamSource> list, List<FlinkStreamTransform> list2, List<FlinkStreamSink> list3) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (FlinkStreamSource flinkStreamSource : list) {
            DataStream<Row> data = flinkStreamSource.getData(this.flinkEnvironment);
            arrayList.add(data);
            registerResultTable(flinkStreamSource, data);
        }
        DataStream<Row> dataStream = (DataStream) arrayList.get(0);
        for (FlinkStreamTransform flinkStreamTransform : list2) {
            dataStream = flinkStreamTransform.processStream(this.flinkEnvironment, fromSourceTable(flinkStreamTransform.getConfig()).orElse(dataStream));
            registerResultTable(flinkStreamTransform, dataStream);
            flinkStreamTransform.registerFunction(this.flinkEnvironment);
        }
        for (FlinkStreamSink flinkStreamSink : list3) {
            flinkStreamSink.outputStream(this.flinkEnvironment, fromSourceTable(flinkStreamSink.getConfig()).orElse(dataStream));
        }
        try {
            log.info("Flink Execution Plan:{}", this.flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
            this.flinkEnvironment.getStreamExecutionEnvironment().execute(this.flinkEnvironment.getJobName());
        } catch (Exception e) {
            log.warn("Flink with job name [{}] execute failed", this.flinkEnvironment.getJobName());
            throw e;
        }
    }

    private void registerResultTable(Plugin<FlinkEnvironment> plugin, DataStream<Row> dataStream) {
        this.flinkEnvironment.registerResultTable(plugin.getConfig(), dataStream);
    }

    private Optional<DataStream<Row>> fromSourceTable(Config config) {
        if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
            return Optional.empty();
        }
        StreamTableEnvironment streamTableEnvironment = this.flinkEnvironment.getStreamTableEnvironment();
        return Optional.ofNullable(TableUtil.tableToDataStream(streamTableEnvironment, streamTableEnvironment.scan(new String[]{config.getString(Plugin.SOURCE_TABLE_NAME)}), true));
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public void setConfig(Config config) {
        this.config = config;
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public Config getConfig() {
        return this.config;
    }
}
