/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.spark3.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
import org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter;
import org.apache.hudi.spark3.internal.HoodieWriterCommitMessage;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHoodieBulkInsertDataInternalWriter
extends HoodieBulkInsertInternalWriterTestBase {
    private static Stream<Arguments> configParams() {
        Object[][] data = new Object[][]{{true, true}, {true, false}, {false, true}, {false, false}};
        return Stream.of(data).map(Arguments::of);
    }

    private static Stream<Arguments> bulkInsertTypeParams() {
        Object[][] data = new Object[][]{{true}, {false}};
        return Stream.of(data).map(Arguments::of);
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception {
        HoodieWriteConfig cfg = this.getWriteConfig(populateMetaFields);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        for (int i = 0; i < 2; ++i) {
            String instantTime = "00" + i;
            HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter((HoodieTable)table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, populateMetaFields, sorted);
            int size = 10 + RANDOM.nextInt(1000);
            int batches = 3;
            Dataset totalInputRows = null;
            for (int j = 0; j < batches; ++j) {
                String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
                Dataset inputRows = SparkDatasetTestUtils.getRandomRows((SQLContext)this.sqlContext, (int)size, (String)partitionPath, (boolean)false);
                this.writeRows((Dataset<Row>)inputRows, writer);
                totalInputRows = totalInputRows == null ? inputRows : totalInputRows.union(inputRows);
            }
            HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage)writer.commit();
            Option fileAbsPaths = Option.of(new ArrayList());
            Option fileNames = Option.of(new ArrayList());
            this.assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, (Option<List<String>>)fileAbsPaths, (Option<List<String>>)fileNames, false);
            Dataset result = this.sqlContext.read().parquet(((List)fileAbsPaths.get()).toArray(new String[0]));
            this.assertOutput((Dataset<Row>)totalInputRows, (Dataset<Row>)result, instantTime, (Option<List<String>>)fileNames, populateMetaFields);
        }
    }

    @Test
    public void testGlobalFailure() throws Exception {
        HoodieWriteConfig cfg = this.getWriteConfig(true);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
        String instantTime = "001";
        HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter((HoodieTable)table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true, false);
        int size = 10 + RANDOM.nextInt(100);
        int totalFailures = 5;
        Dataset inputRows = SparkDatasetTestUtils.getRandomRows((SQLContext)this.sqlContext, (int)(size / 2), (String)partitionPath, (boolean)false);
        List internalRows = SparkDatasetTestUtils.toInternalRows((Dataset)inputRows, (ExpressionEncoder)SparkDatasetTestUtils.ENCODER);
        for (int i = 0; i < totalFailures; ++i) {
            internalRows.add(SparkDatasetTestUtils.getInternalRowWithError((String)partitionPath));
        }
        Dataset inputRows2 = SparkDatasetTestUtils.getRandomRows((SQLContext)this.sqlContext, (int)(size / 2), (String)partitionPath, (boolean)false);
        internalRows.addAll(SparkDatasetTestUtils.toInternalRows((Dataset)inputRows2, (ExpressionEncoder)SparkDatasetTestUtils.ENCODER));
        try {
            for (InternalRow internalRow : internalRows) {
                writer.write(internalRow);
            }
            Assertions.fail((String)"Should have failed");
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage)writer.commit();
        Option fileAbsPaths = Option.of(new ArrayList());
        Option fileNames = Option.of(new ArrayList());
        this.assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, (Option<List<String>>)fileAbsPaths, (Option<List<String>>)fileNames);
        Dataset result = this.sqlContext.read().parquet(((List)fileAbsPaths.get()).toArray(new String[0]));
        this.assertOutput((Dataset<Row>)inputRows, (Dataset<Row>)result, instantTime, (Option<List<String>>)fileNames, true);
    }

    private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer) throws Exception {
        List internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, (ExpressionEncoder)SparkDatasetTestUtils.ENCODER);
        for (InternalRow internalRow : internalRows) {
            writer.write(internalRow);
        }
    }
}

