package org.apache.hudi.examples.quickstart;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory;
import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.class */
public final class HoodieFlinkQuickstart {
    private EnvironmentSettings settings = null;
    private TableEnvironment streamTableEnv = null;
    private String tableName;

    private HoodieFlinkQuickstart() {
    }

    public static HoodieFlinkQuickstart instance() {
        return new HoodieFlinkQuickstart();
    }

    public static void main(String[] strArr) throws TableNotExistException, InterruptedException {
        if (strArr.length < 3) {
            System.err.println("Usage: HoodieWriteClientExample <tablePath> <tableName> <tableType>");
            System.exit(1);
        }
        String str = strArr[0];
        String str2 = strArr[1];
        String str3 = strArr[2];
        HoodieFlinkQuickstart instance = instance();
        instance.initEnv();
        instance.createFileSource();
        instance.createHudiTable(str, str2, HoodieTableType.valueOf(str3));
        instance.insertData();
        instance.queryData();
        instance.updateData();
    }

    public void initEnv() {
        if (this.streamTableEnv == null) {
            this.settings = EnvironmentSettings.newInstance().build();
            TableEnvironmentImpl create = TableEnvironmentImpl.create(this.settings);
            create.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
            Configuration configuration = create.getConfig().getConfiguration();
            configuration.setString("execution.checkpointing.interval", "2s");
            configuration.setString("restart-strategy", "fixed-delay");
            configuration.setString("restart-strategy.fixed-delay.attempts", "0");
            this.streamTableEnv = create;
        }
    }

    public TableEnvironment getStreamTableEnv() {
        return this.streamTableEnv;
    }

    public TableEnvironment getBatchTableEnv() {
        Configuration configuration = new Configuration();
        configuration.setBoolean("execution.sorted-inputs.enabled", false);
        configuration.setBoolean("execution.batch-state-backend.enabled", false);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        this.settings = EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment, this.settings);
        create.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        return create;
    }

    public void createHudiTable(String str, String str2, HoodieTableType hoodieTableType) {
        this.tableName = str2;
        this.streamTableEnv.executeSql(QuickstartConfigurations.sql(str2).option(FlinkOptions.PATH, str).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
    }

    public void createFileSource() {
        this.streamTableEnv.executeSql(QuickstartConfigurations.getFileSourceDDL("source"));
    }

    @NotNull
    List<Row> insertData() throws InterruptedException, TableNotExistException {
        execInsertSql(this.streamTableEnv, String.format("insert into %s select * from source", this.tableName));
        return queryData();
    }

    List<Row> queryData() throws InterruptedException, TableNotExistException {
        return execSelectSql(this.streamTableEnv, String.format("select * from %s", this.tableName), 10L);
    }

    @NotNull
    List<Row> updateData() throws InterruptedException, TableNotExistException {
        execInsertSql(getStreamTableEnv(), String.format("insert into %s select * from source", this.tableName));
        return queryData();
    }

    public static void execInsertSql(TableEnvironment tableEnvironment, String str) {
        try {
            ((JobClient) tableEnvironment.executeSql(str).getJobClient().get()).getJobExecutionResult().get();
        } catch (InterruptedException | ExecutionException e) {
        }
    }

    public static List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, long j) throws InterruptedException, TableNotExistException {
        return execSelectSql(tableEnvironment, str, j, (String) null);
    }

    public static List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, long j, String str2) throws InterruptedException, TableNotExistException {
        String collectSinkDDL;
        if (str2 != null) {
            collectSinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink", ((Catalog) tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog()).get()).getTable(new ObjectPath(tableEnvironment.getCurrentDatabase(), str2)).getSchema());
        } else {
            collectSinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink");
        }
        return execSelectSql(tableEnvironment, str, collectSinkDDL, j);
    }

    public static List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, String str2, long j) throws InterruptedException {
        tableEnvironment.executeSql("DROP TABLE IF EXISTS sink");
        tableEnvironment.executeSql(str2);
        TableResult executeSql = tableEnvironment.executeSql("insert into sink " + str);
        TimeUnit.SECONDS.sleep(j);
        executeSql.getJobClient().ifPresent((v0) -> {
            v0.cancel();
        });
        tableEnvironment.executeSql("DROP TABLE IF EXISTS sink");
        return (List) CollectSinkTableFactory.RESULT.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }
}
