/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.source.ContinuousFileSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration tests that write the {@code test_source.data} records into Hudi tables through Flink
 * DataStream pipelines.
 *
 * <p>Covered scenarios: copy-on-write and merge-on-read (with compaction) writes, clustering,
 * user transformers, and the {@link HoodiePipeline} builder source/sink API. Every job runs on the
 * {@link FlinkMiniCluster} extension, and results are checked against the expected per-partition
 * rows below.
 */
@ExtendWith(FlinkMiniCluster.class)
public class ITTestDataStreamWrite extends TestLogger {

    /** Checkpoint interval shared by every pipeline in this class. */
    private static final long CHECKPOINT_INTERVAL_MS = 4000L;

    /** How long an asynchronously submitted MOR job is left running before it is cancelled. */
    private static final long MOR_RUN_SECONDS = 20L;

    /** Classpath resource with the input records deserialized by {@link JsonDeserializationFunction}. */
    private static final String SOURCE_DATA = "test_source.data";

    /** Expected rows per partition when the source is written unchanged. */
    private static final Map<String, List<String>> EXPECTED = new HashMap<>();

    /** Expected rows after one transformer that adds 1 to field index 2 (the age column). */
    private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();

    /** Expected rows after the same +1 transformer is chained twice. */
    private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>();

    static {
        EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));

        EXPECTED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1"));
        EXPECTED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
        EXPECTED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", "id6,par3,id6,Emma,21,6000,par3"));
        EXPECTED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,45,7000,par4", "id8,par4,id8,Han,57,8000,par4"));

        EXPECTED_CHAINED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,25,1000,par1", "id2,par1,id2,Stephen,35,2000,par1"));
        EXPECTED_CHAINED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,55,3000,par2", "id4,par2,id4,Fabian,33,4000,par2"));
        EXPECTED_CHAINED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,20,5000,par3", "id6,par3,id6,Emma,22,6000,par3"));
        EXPECTED_CHAINED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,46,7000,par4", "id8,par4,id8,Han,58,8000,par4"));
    }

    @TempDir
    File tempFile;

    @ParameterizedTest
    @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
    public void testWriteCopyOnWrite(String indexType) throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
        conf.setString(FlinkOptions.INDEX_TYPE, indexType);
        conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
        conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
        conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        testWriteToHoodie(conf, "cow_write", 2, EXPECTED);
    }

    @Test
    public void testWriteCopyOnWriteWithTransformer() throws Exception {
        Transformer transformer = ds -> ds.map(rowData -> {
            if (rowData instanceof GenericRowData) {
                GenericRowData genericRD = (GenericRowData) rowData;
                // Field 2 is incremented by one (see EXPECTED_TRANSFORMER).
                genericRD.setField(2, genericRD.getInt(2) + 1);
                return genericRD;
            }
            throw new RuntimeException("Unrecognized row type information: " + rowData.getClass().getSimpleName());
        });
        testWriteToHoodie(transformer, "cow_write_with_transformer", EXPECTED_TRANSFORMER);
    }

    @Test
    public void testWriteCopyOnWriteWithChainedTransformer() throws Exception {
        Transformer t1 = ds -> ds.map(rowData -> {
            if (rowData instanceof GenericRowData) {
                GenericRowData genericRD = (GenericRowData) rowData;
                genericRD.setField(2, genericRD.getInt(2) + 1);
                return genericRD;
            }
            throw new RuntimeException("Unrecognized row type : " + rowData.getClass().getSimpleName());
        });
        // Applying the same +1 transformer twice must yield +2 (see EXPECTED_CHAINED_TRANSFORMER).
        Transformer chainedTransformer = new ChainedTransformer(Arrays.asList(t1, t1));
        testWriteToHoodie(chainedTransformer, "cow_write_with_chained_transformer", EXPECTED_CHAINED_TRANSFORMER);
    }

    @ParameterizedTest
    @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
    public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
        conf.setString(FlinkOptions.INDEX_TYPE, indexType);
        conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
        conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
        testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED);
    }

    @Test
    public void testWriteCopyOnWriteWithClustering() throws Exception {
        testWriteCopyOnWriteWithClustering(false);
    }

    @Test
    public void testWriteCopyOnWriteWithSortClustering() throws Exception {
        testWriteCopyOnWriteWithClustering(true);
    }

    /**
     * Runs an insert-only COW write with clustering scheduled after every commit.
     *
     * @param sortClusteringEnabled when true, clustering also sorts by the {@code uuid} column
     */
    private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
        conf.setString(FlinkOptions.OPERATION, "insert");
        if (sortClusteringEnabled) {
            conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
        }
        testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
    }

    /** Writes with the default configuration, applying {@code transformer} to the source stream. */
    private void testWriteToHoodie(Transformer transformer, String jobName, Map<String, List<String>> expected) throws Exception {
        testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.toURI().toString()),
                Option.of(transformer), jobName, 2, expected);
    }

    /** Writes with {@code conf} and no transformer. */
    private void testWriteToHoodie(Configuration conf, String jobName, int checkpoints,
            Map<String, List<String>> expected) throws Exception {
        testWriteToHoodie(conf, Option.empty(), jobName, checkpoints, expected);
    }

    /**
     * Builds and runs a bootstrap + stream-write pipeline, then verifies the written COW data.
     *
     * <p>For MERGE_ON_READ tables the source re-scans the input file continuously, so the job never
     * ends by itself. {@link #execute} cancels it after a fixed delay, and a compaction stage is
     * attached. Other table types use a bounded source that stops after {@code checkpoints}
     * checkpoints.
     *
     * @param conf        write configuration; its TABLE_TYPE selects the MOR vs. bounded path
     * @param transformer optional transformation applied to the deserialized rows before writing
     * @param jobName     Flink job name
     * @param checkpoints checkpoint count passed to the bounded source (unused for MOR)
     * @param expected    expected rows per partition
     */
    private void testWriteToHoodie(Configuration conf, Option<Transformer> transformer, String jobName,
            int checkpoints, Map<String, List<String>> expected) throws Exception {
        StreamExecutionEnvironment execEnv = createExecEnv(4);
        RowType rowType = sourceRowType(conf);
        String sourcePath = resourcePath(SOURCE_DATA);
        // Constant-first comparison is null-safe.
        boolean isMor = HoodieTableType.MERGE_ON_READ.name().equals(conf.getString(FlinkOptions.TABLE_TYPE));

        DataStream<RowData> dataStream;
        if (isMor) {
            TextInputFormat format = new TextInputFormat(new Path(sourcePath));
            format.setFilesFilter(FilePathFilter.createDefaultFilter());
            format.setCharsetName("UTF-8");
            TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
            // PROCESS_CONTINUOUSLY keeps the job alive so checkpoints (and compaction) keep happening.
            dataStream = execEnv
                    .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L, typeInfo)
                    .map(JsonDeserializationFunction.getInstance(rowType))
                    .setParallelism(1);
        } else {
            dataStream = boundedJsonSource(execEnv, sourcePath, checkpoints, JsonDeserializationFunction.getInstance(rowType));
        }

        if (transformer.isPresent()) {
            dataStream = transformer.get().apply(dataStream);
        }

        OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
        DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, Pipelines.bootstrap(conf, rowType, dataStream));
        execEnv.addOperator(pipeline.getTransformation());
        if (isMor) {
            Pipelines.compact(conf, pipeline);
        }

        execute(execEnv, isMor, jobName);
        TestData.checkWrittenDataCOW(tempFile, expected);
    }

    /**
     * Builds and runs a bounded append + clustering pipeline, then verifies the written COW data.
     *
     * @param conf        write configuration (clustering options are expected to be set by the caller)
     * @param jobName     Flink job name
     * @param checkpoints checkpoint count passed to the bounded source
     * @param expected    expected rows per partition
     */
    private void testWriteToHoodieWithCluster(Configuration conf, String jobName, int checkpoints,
            Map<String, List<String>> expected) throws Exception {
        StreamExecutionEnvironment execEnv = createExecEnv(4);
        RowType rowType = sourceRowType(conf);
        DataStream<RowData> dataStream = boundedJsonSource(execEnv, resourcePath(SOURCE_DATA), checkpoints,
                JsonDeserializationFunction.getInstance(rowType));

        OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
        execEnv.addOperator(pipeline.getTransformation());
        Pipelines.cluster(conf, rowType, pipeline);

        execute(execEnv, false, jobName);
        TestData.checkWrittenDataCOW(tempFile, expected);
    }

    /**
     * Runs the job built on {@code execEnv}.
     *
     * <p>When {@code isMor} is false, the call blocks until the job finishes. When it is true, the
     * job is submitted asynchronously. Unless it is already reported as FAILED, the job is left
     * running for {@value #MOR_RUN_SECONDS} seconds and then cancelled. The cancellation request
     * is not awaited.
     *
     * @param execEnv environment holding the job graph
     * @param isMor   whether the job is the never-ending MOR variant that must be cancelled
     * @param jobName Flink job name
     * @throws Exception if submission or (non-MOR) execution fails
     */
    public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
        if (!isMor) {
            execEnv.execute(jobName);
            return;
        }
        JobClient client = execEnv.executeAsync(jobName);
        if (client.getJobStatus().get() == JobStatus.FAILED) {
            // Nothing to cancel; callers in this class verify the written data afterwards.
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(MOR_RUN_SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Always request cancellation so an interrupted wait cannot leak a running job.
            client.cancel();
        }
    }

    @Test
    public void testHoodiePipelineBuilderSource() throws Exception {
        StreamExecutionEnvironment execEnv = createExecEnv(1);
        Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

        TestData.writeData(TestData.dataSetInsert(1, 2), conf);
        TestData.writeData(TestData.dataSetInsert(3, 4), conf);
        TestData.writeData(TestData.dataSetInsert(5, 6), conf);
        String latestCommit = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
        options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
        HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
                .column("uuid string not null")
                .column("name string")
                .column("age int")
                .column("`ts` timestamp(3)")
                .column("`partition` string")
                .pk(new String[] {"uuid"})
                .partition(new String[] {"partition"})
                .options(options);
        DataStream<RowData> rowDataDataStream = builder.source(execEnv);

        List<RowData> result = new ArrayList<>();
        // The iterator returned by executeAndCollect() must be closed to release cluster resources.
        try (CloseableIterator<RowData> rows = rowDataDataStream.executeAndCollect()) {
            rows.forEachRemaining(result::add);
        }
        // Reading from the latest commit must only return the last inserted batch.
        TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
    }

    @Test
    public void testHoodiePipelineBuilderSink() throws Exception {
        StreamExecutionEnvironment execEnv = createExecEnv(4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
        options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), resourcePath("test_read_schema.avsc"));
        Configuration conf = Configuration.fromMap(options);

        DataStream<RowData> dataStream = boundedJsonSource(execEnv, resourcePath(SOURCE_DATA), 2,
                JsonDeserializationFunction.getInstance(conf));

        HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
                .column("uuid string not null")
                .column("name string")
                .column("age int")
                .column("`ts` timestamp(3)")
                .column("`partition` string")
                .pk(new String[] {"uuid"})
                .partition(new String[] {"partition"})
                .options(options);
        builder.sink(dataStream, false);

        execute(execEnv, false, "Api_Sink_Test");
        TestData.checkWrittenDataCOW(tempFile, EXPECTED);
    }

    /**
     * Creates a streaming environment with object reuse disabled and exactly-once checkpointing
     * every {@value #CHECKPOINT_INTERVAL_MS} ms. At most one checkpoint may be in flight.
     */
    private static StreamExecutionEnvironment createExecEnv(int parallelism) {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.getConfig().disableObjectReuse();
        execEnv.setParallelism(parallelism);
        execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        return execEnv;
    }

    /** Derives the Flink row type from the Avro source schema configured in {@code conf}. */
    private static RowType sourceRowType(Configuration conf) {
        return (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
    }

    /**
     * Resolves a classpath resource via the context class loader.
     *
     * @throws NullPointerException if the resource does not exist
     */
    private static String resourcePath(String name) {
        return Objects.requireNonNull(
                Thread.currentThread().getContextClassLoader().getResource(name),
                "Test resource not found: " + name).toString();
    }

    /**
     * Reads {@code sourcePath} through {@link ContinuousFileSource.BoundedSourceFunction} with
     * parallelism 1. The lines are then mapped to rows with {@code deserializer} at parallelism 4.
     */
    private static DataStream<RowData> boundedJsonSource(StreamExecutionEnvironment execEnv, String sourcePath,
            int checkpoints, MapFunction<String, RowData> deserializer) {
        return execEnv
                .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
                .name("continuous_file_source")
                .setParallelism(1)
                .map(deserializer)
                .setParallelism(4);
    }
}

