package org.apache.hudi.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.class */
public class TestHoodieDataSourceInternalWriter extends HoodieBulkInsertInternalWriterTestBase {
    @Test
    public void testDataSourceWriter() throws Exception {
        HoodieDataSourceInternalWriter hoodieDataSourceInternalWriter = new HoodieDataSourceInternalWriter("001", SparkDatasetTestUtils.getConfigBuilder(this.basePath).build(), SparkDatasetTestUtils.STRUCT_TYPE, this.sqlContext.sparkSession(), this.hadoopConf);
        DataWriter<InternalRow> createDataWriter = hoodieDataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
        String[] strArr = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            arrayList.add(this.basePath + "/" + str + "/*");
        }
        int nextInt = 10 + RANDOM.nextInt(1000);
        Dataset<Row> dataset = null;
        for (int i = 0; i < 5; i++) {
            Dataset<Row> randomRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, nextInt, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3], false);
            writeRows(randomRows, createDataWriter);
            dataset = dataset == null ? randomRows : dataset.union(randomRows);
        }
        HoodieWriterCommitMessage commit = createDataWriter.commit();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(commit);
        hoodieDataSourceInternalWriter.commit((WriterCommitMessage[]) arrayList2.toArray(new HoodieWriterCommitMessage[0]));
        this.metaClient.reloadActiveTimeline();
        assertOutput(dataset, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.metaClient.getFs(), (String[]) arrayList.toArray(new String[0])), "001", Option.empty());
        assertWriteStatuses(((HoodieWriterCommitMessage) arrayList2.get(0)).getWriteStatuses(), 5, nextInt, Option.empty(), Option.empty());
    }

    @Test
    public void testMultipleDataSourceWrites() throws Exception {
        HoodieWriteConfig build = SparkDatasetTestUtils.getConfigBuilder(this.basePath).build();
        int i = 0;
        for (int i2 = 0; i2 < 5; i2++) {
            String str = "00" + i2;
            HoodieDataSourceInternalWriter hoodieDataSourceInternalWriter = new HoodieDataSourceInternalWriter(str, build, SparkDatasetTestUtils.STRUCT_TYPE, this.sqlContext.sparkSession(), this.hadoopConf);
            ArrayList arrayList = new ArrayList();
            Dataset<Row> dataset = null;
            int i3 = i;
            i++;
            DataWriter<InternalRow> createDataWriter = hoodieDataSourceInternalWriter.createWriterFactory().createDataWriter(i3, RANDOM.nextLong(), RANDOM.nextLong());
            int nextInt = 10 + RANDOM.nextInt(1000);
            for (int i4 = 0; i4 < 5; i4++) {
                Dataset<Row> randomRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, nextInt, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i4 % 3], false);
                writeRows(randomRows, createDataWriter);
                dataset = dataset == null ? randomRows : dataset.union(randomRows);
            }
            arrayList.add(createDataWriter.commit());
            hoodieDataSourceInternalWriter.commit((WriterCommitMessage[]) arrayList.toArray(new HoodieWriterCommitMessage[0]));
            this.metaClient.reloadActiveTimeline();
            assertOutput(dataset, HoodieClientTestUtils.readCommit(this.basePath, this.sqlContext, this.metaClient.getCommitTimeline(), str), str, Option.empty());
            assertWriteStatuses(((HoodieWriterCommitMessage) arrayList.get(0)).getWriteStatuses(), 5, nextInt, Option.empty(), Option.empty());
        }
    }

    @Test
    public void testLargeWrites() throws Exception {
        HoodieWriteConfig build = SparkDatasetTestUtils.getConfigBuilder(this.basePath).build();
        int i = 0;
        for (int i2 = 0; i2 < 3; i2++) {
            String str = "00" + i2;
            HoodieDataSourceInternalWriter hoodieDataSourceInternalWriter = new HoodieDataSourceInternalWriter(str, build, SparkDatasetTestUtils.STRUCT_TYPE, this.sqlContext.sparkSession(), this.hadoopConf);
            ArrayList arrayList = new ArrayList();
            Dataset<Row> dataset = null;
            int i3 = i;
            i++;
            DataWriter<InternalRow> createDataWriter = hoodieDataSourceInternalWriter.createWriterFactory().createDataWriter(i3, RANDOM.nextLong(), RANDOM.nextLong());
            int nextInt = 10000 + RANDOM.nextInt(10000);
            for (int i4 = 0; i4 < 3; i4++) {
                Dataset<Row> randomRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, nextInt, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i4 % 3], false);
                writeRows(randomRows, createDataWriter);
                dataset = dataset == null ? randomRows : dataset.union(randomRows);
            }
            arrayList.add(createDataWriter.commit());
            hoodieDataSourceInternalWriter.commit((WriterCommitMessage[]) arrayList.toArray(new HoodieWriterCommitMessage[0]));
            this.metaClient.reloadActiveTimeline();
            assertOutput(dataset, HoodieClientTestUtils.readCommit(this.basePath, this.sqlContext, this.metaClient.getCommitTimeline(), str), str, Option.empty());
            assertWriteStatuses(((HoodieWriterCommitMessage) arrayList.get(0)).getWriteStatuses(), 3, nextInt, Option.empty(), Option.empty());
        }
    }

    @Test
    public void testAbort() throws Exception {
        HoodieWriteConfig build = SparkDatasetTestUtils.getConfigBuilder(this.basePath).build();
        HoodieDataSourceInternalWriter hoodieDataSourceInternalWriter = new HoodieDataSourceInternalWriter("000", build, SparkDatasetTestUtils.STRUCT_TYPE, this.sqlContext.sparkSession(), this.hadoopConf);
        DataWriter<InternalRow> createDataWriter = hoodieDataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
        List asList = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
        ArrayList arrayList = new ArrayList();
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            arrayList.add(this.basePath + "/" + ((String) it.next()) + "/*");
        }
        int nextInt = 10 + RANDOM.nextInt(100);
        Dataset<Row> dataset = null;
        for (int i = 0; i < 1; i++) {
            Dataset<Row> randomRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, nextInt, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3], false);
            writeRows(randomRows, createDataWriter);
            dataset = dataset == null ? randomRows : dataset.union(randomRows);
        }
        HoodieWriterCommitMessage commit = createDataWriter.commit();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(commit);
        hoodieDataSourceInternalWriter.commit((WriterCommitMessage[]) arrayList2.toArray(new HoodieWriterCommitMessage[0]));
        this.metaClient.reloadActiveTimeline();
        assertOutput(dataset, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.metaClient.getFs(), (String[]) arrayList.toArray(new String[0])), "000", Option.empty());
        assertWriteStatuses(((HoodieWriterCommitMessage) arrayList2.get(0)).getWriteStatuses(), 1, nextInt, Option.empty(), Option.empty());
        HoodieDataSourceInternalWriter hoodieDataSourceInternalWriter2 = new HoodieDataSourceInternalWriter("001", build, SparkDatasetTestUtils.STRUCT_TYPE, this.sqlContext.sparkSession(), this.hadoopConf);
        DataWriter<InternalRow> createDataWriter2 = hoodieDataSourceInternalWriter2.createWriterFactory().createDataWriter(1, RANDOM.nextLong(), RANDOM.nextLong());
        for (int i2 = 0; i2 < 1; i2++) {
            writeRows(SparkDatasetTestUtils.getRandomRows(this.sqlContext, nextInt, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i2 % 3], false), createDataWriter2);
        }
        HoodieWriterCommitMessage commit2 = createDataWriter2.commit();
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(commit2);
        hoodieDataSourceInternalWriter2.abort((WriterCommitMessage[]) arrayList3.toArray(new HoodieWriterCommitMessage[0]));
        this.metaClient.reloadActiveTimeline();
        assertOutput(dataset, HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.metaClient.getFs(), (String[]) arrayList.toArray(new String[0])), "000", Option.empty());
    }

    private void writeRows(Dataset<Row> dataset, DataWriter<InternalRow> dataWriter) throws Exception {
        Iterator it = SparkDatasetTestUtils.toInternalRows(dataset, SparkDatasetTestUtils.ENCODER).iterator();
        while (it.hasNext()) {
            dataWriter.write((InternalRow) it.next());
        }
    }
}
