package org.apache.hudi.spark3.internal;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.class */
/**
 * Unit tests for {@link HoodieBulkInsertDataInternalWriter}.
 *
 * <p>Each test drives the writer with rows produced by {@link SparkDatasetTestUtils}, commits,
 * and then validates the resulting write statuses and the parquet output through the
 * assertion helpers inherited from {@link HoodieBulkInsertInternalWriterTestBase}.
 */
public class TestHoodieBulkInsertDataInternalWriter extends HoodieBulkInsertInternalWriterTestBase {
    /** Number of independent writer/commit rounds exercised by {@link #testDataInternalWriter}. */
    private static final int COMMIT_ROUNDS = 2;

    /** Number of row batches written per round; each batch targets a different partition path. */
    private static final int BATCHES_PER_ROUND = 3;

    /** Number of deliberately malformed rows injected by {@link #testGlobalFailure}. */
    private static final int ERROR_ROW_COUNT = 5;

    /**
     * Supplies every combination of the two boolean flags consumed by
     * {@link #testDataInternalWriter(boolean, boolean)}.
     */
    private static Stream<Arguments> configParams() {
        return Stream.of(new Object[]{true, true}, new Object[]{true, false}, new Object[]{false, true}, new Object[]{false, false}).map(Arguments::of);
    }

    /**
     * Supplies a single boolean flag in both states. Not referenced by any test visible in this
     * class; retained in case it is used as a method source elsewhere.
     */
    private static Stream<Arguments> bulkInsertTypeParams() {
        return Stream.of(new Object[]{true}, new Object[]{false}).map(Arguments::of);
    }

    /**
     * Writes several batches of random rows through a fresh writer, commits, and verifies both
     * the reported write statuses and the data read back from the written parquet files. The
     * whole sequence is repeated for {@value #COMMIT_ROUNDS} instant times ("000", "001").
     *
     * @param z  forwarded as the last constructor argument of the writer and to
     *           {@code assertWriteStatuses}; presumably indicates whether records arrive sorted
     *           (TODO confirm against the writer's constructor)
     * @param z2 forwarded to {@code getWriteConfig}, to the writer's constructor and to
     *           {@code assertOutput}; presumably toggles population of meta fields
     *           (TODO confirm against the base class)
     * @throws Exception if writing, committing or reading back the data fails
     */
    @ParameterizedTest
    @MethodSource({"configParams"})
    public void testDataInternalWriter(boolean z, boolean z2) throws Exception {
        HoodieWriteConfig writeConfig = getWriteConfig(z2);
        HoodieSparkTable table = HoodieSparkTable.create(writeConfig, this.context, this.metaClient);
        for (int round = 0; round < COMMIT_ROUNDS; round++) {
            String instantTime = "00" + round;
            HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, writeConfig, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, z2, z);
            int batchSize = 10 + RANDOM.nextInt(1000);

            // Accumulate everything written so it can be compared with what is read back.
            Dataset<Row> expectedRows = null;
            for (int batch = 0; batch < BATCHES_PER_ROUND; batch++) {
                String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[batch % 3];
                Dataset<Row> batchRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, batchSize, partitionPath, false);
                writeRows(batchRows, writer);
                expectedRows = expectedRows == null ? batchRows : expectedRows.union(batchRows);
            }

            HoodieWriterCommitMessage commitMessage = writer.commit();
            // Both lists are handed to assertWriteStatuses and read afterwards, so that helper
            // is expected to fill them in.
            Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
            Option<List<String>> fileNames = Option.of(new ArrayList<>());
            assertWriteStatuses(commitMessage.getWriteStatuses(), BATCHES_PER_ROUND, batchSize, z, fileAbsPaths, fileNames);

            Dataset<Row> actualRows = this.sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
            assertOutput(expectedRows, actualRows, instantTime, fileNames, z2);
        }
    }

    /**
     * Verifies that a write sequence containing malformed rows fails, and that the rows written
     * before the failure can still be committed and read back.
     *
     * <p>The input consists of {@code size / 2} valid rows, then {@value #ERROR_ROW_COUNT} rows
     * built by {@code getInternalRowWithError}, then another {@code size / 2} valid rows. Only
     * the first valid batch is expected in the committed output.
     *
     * @throws Exception if committing or reading back the data fails
     */
    @Test
    public void testGlobalFailure() throws Exception {
        HoodieWriteConfig writeConfig = getWriteConfig(true);
        HoodieSparkTable table = HoodieSparkTable.create(writeConfig, this.context, this.metaClient);
        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
        String instantTime = "001";
        HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, writeConfig, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true, false);
        int size = 10 + RANDOM.nextInt(100);
        int halfSize = size / 2;

        Dataset<Row> expectedRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, halfSize, partitionPath, false);
        List<InternalRow> rowsToWrite = SparkDatasetTestUtils.toInternalRows(expectedRows, SparkDatasetTestUtils.ENCODER);
        for (int i = 0; i < ERROR_ROW_COUNT; i++) {
            rowsToWrite.add(SparkDatasetTestUtils.getInternalRowWithError(partitionPath));
        }
        Dataset<Row> trailingRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, halfSize, partitionPath, false);
        rowsToWrite.addAll(SparkDatasetTestUtils.toInternalRows(trailingRows, SparkDatasetTestUtils.ENCODER));

        // assertThrows is used instead of try { ...; fail(); } catch (Throwable) {} because the
        // latter also swallows the AssertionFailedError raised by fail(), which would let the
        // test pass even when the writer never throws.
        Assertions.assertThrows(Throwable.class, () -> {
            for (InternalRow row : rowsToWrite) {
                writer.write(row);
            }
        }, "Should have failed");

        HoodieWriterCommitMessage commitMessage = writer.commit();
        Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
        Option<List<String>> fileNames = Option.of(new ArrayList<>());
        assertWriteStatuses(commitMessage.getWriteStatuses(), 1, halfSize, fileAbsPaths, fileNames);

        Dataset<Row> actualRows = this.sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
        assertOutput(expectedRows, actualRows, instantTime, fileNames, true);
    }

    /**
     * Converts {@code dataset} to internal rows and feeds them one by one to the given writer.
     *
     * @param dataset rows to write
     * @param hoodieBulkInsertDataInternalWriter writer receiving the rows
     * @throws Exception if row conversion or any individual write fails
     */
    private void writeRows(Dataset<Row> dataset, HoodieBulkInsertDataInternalWriter hoodieBulkInsertDataInternalWriter) throws Exception {
        List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(dataset, SparkDatasetTestUtils.ENCODER);
        for (InternalRow row : internalRows) {
            hoodieBulkInsertDataInternalWriter.write(row);
        }
    }
}
