/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.spark3.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
import org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite;
import org.apache.hudi.spark3.internal.HoodieWriterCommitMessage;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests for {@link HoodieDataSourceInternalBatchWrite}.
 *
 * <p>Each test creates a batch write for a fixed instant time, pushes randomly generated rows
 * through a {@link DataWriter} obtained from it, commits (or aborts) the write, and then reads the
 * table back to compare it against the rows that were written.
 *
 * <p>Fixtures such as {@code basePath}, {@code jsc}, {@code sqlContext}, {@code metaClient},
 * {@code context}, {@code storageConf} and {@code RANDOM}, as well as {@code getWriteConfig},
 * {@code assertOutput} and {@code assertWriteStatuses}, are not declared in this file; they are
 * presumably inherited from {@link HoodieBulkInsertInternalWriterTestBase}.
 */
public class TestHoodieDataSourceInternalBatchWrite
extends HoodieBulkInsertInternalWriterTestBase {
    /** Extra-metadata key that is ignored when comparing commit metadata against expectations. */
    private static final String SCHEMA_METADATA_KEY = "schema";

    /** Prefix used by the extra-commit-metadata tests to mark keys that should be persisted. */
    private static final String COMMIT_EXTRA_META_PREFIX = "commit_extra_meta_";

    /**
     * Supplies the {@code populateMetaFields} values for the parameterized tests.
     *
     * @return one argument set for {@code true} and one for {@code false}
     */
    private static Stream<Arguments> bulkInsertTypeParams() {
        return Stream.of(Arguments.of(true), Arguments.of(false));
    }

    /**
     * Writes and commits a single instant without any extra commit metadata.
     *
     * @param populateMetaFields forwarded to the write config and the batch write
     */
    @ParameterizedTest
    @MethodSource("bulkInsertTypeParams")
    public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
        testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields);
    }

    /**
     * Writes five batches under instant {@code "001"}, commits them, and verifies both the data read
     * back from the table and the extra metadata stored with the latest commit.
     *
     * @param extraMetadata options handed to the batch write
     * @param expectedExtraMetadata extra metadata expected on the commit, excluding the
     *     {@code "schema"} entry
     * @param populateMetaFields forwarded to the write config and the batch write
     */
    private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws Exception {
        HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
        // The returned table is not used; the call is retained in case it has setup side effects.
        HoodieSparkTable.create(cfg, this.context, this.metaClient);
        String instantTime = "001";
        HoodieDataSourceInternalBatchWrite batchWrite = newBatchWrite(instantTime, cfg, extraMetadata, populateMetaFields);
        DataWriter<InternalRow> writer = newWriter(batchWrite, 0);

        int size = 10 + RANDOM.nextInt(1000);
        int batches = 5;
        Dataset<Row> totalInputRows = writeBatches(writer, batches, size);
        HoodieWriterCommitMessage commitMessage = commitWrite(batchWrite, writer);

        Dataset<Row> result = readAllPartitions();
        assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
        assertWriteStatuses(commitMessage.getWriteStatuses(), batches, size, Option.empty(), Option.empty());

        Option<HoodieCommitMetadata> commitMetadataOption = HoodieClientTestUtils.getCommitMetadataForLatestInstant(this.metaClient);
        Assertions.assertTrue(commitMetadataOption.isPresent());
        // Compare everything except the schema entry, which is not part of the caller's expectation.
        Map<String, String> actualExtraMetadata = new HashMap<>(commitMetadataOption.get().getExtraMetadata());
        actualExtraMetadata.remove(SCHEMA_METADATA_KEY);
        Assertions.assertEquals(expectedExtraMetadata, actualExtraMetadata);
    }

    /**
     * Verifies that only the options whose key starts with the configured prefix end up in the
     * commit's extra metadata.
     */
    @Test
    public void testDataSourceWriterExtraCommitMetadata() throws Exception {
        String prefixOptionKey = DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key();
        Map<String, String> extraMeta = new HashMap<>();
        extraMeta.put(prefixOptionKey, COMMIT_EXTRA_META_PREFIX);
        extraMeta.put(COMMIT_EXTRA_META_PREFIX + "a", "valA");
        extraMeta.put(COMMIT_EXTRA_META_PREFIX + "b", "valB");
        // Does not start with the full prefix, so it is expected to be dropped.
        extraMeta.put("commit_extra_c", "valC");

        Map<String, String> expectedMetadata = new HashMap<>(extraMeta);
        expectedMetadata.remove(prefixOptionKey);
        expectedMetadata.remove("commit_extra_c");
        testDataSourceWriterInternal(extraMeta, expectedMetadata, true);
    }

    /**
     * Verifies that the commit carries no extra metadata when none of the supplied options start
     * with the configured prefix.
     */
    @Test
    public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception {
        Map<String, String> extraMeta = new HashMap<>();
        extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), COMMIT_EXTRA_META_PREFIX);
        extraMeta.put("keyA", "valA");
        extraMeta.put("keyB", "valB");
        extraMeta.put("commit_extra_c", "valC");
        testDataSourceWriterInternal(extraMeta, Collections.emptyMap(), true);
    }

    /**
     * Performs two consecutive commits of 10 to 1009 rows per batch and verifies each one.
     *
     * @param populateMetaFields forwarded to the write config and the batch write
     */
    @ParameterizedTest
    @MethodSource("bulkInsertTypeParams")
    public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception {
        writeAndVerifyCommits(populateMetaFields, 2, 10, 1000);
    }

    /**
     * Performs three consecutive commits of 10000 to 19999 rows per batch and verifies each one.
     * Currently disabled.
     *
     * @param populateMetaFields forwarded to the write config and the batch write
     */
    @Disabled
    @ParameterizedTest
    @MethodSource("bulkInsertTypeParams")
    public void testLargeWrites(boolean populateMetaFields) throws Exception {
        writeAndVerifyCommits(populateMetaFields, 3, 10000, 10000);
    }

    /**
     * Commits instant {@code "000"}, then writes instant {@code "001"} and aborts it, and verifies
     * that reading the table afterwards still yields only the rows of the first commit.
     *
     * @param populateMetaFields forwarded to the write config and the batch write
     */
    @ParameterizedTest
    @MethodSource("bulkInsertTypeParams")
    public void testAbort(boolean populateMetaFields) throws Exception {
        HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
        // The returned table is not used; the call is retained in case it has setup side effects.
        HoodieSparkTable.create(cfg, this.context, this.metaClient);

        // First instant: write and commit.
        String instantTime0 = "000";
        HoodieDataSourceInternalBatchWrite committedWrite = newBatchWrite(instantTime0, cfg, Collections.emptyMap(), populateMetaFields);
        DataWriter<InternalRow> committedWriter = newWriter(committedWrite, 0);
        int size = 10 + RANDOM.nextInt(100);
        int batches = 1;
        Dataset<Row> totalInputRows = writeBatches(committedWriter, batches, size);
        HoodieWriterCommitMessage commitMessage = commitWrite(committedWrite, committedWriter);

        Dataset<Row> result = readAllPartitions();
        assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
        assertWriteStatuses(commitMessage.getWriteStatuses(), batches, size, Option.empty(), Option.empty());

        // Second instant: write, then abort the batch write instead of committing it.
        String instantTime1 = "001";
        HoodieDataSourceInternalBatchWrite abortedWrite = newBatchWrite(instantTime1, cfg, Collections.emptyMap(), populateMetaFields);
        DataWriter<InternalRow> abortedWriter = newWriter(abortedWrite, 1);
        // The rows written here are deliberately not added to the expected output.
        writeBatches(abortedWriter, batches, size);
        HoodieWriterCommitMessage abortedMessage = (HoodieWriterCommitMessage) abortedWriter.commit();
        abortedWrite.abort(new HoodieWriterCommitMessage[] {abortedMessage});
        this.metaClient.reloadActiveTimeline();

        // The table contents must still match the first commit only.
        result = readAllPartitions();
        assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
    }

    /**
     * Runs {@code commits} consecutive write-and-commit cycles with instant times {@code "000"},
     * {@code "001"}, ... and verifies the rows and write statuses of each commit.
     *
     * @param populateMetaFields forwarded to the write config and the batch write
     * @param commits number of commits to perform
     * @param minRows minimum number of rows per batch
     * @param extraRowsBound exclusive upper bound of the random row count added to {@code minRows}
     */
    private void writeAndVerifyCommits(boolean populateMetaFields, int commits, int minRows, int extraRowsBound) throws Exception {
        HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
        // The returned table is not used; the call is retained in case it has setup side effects.
        HoodieSparkTable.create(cfg, this.context, this.metaClient);
        int batches = 3;
        for (int i = 0; i < commits; ++i) {
            String instantTime = "00" + i;
            HoodieDataSourceInternalBatchWrite batchWrite = newBatchWrite(instantTime, cfg, Collections.emptyMap(), populateMetaFields);
            // The commit index doubles as the writer's partition id.
            DataWriter<InternalRow> writer = newWriter(batchWrite, i);
            int size = minRows + RANDOM.nextInt(extraRowsBound);
            Dataset<Row> totalInputRows = writeBatches(writer, batches, size);
            HoodieWriterCommitMessage commitMessage = commitWrite(batchWrite, writer);

            Dataset<Row> result = HoodieClientTestUtils.readCommit(this.basePath, this.sqlContext, this.metaClient.getCommitTimeline(), instantTime, populateMetaFields, HoodieTestUtils.INSTANT_GENERATOR);
            assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
            assertWriteStatuses(commitMessage.getWriteStatuses(), batches, size, Option.empty(), Option.empty());
        }
    }

    /**
     * Creates the batch write under test for the given instant, using the shared test schema,
     * Spark session and storage configuration.
     *
     * @param instantTime instant time of the write
     * @param cfg write config
     * @param extraMetadata options handed to the batch write
     * @param populateMetaFields forwarded to the batch write
     * @return the new batch write
     */
    private HoodieDataSourceInternalBatchWrite newBatchWrite(String instantTime, HoodieWriteConfig cfg, Map<String, String> extraMetadata, boolean populateMetaFields) throws Exception {
        return new HoodieDataSourceInternalBatchWrite(instantTime, cfg, SparkDatasetTestUtils.STRUCT_TYPE, this.sqlContext.sparkSession(), this.storageConf, extraMetadata, populateMetaFields, false);
    }

    /**
     * Obtains a data writer from the batch write for the given partition id and a random task id.
     *
     * @param batchWrite batch write to create the writer from
     * @param partitionId partition id passed to the writer factory
     * @return the new writer
     */
    private DataWriter<InternalRow> newWriter(HoodieDataSourceInternalBatchWrite batchWrite, int partitionId) throws Exception {
        return batchWrite.createBatchWriterFactory(null).createWriter(partitionId, RANDOM.nextLong());
    }

    /**
     * Generates {@code batches} batches of {@code size} random rows, cycling through the default
     * partition paths, and writes each batch to {@code writer}.
     *
     * @param writer writer receiving the rows
     * @param batches number of batches to generate
     * @param size number of rows per batch
     * @return the union of all generated batches, or {@code null} when {@code batches} is not positive
     */
    private Dataset<Row> writeBatches(DataWriter<InternalRow> writer, int batches, int size) throws Exception {
        String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
        Dataset<Row> totalInputRows = null;
        for (int j = 0; j < batches; ++j) {
            String partitionPath = partitionPaths[j % partitionPaths.length];
            Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, size, partitionPath, false);
            writeRows(inputRows, writer);
            totalInputRows = totalInputRows == null ? inputRows : totalInputRows.union(inputRows);
        }
        return totalInputRows;
    }

    /**
     * Commits the writer, commits the batch write with the resulting message, and reloads the
     * active timeline of the meta client.
     *
     * @param batchWrite batch write to commit
     * @param writer writer whose output is committed
     * @return the commit message produced by the writer
     */
    private HoodieWriterCommitMessage commitWrite(HoodieDataSourceInternalBatchWrite batchWrite, DataWriter<InternalRow> writer) throws Exception {
        HoodieWriterCommitMessage commitMessage = (HoodieWriterCommitMessage) writer.commit();
        batchWrite.commit(new HoodieWriterCommitMessage[] {commitMessage});
        this.metaClient.reloadActiveTimeline();
        return commitMessage;
    }

    /**
     * Reads the table through a {@code <basePath>/<partition>/*} glob for every default partition
     * path.
     *
     * @return the rows read from the table
     */
    private Dataset<Row> readAllPartitions() throws Exception {
        String[] partitionGlobs = Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
            .map(partitionPath -> this.basePath + "/" + partitionPath + "/*")
            .toArray(String[]::new);
        return HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.metaClient.getStorage(), partitionGlobs);
    }

    /**
     * Converts the rows to {@link InternalRow}s and writes them one by one to {@code writer}.
     *
     * @param inputRows rows to write
     * @param writer writer receiving the converted rows
     */
    private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {
        List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
        for (InternalRow internalRow : internalRows) {
            writer.write(internalRow);
        }
    }
}

