package org.apache.hudi.client.functional;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.GenericRecordValidationTestUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.class */
public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
    // Test-table handle; assigned in setUpTestTable(), which is annotated @BeforeEach.
    private HoodieTestTable testTable;

    /**
     * Builds the {@link HoodieSparkWriteableTestTable} for the current meta client and stores it
     * in {@code testTable}.
     */
    @BeforeEach
    public void setUpTestTable() {
        testTable = HoodieSparkWriteableTestTable.of(metaClient);
    }

    /**
     * Runs an insert batch, an upsert batch and a delete batch through the write client, then
     * checks that the record map read back via {@link GenericRecordValidationTestUtils} holds
     * 75 entries.
     *
     * @throws Exception if any batch helper or the read-back fails
     */
    @Test
    public void testReadingMORTableWithoutBaseFile() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(2)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // First write: insert batch.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 100, 100, 1, Option.empty());

        // Second write: upsert batch, handed the insert instant.
        String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, client, upsertInstant, insertInstant,
                Option.of(Arrays.asList(insertInstant)), "000", 50,
                (writer, batch, ts) -> writer.upsert(batch, ts),
                false, false, 50, 100, 2, writeConfig.populateMetaFields());

        // Third write: delete batch, handed the upsert instant.
        String deleteInstant = HoodieActiveTimeline.createNewInstantTime();
        deleteBatch(writeConfig, client, deleteInstant, upsertInstant, "000", 25, false, false, 0, 100);

        // Refresh the timeline before reading the records back.
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals(
                75,
                GenericRecordValidationTestUtils.getRecordsMap(writeConfig, this.hadoopConf, this.dataGen).size());
    }

    /**
     * Runs an insert batch and an upsert batch, schedules and executes a compaction, and then
     * validates the table contents with
     * {@link GenericRecordValidationTestUtils#assertDataInMORTable}.
     *
     * @throws Exception if a write, the compaction or the validation fails
     */
    @Test
    public void testCompactionOnMORTable() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(2)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Insert batch.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 100, 100, 1, Option.empty());

        // Upsert batch, handed the insert instant.
        String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, client, upsertInstant, insertInstant,
                Option.of(Arrays.asList(insertInstant)), "000", 50,
                (writer, batch, ts) -> writer.upsert(batch, ts),
                false, false, 5, 100, 2, writeConfig.populateMetaFields());

        // A compaction must be schedulable at this point; run it.
        Option scheduled = client.scheduleCompaction(Option.empty());
        Assertions.assertTrue(scheduled.isPresent());
        String compactionInstant = (String) scheduled.get();
        client.compact(compactionInstant);

        this.metaClient.reloadActiveTimeline();
        GenericRecordValidationTestUtils.assertDataInMORTable(
                writeConfig,
                upsertInstant,
                compactionInstant,
                this.hadoopConf,
                Arrays.asList(this.dataGen.getPartitionPaths()));
    }

    /**
     * Runs an insert batch followed by five upsert batches, executes a compaction, runs two more
     * upsert batches, then schedules and executes a log compaction and validates the table with
     * {@link GenericRecordValidationTestUtils#assertDataInMORTable}.
     *
     * @throws Exception if a write, a table service or the validation fails
     */
    @Test
    public void testLogCompactionOnMORTable() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .withLogCompactionBlocksThreshold(1)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Running count handed to each updateBatch call; grows by 50 per upsert batch.
        int runningCount = 100;

        // Insert batch.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 100, 100, 1, Option.empty());

        // Five upsert batches, each handed the instant of the write before it.
        String latestInstant = insertInstant;
        for (int step = 2; step <= 6; step++) {
            String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
            runningCount += 50;
            updateBatch(writeConfig, client, upsertInstant, latestInstant,
                    Option.of(Arrays.asList(latestInstant)), "000", 50,
                    (writer, batch, ts) -> writer.upsert(batch, ts),
                    false, false, 50, runningCount, step, writeConfig.populateMetaFields());
            latestInstant = upsertInstant;
        }

        // A compaction must be schedulable here; run it.
        Option scheduledCompaction = client.scheduleCompaction(Option.empty());
        Assertions.assertTrue(scheduledCompaction.isPresent());
        client.compact((String) scheduledCompaction.get());

        // Two more upsert batches, the first one handed the compaction instant.
        String tailInstant = (String) scheduledCompaction.get();
        for (int step = 8; step <= 9; step++) {
            String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
            runningCount += 50;
            updateBatch(writeConfig, client, upsertInstant, tailInstant,
                    Option.of(Arrays.asList(tailInstant)), "000", 50,
                    (writer, batch, ts) -> writer.upsert(batch, ts),
                    false, false, 50, runningCount, step, writeConfig.populateMetaFields());
            tailInstant = upsertInstant;
        }

        // A log compaction must be schedulable now; run it and validate the data.
        Option scheduledLogCompaction = client.scheduleLogCompaction(Option.empty());
        Assertions.assertTrue(scheduledLogCompaction.isPresent());
        String logCompactionInstant = (String) scheduledLogCompaction.get();
        client.logCompact(logCompactionInstant);
        GenericRecordValidationTestUtils.assertDataInMORTable(
                writeConfig,
                tailInstant,
                logCompactionInstant,
                this.hadoopConf,
                Arrays.asList(this.dataGen.getPartitionPaths()));
    }

    /**
     * Runs an insert, an upsert and a delete batch (the insert is given the partition
     * {@code 2016/03/15}), then schedules and executes a log compaction and validates that
     * partition with {@link GenericRecordValidationTestUtils#assertDataInMORTable}.
     *
     * @throws Exception if a write, the log compaction or the validation fails
     */
    @Test
    public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        final String partition = "2016/03/15";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withEnableOptimizedLogBlocksScan("true")
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .withLogCompactionBlocksThreshold(1)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Insert batch, given the single partition.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 100, 100, 1, Option.of(partition));

        // Upsert batch, handed the insert instant.
        String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, client, upsertInstant, insertInstant,
                Option.of(Arrays.asList(insertInstant)), "000", 50,
                (writer, batch, ts) -> writer.upsert(batch, ts),
                false, false, 50, 100, 2, writeConfig.populateMetaFields());

        // Delete batch, handed the upsert instant.
        String deleteInstant = HoodieActiveTimeline.createNewInstantTime();
        deleteBatch(writeConfig, client, deleteInstant, upsertInstant, "000", 30, false, false, 0, 70);

        // A log compaction must be schedulable; run it and validate the partition.
        Option scheduled = client.scheduleLogCompaction(Option.empty());
        Assertions.assertTrue(scheduled.isPresent());
        String logCompactionInstant = (String) scheduled.get();
        client.logCompact(logCompactionInstant);
        GenericRecordValidationTestUtils.assertDataInMORTable(
                writeConfig,
                deleteInstant,
                logCompactionInstant,
                this.hadoopConf,
                Arrays.asList(partition));
    }

    /**
     * After an insert batch and an upsert batch, asserts that scheduling a compaction yields an
     * instant and that a subsequent attempt to schedule a log compaction yields none.
     *
     * @throws Exception if a write or a scheduling call fails
     */
    @Test
    public void testSchedulingLogCompactionAfterSchedulingCompaction() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .withLogCompactionBlocksThreshold(1)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Insert batch.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 100, 100, 1, Option.empty());

        // Upsert batch, handed the insert instant.
        String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, client, upsertInstant, insertInstant,
                Option.of(Arrays.asList(insertInstant)), "000", 50,
                (writer, batch, ts) -> writer.upsert(batch, ts),
                false, false, 50, 100, 2, writeConfig.populateMetaFields());

        // Compaction is scheduled first; the log compaction request afterwards must come back empty.
        Option compactionRequest = client.scheduleCompaction(Option.empty());
        Assertions.assertTrue(compactionRequest.isPresent());
        Option logCompactionRequest = client.scheduleLogCompaction(Option.empty());
        Assertions.assertFalse(logCompactionRequest.isPresent());
    }

    /**
     * After an insert batch and an upsert batch, asserts that a log compaction can be scheduled
     * and that a compaction can still be scheduled afterwards. The write config enables
     * optimistic concurrency control with an in-process lock provider, lazy failed-writes
     * cleaning, direct markers and no auto clean.
     *
     * @throws Exception if a write or a scheduling call fails
     */
    @Test
    public void testSchedulingCompactionAfterSchedulingLogCompaction() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .withLogCompactionBlocksThreshold(1)
                        .build())
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                        .withAutoClean(false)
                        .build())
                .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
                .withMarkersType(MarkerType.DIRECT.name())
                .withLockConfig(HoodieLockConfig.newBuilder()
                        .withLockProvider(InProcessLockProvider.class)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Insert batch.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 10, 100, 1, Option.empty());

        // Upsert batch, handed the insert instant.
        String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, client, upsertInstant, insertInstant,
                Option.of(Arrays.asList(insertInstant)), "000", 50,
                (writer, batch, ts) -> writer.upsert(batch, ts),
                false, false, 50, 10, 2, writeConfig.populateMetaFields());

        // Log compaction is scheduled first; a compaction must still be schedulable afterwards.
        Option logCompactionRequest = client.scheduleLogCompaction(Option.empty());
        Assertions.assertTrue(logCompactionRequest.isPresent());
        Option compactionRequest = client.scheduleCompaction(Option.empty());
        Assertions.assertTrue(compactionRequest.isPresent());
    }

    /**
     * Runs an insert batch, executes a compaction, runs an upsert batch, schedules (but does not
     * execute) a second compaction, and then runs six further upsert batches. After each of those
     * batches the last instant on the reloaded active timeline is checked: it must be a
     * {@code clean} action after the third batch and a {@code deltacommit} action otherwise.
     *
     * @throws Exception if a write, a table service or a timeline read fails
     */
    @Test
    public void testCleanFunctionalityWhenCompactionRequestedInstantIsPresent() throws Exception {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
                .withAutoCommit(true)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .build())
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .retainCommits(4)
                        .build())
                .build();
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

        // Insert batch.
        String insertInstant = HoodieActiveTimeline.createNewInstantTime();
        insertBatch(writeConfig, client, insertInstant, "000", 100,
                (writer, batch, ts) -> writer.insert(batch, ts),
                false, false, 100, 100, 1, Option.empty());

        // First compaction: scheduled and executed.
        Option firstCompaction = client.scheduleCompaction(Option.empty());
        Assertions.assertTrue(firstCompaction.isPresent());
        client.compact((String) firstCompaction.get());
        String firstCompactionInstant = (String) firstCompaction.get();

        // Upsert batch, handed the executed compaction instant.
        String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, client, upsertInstant, firstCompactionInstant,
                Option.of(Arrays.asList(firstCompactionInstant)), "000", 50,
                (writer, batch, ts) -> writer.upsert(batch, ts),
                false, false, 50, 100, 2, writeConfig.populateMetaFields());

        // Second compaction: only scheduled, never executed in this test.
        Option secondCompaction = client.scheduleCompaction(Option.empty());
        Assertions.assertTrue(secondCompaction.isPresent());

        String latestInstant = (String) secondCompaction.get();
        for (int round = 0; round < 6; round++) {
            String roundInstant = HoodieActiveTimeline.createNewInstantTime();
            updateBatch(writeConfig, client, roundInstant, latestInstant,
                    Option.of(Arrays.asList(latestInstant)), "000", 50,
                    (writer, batch, ts) -> writer.upsert(batch, ts),
                    false, false, 50, 100, 2, writeConfig.populateMetaFields());
            latestInstant = roundInstant;

            // Only the third round is expected to leave a clean as the last instant.
            String expectedAction = (round == 2) ? "clean" : "deltacommit";
            HoodieInstant lastInstant = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
            Assertions.assertEquals(expectedAction, lastInstant.getAction());
        }
    }

    @Test
    public void testRollbackOnLogCompaction() throws Exception {
        HoodieWriteConfig build = getConfigBuilder("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", 
HoodieIndex.IndexType.INMEMORY).withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).withLogCompactionBlocksThreshold(1).build()).build();
        HoodieWriteConfig build2 = getConfigBuilder("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).build();
        SparkRDDWriteClient sparkRDDWriteClient = new SparkRDDWriteClient(this.context, build);
        Throwable th = null;
        try {
            SparkRDDWriteClient sparkRDDWriteClient2 = new SparkRDDWriteClient(this.context, build2);
            Throwable th2 = null;
            try {
                try {
                    String createNewInstantTime = HoodieActiveTimeline.createNewInstantTime();
                    insertBatch(build2, sparkRDDWriteClient2, createNewInstantTime, "000", 100, (v0, v1, v2) -> {
                        return v0.insert(v1, v2);
                    }, false, false, 100, 100, 1, Option.empty());
                    String createNewInstantTime2 = HoodieActiveTimeline.createNewInstantTime();
                    updateBatch(build2, sparkRDDWriteClient2, createNewInstantTime2, createNewInstantTime, Option.of(Arrays.asList(createNewInstantTime)), "000", 10, (v0, v1, v2) -> {
                        return v0.upsert(v1, v2);
                    }, false, false, 10, 100, 4, build2.populateMetaFields());
                    Option scheduleLogCompaction = sparkRDDWriteClient.scheduleLogCompaction(Option.empty());
                    Assertions.assertTrue(scheduleLogCompaction.isPresent());
                    sparkRDDWriteClient.logCompact((String) scheduleLogCompaction.get());
                    getHoodieTable(this.metaClient, build2).rollbackInflightLogCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, "logcompaction", (String) scheduleLogCompaction.get()));
                    HoodieActiveTimeline reloadActiveTimeline = this.metaClient.reloadActiveTimeline();
                    HoodieInstant hoodieInstant = (HoodieInstant) reloadActiveTimeline.lastInstant().get();
                    Assertions.assertEquals(3, reloadActiveTimeline.countInstants());
                    Assertions.assertEquals("rollback", hoodieInstant.getAction());
                    validateBlockInstantsBeforeAndAfterRollback(build2, createNewInstantTime2, hoodieInstant.getTimestamp());
                    String timestamp = hoodieInstant.getTimestamp();
                    String createNewInstantTime3 = HoodieActiveTimeline.createNewInstantTime();
                    updateBatch(build2, sparkRDDWriteClient2, createNewInstantTime3, timestamp, Option.of(Arrays.asList(timestamp)), "000", 10, (v0, v1, v2) -> {
                        return v0.upsert(v1, v2);
                    }, false, false, 10, 100, 4, build2.populateMetaFields());
                    Option scheduleLogCompaction2 = sparkRDDWriteClient.scheduleLogCompaction(Option.empty());
                    Assertions.assertTrue(scheduleLogCompaction2.isPresent());
                    sparkRDDWriteClient.commitLogCompaction((String) scheduleLogCompaction2.get(), (HoodieCommitMetadata) sparkRDDWriteClient.logCompact((String) scheduleLogCompaction2.get()).getCommitMetadata().get(), Option.empty());
                    GenericRecordValidationTestUtils.assertDataInMORTable(build2, createNewInstantTime3, (String) scheduleLogCompaction2.get(), this.hadoopConf, Arrays.asList(this.dataGen.getPartitionPaths()));
                    if (sparkRDDWriteClient2 != null) {
                        if (0 != 0) {
                            try {
                                sparkRDDWriteClient2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            sparkRDDWriteClient2.close();
                        }
                    }
                    if (sparkRDDWriteClient != null) {
                        if (0 == 0) {
                            sparkRDDWriteClient.close();
                            return;
                        }
                        try {
                            sparkRDDWriteClient.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (sparkRDDWriteClient2 != null) {
                    if (th2 != null) {
                        try {
                            sparkRDDWriteClient2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        sparkRDDWriteClient2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (sparkRDDWriteClient != null) {
                if (0 != 0) {
                    try {
                        sparkRDDWriteClient.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    sparkRDDWriteClient.close();
                }
            }
            throw th8;
        }
    }

    /**
     * Asserts that, for every latest file slice of every default partition path, scanning the
     * slice's log files up to {@code str} and up to {@code str2} yields the same list of valid
     * block instants.
     *
     * @param hoodieWriteConfig write config used to obtain the table and the scanner buffer size
     * @param str               latest instant time for the first scan
     * @param str2              latest instant time for the second scan
     */
    private void validateBlockInstantsBeforeAndAfterRollback(HoodieWriteConfig hoodieWriteConfig, String str, String str2) {
        HoodieSparkTable table = getHoodieTable(this.metaClient, hoodieWriteConfig);
        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
        for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) {
            fileSystemView.getLatestFileSlices(partitionPath).forEach(slice -> {
                // Both scans read the same log files, ordered by the log file comparator.
                List<String> logFilePaths = slice.getLogFiles()
                    .sorted(HoodieLogFile.getLogFileComparator())
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());
                List<?> instantsUpToFirst = readValidBlockInstants(table, hoodieWriteConfig, logFilePaths, str);
                List<?> instantsUpToSecond = readValidBlockInstants(table, hoodieWriteConfig, logFilePaths, str2);
                Assertions.assertEquals(instantsUpToFirst, instantsUpToSecond);
            });
        }
    }

    /**
     * Builds an unmerged log record scanner over {@code logFilePaths} bounded by
     * {@code latestInstantTime}, runs it, and returns the valid block instants it reports.
     */
    private List<?> readValidBlockInstants(HoodieSparkTable table, HoodieWriteConfig writeConfig,
                                           List<String> logFilePaths, String latestInstantTime) {
        HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
            .withFileSystem(this.metaClient.getFs())
            .withBasePath(table.getMetaClient().getBasePath())
            .withLogFilePaths(logFilePaths)
            .withLatestInstantTime(latestInstantTime)
            .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
            .withOptimizedLogBlocksScan(true)
            .build();
        scanner.scan(true);
        return scanner.getValidBlockInstants();
    }

    /**
     * Runs one insert followed by six upsert rounds, with a regular compaction scheduled and
     * executed before rounds 0 and 4 and a log compaction attempted after every upsert. It then
     * checks the archived timeline: every log-compaction instant time found there must map to
     * exactly one instant whose action is {@code "deltacommit"}, and at least one log-compaction
     * instant time must have been found.
     *
     * @throws Exception if any write, compaction or timeline operation fails
     */
    @Test
    public void testArchivalOnLogCompaction() throws Exception {
        // Both write configs share the same Avro schema; keep a single copy of the literal.
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        // Config for the client that schedules and runs log compactions.
        HoodieWriteConfig logCompactionConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
            .withAutoCommit(true)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .withLogCompactionBlocksThreshold(1)
                .build())
            .build();
        // Config for the client that performs the writes and regular compactions; it also carries
        // the clean, archival and metadata settings.
        HoodieWriteConfig writeConfig = getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
            .withAutoCommit(true)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .build())
            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .withMaxNumDeltaCommitsBeforeCompaction(2)
                .build())
            .build();

        // try-with-resources closes writeClient first and logCompactionClient second (reverse
        // declaration order). A failure in close() is attached as suppressed to the exception
        // already in flight instead of replacing it.
        try (SparkRDDWriteClient logCompactionClient = new SparkRDDWriteClient(this.context, logCompactionConfig);
             SparkRDDWriteClient writeClient = new SparkRDDWriteClient(this.context, writeConfig)) {
            String firstInstantTime = HoodieActiveTimeline.createNewInstantTime();
            insertBatch(writeConfig, writeClient, firstInstantTime, "000", 100,
                (client, records, instantTime) -> client.insert(records, instantTime),
                false, false, 10, 100, 1, Option.empty());

            String previousInstantTime = firstInstantTime;
            List<String> logCompactionInstantTimes = new ArrayList<>();
            for (int round = 0; round < 6; round++) {
                // A regular compaction is scheduled and run before rounds 0 and 4.
                if (round % 4 == 0) {
                    Option<String> compactionInstantTime = writeClient.scheduleCompaction(Option.empty());
                    Assertions.assertTrue(compactionInstantTime.isPresent());
                    writeClient.compact(compactionInstantTime.get());
                    previousInstantTime = compactionInstantTime.get();
                }

                String upsertInstantTime = HoodieActiveTimeline.createNewInstantTime();
                updateBatch(writeConfig, writeClient, upsertInstantTime, previousInstantTime,
                    Option.of(Arrays.asList(previousInstantTime)), "000", 50,
                    (client, records, instantTime) -> client.upsert(records, instantTime),
                    false, false, 50, 10, 0, writeConfig.populateMetaFields());

                // Log compaction may or may not get scheduled in a given round; remember the
                // instant times of the ones that did so they can be looked up after archival.
                Option<String> logCompactionInstantTime = logCompactionClient.scheduleLogCompaction(Option.empty());
                if (logCompactionInstantTime.isPresent()) {
                    logCompactionInstantTimes.add(logCompactionInstantTime.get());
                    logCompactionClient.logCompact(logCompactionInstantTime.get());
                    previousInstantTime = logCompactionInstantTime.get();
                }
            }

            Map<String, List<HoodieInstant>> archivedInstantsByTime = this.metaClient.getArchivedTimeline()
                .getInstantsAsStream()
                .collect(Collectors.groupingBy(HoodieInstant::getTimestamp));

            boolean logCompactionArchived = false;
            for (String logCompactionInstantTime : logCompactionInstantTimes) {
                List<HoodieInstant> archivedInstants = archivedInstantsByTime.get(logCompactionInstantTime);
                if (archivedInstants != null) {
                    Assertions.assertEquals(1, archivedInstants.size());
                    Assertions.assertEquals("deltacommit", archivedInstants.get(0).getAction());
                    logCompactionArchived = true;
                }
            }
            Assertions.assertTrue(logCompactionArchived,
                "Expected at least one log compaction instant in the archived timeline");
        }
    }

    /**
     * Returns the table type used by this test class, which is always
     * {@link HoodieTableType#MERGE_ON_READ}.
     *
     * <p>NOTE(review): presumably overrides a hook of the test base class that decides which
     * table type the harness sets up; confirm against the superclass.
     *
     * @return {@link HoodieTableType#MERGE_ON_READ}
     */
    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }
}
