/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
import org.apache.flink.streaming.util.CollectingSourceContext;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Test cases for {@link StreamReadOperator}.
 *
 * <p>Each test writes data into a MERGE_ON_READ table located in a temporary directory,
 * generates input splits with a {@link StreamReadMonitoringFunction}, feeds them to the
 * operator through a {@link OneInputStreamOperatorTestHarness}, and drives the operator's
 * mailbox one step at a time with a {@link SteppingMailboxProcessor}.
 */
public class TestStreamReadOperator {
    /** Expected string form of the rows of each partition, keyed by partition path. */
    private static final Map<String, String> EXPECTED = new HashMap<>();

    static {
        EXPECTED.put("par1", "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]");
        EXPECTED.put("par2", "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2]");
        EXPECTED.put("par3", "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3]");
        EXPECTED.put("par4", "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]");
    }

    /** Flink configuration of the table under test; rebuilt before every test. */
    private Configuration conf;

    /** Temporary directory injected by JUnit; used as the table base path. */
    @TempDir
    File tempFile;

    /**
     * Builds the default configuration for the temporary base path, switches the table type
     * to MERGE_ON_READ and initializes the table if it does not exist yet.
     */
    @BeforeEach
    public void before() throws Exception {
        String basePath = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(basePath);
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        StreamerUtil.initTableIfNotExists(this.conf);
    }

    /**
     * Writes two data sets one after the other and checks that, after processing the splits
     * generated for each write, the accumulated operator output matches the written rows.
     */
    @Test
    void testWriteRecords() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = this.createReader()) {
            harness.setup();
            harness.open();

            SteppingMailboxProcessor processor = this.createLocalMailbox(harness);
            StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(this.conf);

            List<MergeOnReadInputSplit> splits = this.generateSplits(func);
            MatcherAssert.assertThat("Should have 4 splits", splits.size(), CoreMatchers.is(4));
            for (MergeOnReadInputSplit split : splits) {
                // Submit one split and run exactly one mailbox step for it.
                harness.processElement(split, -1L);
                MatcherAssert.assertThat("Should process 1 split", processor.runMailboxStep());
            }
            TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);

            // Second write: reuse the same monitor function to generate the next batch of splits.
            TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
            List<MergeOnReadInputSplit> splits2 = this.generateSplits(func);
            MatcherAssert.assertThat("Should have 4 splits", splits2.size(), CoreMatchers.is(4));
            for (MergeOnReadInputSplit split : splits2) {
                harness.processElement(split, -1L);
                MatcherAssert.assertThat("Should processed 1 split", processor.runMailboxStep());
            }

            // The harness output is cumulative, so it must contain the rows of both writes.
            List<RowData> expected = new ArrayList<>(TestData.DATA_SET_INSERT);
            expected.addAll(TestData.DATA_SET_UPDATE_INSERT);
            TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
        }
    }

    /**
     * Submits all splits, enqueues a snapshot action on the mailbox and checks step by step
     * that the first split and the snapshot are handled before the remaining splits.
     */
    @Test
    public void testCheckpoint() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        long timestamp = 0L;
        try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = this.createReader()) {
            harness.setup();
            harness.open();

            SteppingMailboxProcessor processor = this.createLocalMailbox(harness);
            StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(this.conf);

            List<MergeOnReadInputSplit> splits = this.generateSplits(func);
            MatcherAssert.assertThat("Should have 4 splits", splits.size(), CoreMatchers.is(4));
            for (MergeOnReadInputSplit split : splits) {
                harness.processElement(split, ++timestamp);
            }

            // Enqueue the snapshot as a mailbox action after all splits have been submitted.
            processor.getMainMailboxExecutor().execute(() -> harness.snapshot(1L, 3L), "Trigger snapshot");

            Assertions.assertTrue(processor.runMailboxStep(), "Should have processed the split0");
            Assertions.assertTrue(processor.runMailboxStep(), "Should have processed the snapshot state action");

            // Only the rows of the first split may have been emitted at this point.
            MatcherAssert.assertThat(
                TestData.rowDataToString(harness.extractOutputValues()),
                CoreMatchers.is(TestStreamReadOperator.getSplitExpected(Collections.singletonList(splits.get(0)), EXPECTED)));

            Assertions.assertTrue(processor.runMailboxStep(), "Should have processed the split1");
            Assertions.assertTrue(processor.runMailboxStep(), "Should have processed the split2");
            Assertions.assertTrue(processor.runMailboxStep(), "Should have processed the split3");

            TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
        }
    }

    /**
     * Processes the first two of four splits, snapshots the operator, then restores the
     * snapshot into a fresh harness and checks that the remaining two splits are processed.
     */
    @Test
    public void testCheckpointRestore() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);

        OperatorSubtaskState state;
        final List<MergeOnReadInputSplit> splits;
        try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = this.createReader()) {
            harness.setup();
            harness.open();

            StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(this.conf);
            splits = this.generateSplits(func);
            MatcherAssert.assertThat("Should have 4 splits", splits.size(), CoreMatchers.is(4));

            // Enqueue all the splits.
            for (MergeOnReadInputSplit split : splits) {
                harness.processElement(split, -1L);
            }

            // Read only the first two splits before taking the snapshot.
            SteppingMailboxProcessor localMailbox = this.createLocalMailbox(harness);
            for (int i = 0; i < 2; ++i) {
                Assertions.assertTrue(localMailbox.runMailboxStep(), "Should have processed the split#" + i);
            }

            MatcherAssert.assertThat(
                TestData.rowDataToString(harness.extractOutputValues()),
                CoreMatchers.is(TestStreamReadOperator.getSplitExpected(splits.subList(0, 2), EXPECTED)));

            // Snapshot the state now; the restored operator is expected to hold the other two splits.
            state = harness.snapshot(1L, 1L);
        }

        // A fresh harness (with its own scope) is restored from the snapshot taken above.
        try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = this.createReader()) {
            harness.setup();
            // Recover to process the remaining splits.
            harness.initializeState(state);
            harness.open();

            SteppingMailboxProcessor localMailbox = this.createLocalMailbox(harness);
            for (int i = 2; i < 4; ++i) {
                Assertions.assertTrue(localMailbox.runMailboxStep(), "Should have processed one split#" + i);
            }

            // This harness starts with empty output, so only the last two splits are expected.
            MatcherAssert.assertThat(
                TestData.rowDataToString(harness.extractOutputValues()),
                CoreMatchers.is(TestStreamReadOperator.getSplitExpected(splits.subList(2, 4), EXPECTED)));
        }
    }

    /**
     * Builds the expected output string for the given splits.
     *
     * @param splits   the splits whose rows are expected
     * @param expected mapping from partition path to the expected row string of that partition
     * @return the {@code toString()} of the naturally sorted list of the expected strings
     *         looked up by each split's partition path
     */
    private static String getSplitExpected(List<MergeOnReadInputSplit> splits, Map<String, String> expected) {
        return splits.stream()
            .map(TestUtils::getSplitPartitionPath)
            .map(expected::get)
            .sorted(Comparator.naturalOrder())
            .collect(Collectors.toList())
            .toString();
    }

    /**
     * Opens the monitor function with the test configuration and runs one monitoring pass,
     * collecting every forwarded split.
     *
     * @param func the monitoring function used to discover splits
     * @return the splits forwarded by the function during this pass
     */
    private List<MergeOnReadInputSplit> generateSplits(StreamReadMonitoringFunction func) throws Exception {
        List<MergeOnReadInputSplit> splits = new ArrayList<>();
        func.open(this.conf);
        func.monitorDirAndForwardSplits(new CollectingSourceContext<>(new Object(), splits));
        return splits;
    }

    /**
     * Creates a test harness wrapping a {@link StreamReadOperator} that reads the table in
     * the temporary directory through a {@link MergeOnReadInputFormat}.
     *
     * @return a harness with max parallelism 1, parallelism 1 and subtask index 0, configured
     *         for processing time; the caller is responsible for setup, open and close
     * @throws HoodieException if the table avro schema cannot be resolved
     */
    private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> createReader() throws Exception {
        final String basePath = this.tempFile.getAbsolutePath();
        final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new Configuration());
        final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)
            .setBasePath(basePath)
            .build();

        final TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        final Schema tableAvroSchema;
        try {
            tableAvroSchema = schemaResolver.getTableAvroSchema();
        } catch (Exception e) {
            throw new HoodieException("Get table avro schema error", e);
        }

        final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        final RowType rowType = (RowType) rowDataType.getLogicalType();
        final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
            rowType,
            TestConfigurations.ROW_TYPE,
            tableAvroSchema.toString(),
            AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
            Collections.emptyList(),
            new String[0]);
        final MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
            .config(this.conf)
            .tableState(hoodieTableState)
            .fieldTypes(rowDataType.getChildren())
            .defaultPartName(FlinkOptions.PARTITION_DEFAULT_NAME.defaultValue())
            .limit(1000L)
            .emitDelete(true)
            .build();

        final OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory(inputFormat);
        final OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness =
            new OneInputStreamOperatorTestHarness<>(factory, 1, 1, 0);
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return harness;
    }

    /**
     * Creates a mailbox processor over the harness's task mailbox so the test can run the
     * operator's queued actions one step at a time.
     *
     * @param harness the harness whose task mailbox is stepped
     * @return a stepping processor whose default action is suspended immediately
     */
    private SteppingMailboxProcessor createLocalMailbox(OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness) {
        return new SteppingMailboxProcessor(
            MailboxDefaultAction.Controller::suspendDefaultAction,
            harness.getTaskMailbox(),
            StreamTaskActionExecutor.IMMEDIATE);
    }
}

