package org.apache.hudi.client.functional;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.GenericRecordValidationTestUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.class */
public class TestDataValidationCheckForLogCompactionActions extends HoodieClientTestBase {
    /** Suffix appended to every record key before the record is replayed on the second table. */
    private static final String RECORD_KEY_APPEND_VALUE = "-EXP";
    /** Number of slices used when parallelizing generated records/keys and for the write-config parallelism settings. */
    private static final int PARALLELISM = 2;

    /** JUnit-managed temporary directory under which the second table's base path is created. */
    @TempDir
    Path secondTableBasePath;

    /** Source of randomness for action selection and batch sizes; re-seeded at the start of the stress test. */
    private final Random random = new Random();

    // Record/key generators: (data generator, [instant time,] count) -> generated batch.
    HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, HoodieTestDataGenerator, String, Integer> insertsGenFunction =
        HoodieTestDataGenerator::generateInserts;
    HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, HoodieTestDataGenerator, String, Integer> updatesGenFunction =
        HoodieTestDataGenerator::generateUniqueUpdates;
    HoodieWriterClientTestHarness.Function2<List<HoodieKey>, HoodieTestDataGenerator, Integer> deletesGenFunction =
        HoodieTestDataGenerator::generateUniqueDeletes;

    // Write operations: (write client, batch RDD, instant time) -> write statuses.
    HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertsFunction =
        SparkRDDWriteClient::insert;
    HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> updatesFunction =
        SparkRDDWriteClient::upsert;
    HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieKey>, String> deletesFunction =
        SparkRDDWriteClient::delete;

    /**
     * Bundles the handles needed to drive one table in this test (path, name, meta client, write config
     * and write clients) together with a snapshot of the most recent batch generated for the main table,
     * so that the same batch can be replayed on the second table.
     *
     * <p>Replayed records and keys are copies whose record key carries the
     * {@code RECORD_KEY_APPEND_VALUE} suffix.
     */
    public class TestTableContents {
        final String basePath;
        final String tableName;
        final HoodieTableMetaClient metaClient;
        final HoodieWriteConfig config;
        final SparkRDDWriteClient client;
        /** Instant time of the last batch generated for the main table; empty until the first batch. */
        String commitTimeOnMainTable = "";
        /** Key-suffixed copies of the last generated insert/update batch. */
        List<HoodieRecord> generatedRecords = new ArrayList<>();
        /** Key-suffixed copies of the last generated delete batch. */
        List<HoodieKey> generatedKeysForDelete = new ArrayList<>();
        /** Action code of the last generated batch: 0 = insert, 1 = update, 2 = delete. */
        int previousActionType = 0;
        /** Client used for scheduling/running log compaction; {@code null} when the table has none. */
        final SparkRDDWriteClient logCompactionClient;

        /**
         * Creates the contents of a table that has no dedicated log-compaction client.
         *
         * @param testDataValidationCheckForLogCompactionActions unused; kept so existing call sites keep compiling
         * @param str base path of the table
         * @param str2 table name
         * @param hoodieTableMetaClient meta client of the table
         * @param hoodieWriteConfig write config of the table
         * @param sparkRDDWriteClient write client used for ingestion
         */
        public TestTableContents(TestDataValidationCheckForLogCompactionActions testDataValidationCheckForLogCompactionActions, String str, String str2, HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient) {
            this(str, str2, hoodieTableMetaClient, hoodieWriteConfig, sparkRDDWriteClient, null);
        }

        /**
         * Creates the contents of a table.
         *
         * @param str base path of the table
         * @param str2 table name
         * @param hoodieTableMetaClient meta client of the table
         * @param hoodieWriteConfig write config of the table
         * @param sparkRDDWriteClient write client used for ingestion
         * @param sparkRDDWriteClient2 write client used for log compaction, may be {@code null}
         */
        public TestTableContents(String str, String str2, HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient, SparkRDDWriteClient sparkRDDWriteClient2) {
            this.basePath = str;
            this.tableName = str2;
            this.metaClient = hoodieTableMetaClient;
            this.config = hoodieWriteConfig;
            this.client = sparkRDDWriteClient;
            this.logCompactionClient = sparkRDDWriteClient2;
        }

        /**
         * Remembers an insert/update batch so it can be replayed on the second table.
         *
         * @param list records generated for the main table; each one is copied with a suffixed key
         * @param str instant time the batch is written with on the main table
         * @param i action code of the batch (0 = insert, 1 = update)
         */
        public void updatePreviousGeneration(List<HoodieRecord> list, String str, int i) {
            // The schema used to be parsed here on every call and the result thrown away; that dead work is gone.
            this.generatedRecords = list.stream()
                .map(record -> deepCopyAndModifyRecordKey(record))
                .collect(Collectors.toList());
            this.commitTimeOnMainTable = str;
            this.previousActionType = i;
        }

        /** Returns a new record with a suffixed key and a clone of the original payload. */
        private HoodieRecord deepCopyAndModifyRecordKey(HoodieRecord hoodieRecord) {
            HoodieKey suffixedKey = deepCopyAndModifyRecordKey(hoodieRecord.getKey());
            RawTripTestPayload payloadCopy = ((RawTripTestPayload) hoodieRecord.getData()).clone();
            return new HoodieAvroRecord(suffixedKey, payloadCopy);
        }

        /** Returns a new key in the same partition whose record key has the suffix appended. */
        private HoodieKey deepCopyAndModifyRecordKey(HoodieKey hoodieKey) {
            return new HoodieKey(hoodieKey.getRecordKey() + TestDataValidationCheckForLogCompactionActions.RECORD_KEY_APPEND_VALUE, hoodieKey.getPartitionPath());
        }

        /**
         * Remembers a delete batch so it can be replayed on the second table.
         *
         * @param list keys generated for deletion on the main table; each one is copied with a suffixed key
         * @param str instant time the batch is written with on the main table
         */
        public void updatePreviousGenerationForDelete(List<HoodieKey> list, String str) {
            this.generatedKeysForDelete = list.stream()
                .map(this::deepCopyAndModifyRecordKey)
                .collect(Collectors.toList());
            this.commitTimeOnMainTable = str;
            // 2 is the delete action code; it is deliberately not tied to the PARALLELISM constant.
            this.previousActionType = 2;
        }
    }

    /**
     * Runs before each test and invokes {@code HoodieSparkWriteableTestTable.of} with the harness meta client.
     */
    @BeforeEach
    public void setUpTestTable() {
        // NOTE(review): the returned test table is discarded — presumably the call is kept for its
        // side effects; confirm whether it is still needed.
        HoodieSparkWriteableTestTable.of(this.metaClient);
    }

    /**
     * Runs after each test and releases the harness resources by calling the individual cleanup hooks
     * one after another, finishing with a garbage-collection hint.
     *
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.hudi.testutils.HoodieSparkClientTestHarness
    @AfterEach
    public void cleanupResources() throws IOException {
        // NOTE(review): the order of these calls is kept exactly as found; presumably it matters — confirm before reordering.
        cleanupTimelineService();
        cleanupClients();
        cleanupSparkContexts();
        cleanupTestDataGenerator();
        cleanupFileSystem();
        cleanupExecutorService();
        // Only a hint to the JVM; it does not guarantee that a collection happens.
        System.gc();
    }

    /**
     * Drives a seeded sequence of random writes against the main table, replays every successful write
     * on the second table, runs log compaction there, and checks after each round that no compaction or
     * log-compaction plan is left pending and that both tables hold matching records.
     *
     * <p>All write clients created for the trial, including the second table's log-compaction client,
     * are closed when the trial ends, whether it passes or fails.
     *
     * @param i seed for the random generator, which makes the trial reproducible
     * @throws Exception if a write, a compaction or the verification fails
     */
    @ValueSource(ints = {17})
    @ParameterizedTest
    public void stressTestCompactionAndLogCompactionOperations(int i) throws Exception {
        this.random.setSeed(i);
        TestTableContents mainTable = setupTestTable1();
        TestTableContents experimentTable = null;
        try {
            experimentTable = setupTestTable2();
            LOG.warn("Starting trial with seed " + i);
            for (int writeNo = 1; writeNo < 15; writeNo++) {
                LOG.warn("Starting write No. " + writeNo);
                // A skipped write (returns false) leaves nothing to replay or verify for this round.
                if (!writeOnMainTable(mainTable, writeNo)) {
                    continue;
                }
                writeOnExperimentTable(mainTable, experimentTable);
                scheduleLogCompactionOnExperimentTable(experimentTable);
                Assertions.assertEquals(0, mainTable.metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants());
                Assertions.assertEquals(0, experimentTable.metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants());
                Assertions.assertEquals(0, experimentTable.metaClient.reloadActiveTimeline().filterPendingLogCompactionTimeline().countInstants());
                verifyRecords(mainTable, experimentTable);
                LOG.warn("For write No." + writeNo + ", verification passed. Last ingestion commit timestamp is " + mainTable.commitTimeOnMainTable);
            }
        } finally {
            // Nested try/finally so that a failing close() cannot prevent the remaining clients from closing.
            try {
                mainTable.client.close();
            } finally {
                if (experimentTable != null) {
                    try {
                        experimentTable.client.close();
                    } finally {
                        if (experimentTable.logCompactionClient != null) {
                            experimentTable.logCompactionClient.close();
                        }
                    }
                }
            }
        }
    }

    /**
     * Asserts that both tables contain the same number of records and that every record of the first
     * table has a counterpart in the second table under the suffixed key whose content matches.
     *
     * <p>The commit time, commit sequence number, file name, operation and record key metadata fields
     * are passed to the comparison as the fields to exclude.
     *
     * @param testTableContents the main table
     * @param testTableContents2 the second (experiment) table
     */
    private void verifyRecords(TestTableContents testTableContents, TestTableContents testTableContents2) {
        Map<String, GenericRecord> mainRecords = GenericRecordValidationTestUtils.getRecordsMap(testTableContents.config, this.storageConf, this.dataGen);
        Map<String, GenericRecord> experimentRecords = GenericRecordValidationTestUtils.getRecordsMap(testTableContents2.config, this.storageConf, this.dataGen);
        Assertions.assertEquals(mainRecords.size(), experimentRecords.size());

        Schema schema = new Schema.Parser().parse(testTableContents.config.getSchema());
        List<String> excludedFields = CollectionUtils.createImmutableList(
            HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
            HoodieRecord.FILENAME_METADATA_FIELD,
            HoodieRecord.OPERATION_METADATA_FIELD,
            HoodieRecord.RECORD_KEY_METADATA_FIELD);

        mainRecords.forEach((recordKey, mainRecord) -> {
            String experimentKey = recordKey + RECORD_KEY_APPEND_VALUE;
            Assertions.assertTrue(experimentRecords.containsKey(experimentKey),
                "Second table has no record for key " + experimentKey);
            GenericRecordValidationTestUtils.assertGenericRecords(mainRecord, experimentRecords.get(experimentKey), schema, excludedFields);
        });
    }

    /**
     * Asks the table's log-compaction client to schedule a log compaction and, when an instant
     * was scheduled, runs it right away.
     *
     * @param testTableContents table whose log-compaction client is used
     */
    private void scheduleLogCompactionOnExperimentTable(TestTableContents testTableContents) {
        final SparkRDDWriteClient compactor = testTableContents.logCompactionClient;
        final Option scheduled = compactor.scheduleLogCompaction(Option.empty());
        if (!scheduled.isPresent()) {
            // Nothing was scheduled, so there is nothing to execute.
            return;
        }
        compactor.logCompact((String) scheduled.get());
    }

    /**
     * Starts a new commit on the main table and performs one randomly chosen write on it.
     * The very first write is always an insert.
     *
     * @param testTableContents the main table
     * @param i 1-based number of the current write
     * @return {@code true} when the write was performed and verified; {@code false} when an update or
     *         delete raised an {@link IllegalArgumentException} and was skipped
     * @throws IOException if generating or writing the batch fails
     */
    private boolean writeOnMainTable(TestTableContents testTableContents, int i) throws IOException {
        final String instantTime = HoodieActiveTimeline.createNewInstantTime();
        testTableContents.client.startCommitWithTime(instantTime);

        // Drawn unconditionally, even for the first write, so the seeded random sequence stays the same.
        final int action = pickAWriteAction();

        final JavaRDD<WriteStatus> statuses;
        if (i != 1 && action != 0) {
            try {
                if (action == 1) {
                    statuses = updateDataIntoMainTable(testTableContents, instantTime);
                } else {
                    statuses = deleteDataIntoMainTable(testTableContents, instantTime);
                }
            } catch (IllegalArgumentException e) {
                LOG.warn(e.getMessage() + " ignoring current command.");
                return false;
            }
        } else {
            statuses = insertDataIntoMainTable(testTableContents, instantTime);
        }

        verifyWriteStatus(statuses);
        return true;
    }

    /**
     * Randomly picks the next write action from a draw in {@code [0, 10)}:
     * draws 0-4 give an insert, 5-7 an update and 8-9 a delete.
     *
     * @return the action code: 0 = insert, 1 = update, 2 = delete
     */
    private int pickAWriteAction() {
        int draw = this.random.nextInt(10);
        if (draw < 5) {
            return 0;
        }
        if (draw < 8) {
            return 1;
        }
        // Delete action code. Written as a literal: it is unrelated to the PARALLELISM constant,
        // which merely happens to have the same value.
        return 2;
    }

    /**
     * Replays the main table's last write on the second table, using the same instant time and the
     * same kind of action, and verifies the resulting write statuses.
     *
     * @param testTableContents the main table, source of the batch, action code and instant time
     * @param testTableContents2 the second (experiment) table that receives the write
     * @throws IOException if the write fails
     */
    private void writeOnExperimentTable(TestTableContents testTableContents, TestTableContents testTableContents2) throws IOException {
        testTableContents2.client.startCommitWithTime(testTableContents.commitTimeOnMainTable);

        final JavaRDD<WriteStatus> statuses;
        switch (testTableContents.previousActionType) {
            case 0:
                statuses = insertDataIntoExperimentTable(testTableContents, testTableContents2);
                break;
            case 1:
                statuses = updateDataIntoExperimentTable(testTableContents, testTableContents2);
                break;
            default:
                // Any other action code is treated as a delete.
                statuses = deleteDataIntoExperimentTable(testTableContents, testTableContents2);
                break;
        }
        verifyWriteStatus(statuses);
    }

    /**
     * Generates between 50 and 59 insert records, records them as the latest batch (action code 0)
     * and inserts them into the main table.
     *
     * @param testTableContents the main table
     * @param str instant time of the commit
     * @return write statuses of the insert
     * @throws IOException if generating or writing the batch fails
     */
    private JavaRDD<WriteStatus> insertDataIntoMainTable(TestTableContents testTableContents, String str) throws IOException {
        final int batchSize = 50 + this.random.nextInt(10);
        final List<HoodieRecord> inserts = this.insertsGenFunction.apply(this.dataGen, str, batchSize);
        testTableContents.updatePreviousGeneration(inserts, str, 0);
        final JavaRDD<HoodieRecord> insertsRdd = this.jsc.parallelize(inserts, PARALLELISM);
        return this.insertsFunction.apply(testTableContents.client, insertsRdd, str);
    }

    /**
     * Generates between 10 and 19 unique update records, records them as the latest batch
     * (action code 1) and upserts them into the main table.
     *
     * @param testTableContents the main table
     * @param str instant time of the commit
     * @return write statuses of the upsert
     * @throws IOException if generating or writing the batch fails
     */
    private JavaRDD<WriteStatus> updateDataIntoMainTable(TestTableContents testTableContents, String str) throws IOException {
        final int batchSize = 10 + this.random.nextInt(10);
        final List<HoodieRecord> updates = this.updatesGenFunction.apply(this.dataGen, str, batchSize);
        testTableContents.updatePreviousGeneration(updates, str, 1);
        final JavaRDD<HoodieRecord> updatesRdd = this.jsc.parallelize(updates, PARALLELISM);
        return this.updatesFunction.apply(testTableContents.client, updatesRdd, str);
    }

    /**
     * Generates between 5 and 14 unique keys to delete, records them as the latest delete batch
     * and deletes them from the main table.
     *
     * @param testTableContents the main table
     * @param str instant time of the commit
     * @return write statuses of the delete
     * @throws IOException if generating or writing the batch fails
     */
    private JavaRDD<WriteStatus> deleteDataIntoMainTable(TestTableContents testTableContents, String str) throws IOException {
        final int batchSize = 5 + this.random.nextInt(10);
        final List<HoodieKey> keysToDelete = this.deletesGenFunction.apply(this.dataGen, batchSize);
        testTableContents.updatePreviousGenerationForDelete(keysToDelete, str);
        final JavaRDD<HoodieKey> keysRdd = this.jsc.parallelize(keysToDelete, PARALLELISM);
        return this.deletesFunction.apply(testTableContents.client, keysRdd, str);
    }

    /**
     * Inserts the main table's last recorded batch into the second table at the main table's instant time.
     *
     * @param testTableContents the main table, source of the recorded batch and instant time
     * @param testTableContents2 the second (experiment) table
     * @return write statuses of the insert
     * @throws IOException if the write fails
     */
    private JavaRDD<WriteStatus> insertDataIntoExperimentTable(TestTableContents testTableContents, TestTableContents testTableContents2) throws IOException {
        final JavaRDD<HoodieRecord> replayedRecords = this.jsc.parallelize(testTableContents.generatedRecords, PARALLELISM);
        final String instantTime = testTableContents.commitTimeOnMainTable;
        return this.insertsFunction.apply(testTableContents2.client, replayedRecords, instantTime);
    }

    /**
     * Upserts the main table's last recorded batch into the second table at the main table's instant time.
     *
     * @param testTableContents the main table, source of the recorded batch and instant time
     * @param testTableContents2 the second (experiment) table
     * @return write statuses of the upsert
     * @throws IOException if the write fails
     */
    private JavaRDD<WriteStatus> updateDataIntoExperimentTable(TestTableContents testTableContents, TestTableContents testTableContents2) throws IOException {
        final JavaRDD<HoodieRecord> replayedRecords = this.jsc.parallelize(testTableContents.generatedRecords, PARALLELISM);
        final String instantTime = testTableContents.commitTimeOnMainTable;
        return this.updatesFunction.apply(testTableContents2.client, replayedRecords, instantTime);
    }

    /**
     * Deletes the main table's last recorded delete keys from the second table at the main table's instant time.
     *
     * @param testTableContents the main table, source of the recorded keys and instant time
     * @param testTableContents2 the second (experiment) table
     * @return write statuses of the delete
     * @throws IOException if the write fails
     */
    private JavaRDD<WriteStatus> deleteDataIntoExperimentTable(TestTableContents testTableContents, TestTableContents testTableContents2) throws IOException {
        final JavaRDD<HoodieKey> replayedKeys = this.jsc.parallelize(testTableContents.generatedKeysForDelete, PARALLELISM);
        final String instantTime = testTableContents.commitTimeOnMainTable;
        return this.deletesFunction.apply(testTableContents2.client, replayedKeys, instantTime);
    }

    /**
     * Collects the write statuses and asserts that none of them reports a write error.
     *
     * @param javaRDD write statuses returned by a write operation
     */
    private void verifyWriteStatus(JavaRDD<WriteStatus> javaRDD) {
        final List<WriteStatus> collectedStatuses = javaRDD.collect();
        // Fully qualified because the JUnit Assertions class is imported under the same simple name.
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(collectedStatuses);
    }

    /**
     * Builds the main table's contents on top of the harness's own base path, table name and meta client.
     * The write config uses the in-memory index, inline compaction, auto commit and sets
     * {@code hoodie.parquet.small.file.limit} to {@code 0}.
     *
     * @return the main table's contents, without a log-compaction client
     */
    private TestTableContents setupTestTable1() {
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        final Properties extraProps = new Properties();
        extraProps.setProperty("hoodie.parquet.small.file.limit", "0");

        final HoodieWriteConfig writeConfig = getConfigBuilder(schema, HoodieIndex.IndexType.INMEMORY)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
            .withAutoCommit(true)
            .withProperties(extraProps)
            .build();

        final SparkRDDWriteClient writeClient = new SparkRDDWriteClient(this.context, writeConfig);
        return new TestTableContents(this, this.basePath, this.tableName, this.metaClient, writeConfig, writeClient);
    }

    /**
     * Creates the second (experiment) table as a merge-on-read table named {@code test-trip-table2}
     * under the temporary directory and builds its contents. Besides the ingestion client, the table
     * gets a separate log-compaction client whose config copies the ingestion config's properties and
     * sets the log-compaction blocks threshold.
     *
     * @return the second table's contents, including its log-compaction client
     * @throws IOException if the base directory or the table cannot be created
     */
    private TestTableContents setupTestTable2() throws IOException {
        final String secondTableName = "test-trip-table2";
        // Kept as its own value: the threshold is unrelated to the PARALLELISM constant,
        // which merely happens to have the same value.
        final int logCompactionBlocksThreshold = 2;
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        String secondBasePath = createBasePathForSecondTable(this.secondTableBasePath);

        Properties tableProps = new Properties();
        tableProps.put(HoodieTableConfig.NAME.key(), secondTableName);
        HoodieTableMetaClient secondMetaClient = HoodieTestUtils.init(this.storageConf, secondBasePath, HoodieTableType.MERGE_ON_READ, tableProps);

        HoodieWriteConfig writeConfig = getConfigBuilderForSecondTable(secondTableName, secondBasePath, schema, HoodieIndex.IndexType.INMEMORY)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
            .withAutoCommit(true)
            .build();
        SparkRDDWriteClient writeClient = new SparkRDDWriteClient(this.context, writeConfig);

        HoodieWriteConfig logCompactionConfig = HoodieWriteConfig.newBuilder()
            .withProps(writeConfig.getProps())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withLogCompactionBlocksThreshold(logCompactionBlocksThreshold).build())
            .build();
        SparkRDDWriteClient logCompactionClient = new SparkRDDWriteClient(this.context, logCompactionConfig);

        return new TestTableContents(secondBasePath, secondTableName, secondMetaClient, writeConfig, writeClient, logCompactionClient);
    }

    /**
     * Creates the {@code dataset2} directory (and any missing parents) under the given directory.
     *
     * @param path parent directory
     * @return string form of the created directory's path
     * @throws IOException if the directory cannot be created
     */
    private String createBasePathForSecondTable(Path path) throws IOException {
        final Path secondTableDir = path.resolve("dataset2");
        // No file attributes are passed, which is the same as handing over an empty attribute array.
        return Files.createDirectories(secondTableDir).toString();
    }

    /**
     * Assembles the write-config builder for the second table. The builder is returned unbuilt so that
     * callers can layer further settings on top.
     *
     * @param str table name
     * @param str2 base path of the table
     * @param str3 Avro schema of the records, as a JSON string
     * @param indexType index type to configure
     * @return a builder preconfigured with path, schema, parallelism, file-size limits, cleaning policy,
     *         index, embedded timeline server and file-system-view settings
     */
    private HoodieWriteConfig.Builder getConfigBuilderForSecondTable(String str, String str2, String str3, HoodieIndex.IndexType indexType) {
        final Properties extraProps = new Properties();
        extraProps.setProperty("hoodie.parquet.small.file.limit", "0");

        // Shared size value (1048576 bytes) for the small-file threshold and the per-format max file sizes.
        final long sizeLimitBytes = 1048576L;

        return HoodieWriteConfig.newBuilder()
            .withPath(str2)
            .withSchema(str3)
            .withParallelism(PARALLELISM, PARALLELISM)
            .withBulkInsertParallelism(PARALLELISM)
            .withFinalizeWriteParallelism(PARALLELISM)
            .withDeleteParallelism(PARALLELISM)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withWriteStatusClass(MetadataMergeWriteStatus.class)
            .withConsistencyGuardConfig(
                ConsistencyGuardConfig.newBuilder()
                    .withConsistencyCheckEnabled(true)
                    .build())
            .withCompactionConfig(
                HoodieCompactionConfig.newBuilder()
                    .compactionSmallFileSize(sizeLimitBytes)
                    .build())
            .withStorageConfig(
                HoodieStorageConfig.newBuilder()
                    .hfileMaxFileSize(sizeLimitBytes)
                    .parquetMaxFileSize(sizeLimitBytes)
                    .orcMaxFileSize(sizeLimitBytes)
                    .build())
            .withCleanConfig(
                HoodieCleanConfig.newBuilder()
                    .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
                    .build())
            .forTable(str)
            .withIndexConfig(
                HoodieIndexConfig.newBuilder()
                    .withIndexType(indexType)
                    .build())
            .withEmbeddedTimelineServerEnabled(true)
            .withFileSystemViewConfig(
                FileSystemViewStorageConfig.newBuilder()
                    .withEnableBackupForRemoteFileSystemView(false)
                    .withRemoteServerPort(Integer.valueOf(timelineServicePort))
                    .build())
            .withProperties(extraProps);
    }

    /**
     * Returns the table type used by this test.
     *
     * @return always {@link HoodieTableType#MERGE_ON_READ}
     */
    // NOTE(review): presumably overrides a hook of the test harness; if so, an @Override annotation would document that — confirm.
    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }
}
