package org.apache.hudi.utilities;

import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.utilities.HoodieIndexer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/utilities/TestHoodieIndexer.class */
public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implements SparkProvider {
    // Generator for the insert records written by the tests; closed once in cleanup().
    private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator(0);
    // File group count handed to the indexer when COLUMN_STATS is indexed. Starts at the config
    // default and is overwritten by testColStatsFileGroupCount; being static, it is shared by all tests.
    private static int colStatsFileGroupCount = ((Integer) HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue()).intValue();
    // Meta client of the table under test; reassigned by init() before every test.
    private HoodieTableMetaClient metaClient;

    /**
     * Obtains a COPY_ON_WRITE table meta client from the test harness before each test.
     *
     * @throws IOException if the harness fails to provide the meta client
     */
    @BeforeEach
    public void init() throws IOException {
        metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
    }

    /** Closes the shared test data generator once all tests in this class have run. */
    @AfterAll
    public static void cleanup() {
        DATA_GENERATOR.close();
    }

    /**
     * Checks that the comma-separated index type string "FILES,BLOOM_FILTERS,COLUMN_STATS"
     * resolves to a list containing each of the three corresponding partition types.
     */
    @Test
    public void testGetRequestedPartitionTypes() {
        HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
        indexerConfig.basePath = basePath();
        indexerConfig.tableName = "indexer_test";
        indexerConfig.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";

        HoodieIndexer indexer = new HoodieIndexer(jsc(), indexerConfig);
        List<?> resolvedTypes = indexer.getRequestedPartitionTypes(indexerConfig.indexTypes, Option.empty());

        // Same three membership checks as before, in the same order.
        for (MetadataPartitionType expectedType : Arrays.asList(
                MetadataPartitionType.FILES,
                MetadataPartitionType.BLOOM_FILTERS,
                MetadataPartitionType.COLUMN_STATS)) {
            Assertions.assertTrue(resolvedTypes.contains(expectedType));
        }
    }

    /**
     * Checks {@code isIndexBuiltForAllRequestedTypes} against commit metadata that only contains
     * a COLUMN_STATS partition: false when BLOOM_FILTERS is also requested, true when only
     * COLUMN_STATS is requested.
     */
    @Test
    public void testIsIndexBuiltForAllRequestedTypes() {
        HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
        indexerConfig.basePath = basePath();
        indexerConfig.tableName = "indexer_test";
        indexerConfig.indexTypes = "BLOOM_FILTERS,COLUMN_STATS";
        HoodieIndexer bloomAndColStatsIndexer = new HoodieIndexer(jsc(), indexerConfig);

        // Commit metadata describing a single built partition: COLUMN_STATS.
        HoodieIndexPartitionInfo colStatsInfo = new HoodieIndexPartitionInfo(
                1, MetadataPartitionType.COLUMN_STATS.getPartitionPath(), "0000", Collections.emptyMap());
        HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder()
                .setIndexPartitionInfos(Arrays.asList(colStatsInfo))
                .build();

        Assertions.assertFalse(
                bloomAndColStatsIndexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));

        // Narrow the request to COLUMN_STATS only and re-check with a new indexer.
        indexerConfig.indexTypes = "COLUMN_STATS";
        HoodieIndexer colStatsOnlyIndexer = new HoodieIndexer(jsc(), indexerConfig);
        Assertions.assertTrue(
                colStatsOnlyIndexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));
    }

    /**
     * Writes with the metadata table and bloom filter index enabled, then indexes COLUMN_STATS
     * and expects FILES and BLOOM_FILTERS to remain available alongside it.
     */
    @Test
    public void testIndexerWithNotAllIndexesEnabled() {
        final String tableName = "indexer_test";
        HoodieMetadataConfig metadataConfig =
                getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).build();
        upsertToTable(metadataConfig, tableName);

        // After the write both FILES and BLOOM_FILTERS must be listed in the table config.
        final String filesPath = MetadataPartitionType.FILES.getPartitionPath();
        Assertions.assertTrue(
                HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(filesPath));
        final String bloomPath = MetadataPartitionType.BLOOM_FILTERS.getPartitionPath();
        Assertions.assertTrue(
                HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(bloomPath));

        indexMetadataPartitionsAndAssert(
                MetadataPartitionType.COLUMN_STATS,
                Arrays.asList(MetadataPartitionType.FILES, MetadataPartitionType.BLOOM_FILTERS),
                Collections.emptyList(),
                tableName,
                "streamer-config/indexer.properties");
    }

    /**
     * Writes with the metadata table disabled, then indexes FILES through the indexer and
     * expects COLUMN_STATS and BLOOM_FILTERS to stay absent from the table config.
     */
    @Test
    public void testIndexerWithFilesPartition() {
        final String tableName = "indexer_test";
        HoodieMetadataConfig metadataConfig =
                getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true).build();
        upsertToTable(metadataConfig, tableName);

        // With the metadata table disabled, FILES must not be listed yet.
        final String filesPath = MetadataPartitionType.FILES.getPartitionPath();
        Assertions.assertFalse(
                HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(filesPath));

        indexMetadataPartitionsAndAssert(
                MetadataPartitionType.FILES,
                Collections.emptyList(),
                Arrays.asList(MetadataPartitionType.COLUMN_STATS, MetadataPartitionType.BLOOM_FILTERS),
                tableName,
                "streamer-config/indexer.properties");
    }

    /**
     * Writes with the metadata table enabled, then indexes RECORD_INDEX using the record-index
     * properties file; FILES must stay present, COLUMN_STATS and BLOOM_FILTERS must stay absent.
     */
    @Test
    public void testIndexerForRecordIndex() {
        final String tableName = "indexer_test";
        HoodieMetadataConfig metadataConfig = getMetadataConfigBuilder(true, false).build();
        upsertToTable(metadataConfig, tableName);

        final String filesPath = MetadataPartitionType.FILES.getPartitionPath();
        Assertions.assertTrue(
                HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(filesPath));

        indexMetadataPartitionsAndAssert(
                MetadataPartitionType.RECORD_INDEX,
                Collections.singletonList(MetadataPartitionType.FILES),
                Arrays.asList(MetadataPartitionType.COLUMN_STATS, MetadataPartitionType.BLOOM_FILTERS),
                tableName,
                "streamer-config/indexer-record-index.properties");
    }

    /**
     * Reverts a completed indexing instant to inflight on both the data table and the metadata
     * table, then checks how subsequent writes treat it: while a heartbeat for the instant is
     * running the inflight instant is kept and no rollback appears in the metadata table; once
     * the heartbeat is stopped, the next write removes the instant and leaves exactly one
     * rollback whose first rolled-back commit time is the indexing timestamp.
     *
     * @throws IOException declared for the timeline (de)serialization calls used below
     */
    @Test
    public void testIndexerWithWriterFinishingFirst() throws IOException {
        final String tableName = "indexer_with_writer_finishing_first";

        // Initial write with the metadata table and bloom filter index enabled.
        HoodieMetadataConfig initialMetadataConfig =
                getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).build();
        upsertToTable(initialMetadataConfig, tableName);
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions()
                .contains(MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions()
                .contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));

        // Build COLUMN_STATS through the indexer, then look up the resulting indexing instant.
        scheduleAndExecuteIndexing(MetadataPartitionType.COLUMN_STATS, tableName, "streamer-config/indexer.properties");
        HoodieInstant indexingInstant = (HoodieInstant) this.metaClient.getActiveTimeline()
                .filter(candidate -> "indexing".equals(candidate.getAction()))
                .getInstants()
                .get(0);

        // Reads the index plan of that instant; the returned value itself is not used further.
        HoodieIndexPartitionInfo firstPlannedPartition = (HoodieIndexPartitionInfo) TimelineMetadataUtils
                .deserializeIndexPlan((byte[]) this.metaClient.getActiveTimeline().readIndexPlanAsBytes(indexingInstant).get())
                .getIndexPartitionInfos()
                .get(0);
        firstPlannedPartition.getIndexUptoInstant();

        HoodieTableMetaClient mdtMetaClient = new HoodieBackedTableMetadata(
                context(), this.metaClient.getStorage(), initialMetadataConfig, this.metaClient.getBasePath().toString())
                .getMetadataMetaClient();
        final String indexingTime = indexingInstant.getTimestamp();
        Assertions.assertTrue(mdtMetaClient.getActiveTimeline().containsInstant(indexingTime));

        // Revert the instant to inflight in the data table ...
        this.metaClient.getActiveTimeline().revertToInflight(indexingInstant);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

        // ... and the instant with the same timestamp in the metadata table.
        HoodieInstant mdtInstant = (HoodieInstant) mdtMetaClient.getActiveTimeline()
                .filter(candidate -> candidate.getTimestamp().equals(indexingTime))
                .getInstants()
                .get(0);
        mdtMetaClient.getActiveTimeline().revertToInflight(mdtInstant);
        mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);

        // Start a heartbeat for the indexing timestamp using the default interval and tolerance.
        HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
                mdtMetaClient.getStorage(),
                mdtMetaClient.getBasePath().toString(),
                Long.valueOf(((Integer) HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS.defaultValue()).longValue()),
                (Integer) HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
        heartbeatClient.start(indexingTime);

        // Write again, now with COLUMN_STATS enabled, while the heartbeat is running.
        HoodieMetadataConfig colStatsMetadataConfig = getMetadataConfigBuilder(true, false)
                .withMetadataIndexBloomFilter(true)
                .withMetadataIndexColumnStats(true)
                .build();
        upsertToTable(colStatsMetadataConfig, tableName);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
        Assertions.assertTrue(mdtMetaClient.getActiveTimeline().containsInstant(mdtInstant.getTimestamp()));
        Assertions.assertTrue(mdtMetaClient.getActiveTimeline().getRollbackTimeline().empty());

        // Stop the heartbeat and write once more.
        heartbeatClient.stop(indexingTime);
        upsertToTable(colStatsMetadataConfig, tableName);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
        Assertions.assertFalse(mdtMetaClient.getActiveTimeline().containsInstant(mdtInstant.getTimestamp()));
        Assertions.assertEquals(1, mdtMetaClient.getActiveTimeline().getRollbackTimeline().countInstants());

        // The single rollback must name the indexing timestamp as its first rolled-back commit.
        HoodieInstant rollbackInstant =
                (HoodieInstant) mdtMetaClient.getActiveTimeline().getRollbackTimeline().firstInstant().get();
        HoodieInstantInfo rolledBack = (HoodieInstantInfo) TimelineMetadataUtils
                .deserializeHoodieRollbackMetadata((byte[]) mdtMetaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get())
                .getInstantsRollback()
                .stream()
                .findFirst()
                .get();
        Assertions.assertEquals(indexingTime, rolledBack.getCommitTime());
    }

    /**
     * Reverts the latest write to inflight on both the data table and the metadata table, then
     * runs the indexer for COLUMN_STATS with an index-check timeout of one second. The indexer is
     * expected to fail with a {@link HoodieMetadataException} ("Failed to index partition"), and
     * the inflight instant must still be present and inflight in both timelines with no rollback.
     *
     * <p>The table name literal is the same one used by testIndexerWithWriterFinishingFirst.
     *
     * @throws IOException declared for the timeline operations used below
     */
    @Test
    public void testIndexerWithWriterFinishingLast() throws IOException {
        final String tableName = "indexer_with_writer_finishing_first";
        HoodieMetadataConfig metadataConfig =
                getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).build();
        upsertToTable(metadataConfig, tableName);
        upsertToTable(metadataConfig, tableName);

        // Revert the latest instant to inflight in the data table.
        HoodieInstant lastCommit = (HoodieInstant) this.metaClient.getActiveTimeline().lastInstant().get();
        final String commitTime = lastCommit.getTimestamp();
        this.metaClient.getActiveTimeline().revertToInflight(lastCommit);

        // Revert the instant with the same timestamp in the metadata table.
        HoodieTableMetaClient mdtMetaClient = new HoodieBackedTableMetadata(
                context(), this.metaClient.getStorage(), metadataConfig, this.metaClient.getBasePath().toString())
                .getMetadataMetaClient();
        HoodieInstant mdtCommit = (HoodieInstant) mdtMetaClient.getActiveTimeline()
                .filter(candidate -> candidate.getTimestamp().equals(commitTime))
                .getInstants()
                .get(0);
        mdtMetaClient.getActiveTimeline().revertToInflight(mdtCommit);

        HoodieIndexer.Config config = new HoodieIndexer.Config();
        String propsPath = ((URL) Objects.requireNonNull(
                getClass().getClassLoader().getResource("streamer-config/indexer.properties"))).getPath();
        config.basePath = basePath();
        config.tableName = tableName;
        config.indexTypes = MetadataPartitionType.COLUMN_STATS.name();
        config.runningMode = "scheduleandexecute";
        config.propsFilePath = propsPath;
        config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
        // Use the property's key, not the property object itself: concatenating the object would
        // put its string form in front of "=1", so the override would not be keyed correctly.
        config.configs.add(HoodieMetadataConfig.METADATA_INDEX_CHECK_TIMEOUT_SECONDS.key() + "=1");

        HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
        Throwable cause = ((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
            indexer.start(0);
        })).getCause();
        Assertions.assertTrue(cause instanceof HoodieMetadataException);
        Assertions.assertTrue(cause.getMessage().contains("Failed to index partition"));

        // The failed indexer must not have touched the inflight write in either timeline.
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieTableMetaClient reloadedMdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
        Assertions.assertTrue(this.metaClient.getActiveTimeline().containsInstant(commitTime));
        Assertions.assertTrue(reloadedMdtMetaClient.getActiveTimeline().containsInstant(commitTime));
        Assertions.assertTrue(((HoodieInstant) this.metaClient.getActiveTimeline()
                .filter(candidate -> candidate.getTimestamp().equals(commitTime))
                .getInstants().get(0)).isInflight());
        Assertions.assertTrue(((HoodieInstant) reloadedMdtMetaClient.getActiveTimeline()
                .filter(candidate -> candidate.getTimestamp().equals(commitTime))
                .getInstants().get(0)).isInflight());
        Assertions.assertTrue(this.metaClient.getActiveTimeline().getRollbackTimeline().empty());
        Assertions.assertTrue(reloadedMdtMetaClient.getActiveTimeline().getRollbackTimeline().empty());
    }

    /** Supplies the file group counts (1, 2, 4, 8) used by testColStatsFileGroupCount. */
    private static Stream<Arguments> colStatsFileGroupCountParams() {
        return Stream.of(
                Arguments.of(1),
                Arguments.of(2),
                Arguments.of(4),
                Arguments.of(8));
    }

    /**
     * Indexes FILES and then COLUMN_STATS with the given file group count and checks that the
     * COLUMN_STATS partition of the metadata table ends up with exactly that many file slices.
     *
     * @param i the COLUMN_STATS file group count to pass to the indexer
     */
    @MethodSource({"colStatsFileGroupCountParams"})
    @ParameterizedTest
    public void testColStatsFileGroupCount(int i) {
        // colStatsFileGroupCount is static and read by the indexing helpers of every test, so
        // the previous value is restored afterwards to keep other tests independent of run order.
        final int previousFileGroupCount = colStatsFileGroupCount;
        colStatsFileGroupCount = i;
        try {
            final String tableName = "indexer_test";
            upsertToTable(getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true).build(), tableName);
            Assertions.assertFalse(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions()
                    .contains(MetadataPartitionType.FILES.getPartitionPath()));

            // FILES first, then COLUMN_STATS on top of it.
            indexMetadataPartitionsAndAssert(
                    MetadataPartitionType.FILES,
                    Collections.emptyList(),
                    Arrays.asList(MetadataPartitionType.COLUMN_STATS, MetadataPartitionType.BLOOM_FILTERS),
                    tableName,
                    "streamer-config/indexer.properties");
            indexMetadataPartitionsAndAssert(
                    MetadataPartitionType.COLUMN_STATS,
                    Collections.singletonList(MetadataPartitionType.FILES),
                    Arrays.asList(MetadataPartitionType.BLOOM_FILTERS),
                    tableName,
                    "streamer-config/indexer.properties");

            HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
                    .setConf(this.metaClient.getStorageConf().newInstance())
                    .setBasePath(this.metaClient.getMetaPath() + "/metadata")
                    .build();
            // Expected value first, so a failure message labels the two numbers correctly.
            Assertions.assertEquals(i, HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
                    mdtMetaClient,
                    HoodieTableMetadataUtil.getFileSystemView(mdtMetaClient),
                    MetadataPartitionType.COLUMN_STATS.getPartitionPath()).size());
        } finally {
            colStatsFileGroupCount = previousFileGroupCount;
        }
    }

    /**
     * With the metadata table disabled, indexing COLUMN_STATS directly must fail with a
     * {@link HoodieException} ("Metadata table is not yet initialized") and leave no metadata
     * partitions behind. Indexing FILES first and COLUMN_STATS afterwards must then succeed, and
     * the COLUMN_STATS partition is expected to hold the default number of file slices.
     */
    @Test
    public void testIndexerForExceptionWithNonFilesPartition() {
        final String tableName = "indexer_test";
        upsertToTable(getMetadataConfigBuilder(false, false).build(), tableName);
        Assertions.assertFalse(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions()
                .contains(MetadataPartitionType.FILES.getPartitionPath()));

        // Indexer config requesting only COLUMN_STATS.
        HoodieIndexer.Config config = new HoodieIndexer.Config();
        String propsPath = ((URL) Objects.requireNonNull(
                getClass().getClassLoader().getResource("streamer-config/indexer.properties"))).getPath();
        config.basePath = basePath();
        config.tableName = tableName;
        config.indexTypes = MetadataPartitionType.COLUMN_STATS.name();
        config.runningMode = "scheduleandexecute";
        config.propsFilePath = propsPath;

        HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
        Throwable cause = ((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
            indexer.start(0);
        })).getCause();
        Assertions.assertTrue(cause instanceof HoodieException);
        Assertions.assertTrue(cause.getMessage().contains("Metadata table is not yet initialized"));

        // The failed attempt must not have registered or created any metadata partition.
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        Assertions.assertFalse(this.metaClient.getTableConfig().getMetadataPartitions()
                .contains(MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertFalse(this.metaClient.getTableConfig().getMetadataPartitions()
                .contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
        Assertions.assertFalse(this.metaClient.getTableConfig().getMetadataPartitions()
                .contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
        Assertions.assertFalse(HoodieTableMetadataUtil.metadataPartitionExists(
                basePath(), context(), MetadataPartitionType.FILES.getPartitionPath()));

        // FILES first, then COLUMN_STATS on top of it.
        indexMetadataPartitionsAndAssert(
                MetadataPartitionType.FILES,
                Collections.emptyList(),
                Arrays.asList(MetadataPartitionType.COLUMN_STATS, MetadataPartitionType.BLOOM_FILTERS),
                tableName,
                "streamer-config/indexer.properties");
        indexMetadataPartitionsAndAssert(
                MetadataPartitionType.COLUMN_STATS,
                Collections.singletonList(MetadataPartitionType.FILES),
                Arrays.asList(MetadataPartitionType.BLOOM_FILTERS),
                tableName,
                "streamer-config/indexer.properties");

        HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
                .setConf(this.metaClient.getStorageConf().newInstance())
                .setBasePath(this.metaClient.getMetaPath() + "/metadata")
                .build();
        // Expected value first, so a failure message labels the two numbers correctly.
        int expectedFileGroupCount =
                ((Integer) HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue()).intValue();
        Assertions.assertEquals(expectedFileGroupCount, HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
                mdtMetaClient,
                HoodieTableMetadataUtil.getFileSystemView(mdtMetaClient),
                MetadataPartitionType.COLUMN_STATS.getPartitionPath()).size());
    }

    /**
     * Performs one upsert of 100 generated insert records into the table at {@code basePath()}
     * and asserts that the write reported no errors.
     *
     * @param hoodieMetadataConfig metadata config applied to the write config for this upsert
     * @param str                  table name used for the write config
     */
    private void upsertToTable(HoodieMetadataConfig hoodieMetadataConfig, String str) {
        HoodieWriteConfig writeConfig =
                getWriteConfigBuilder(basePath(), str).withMetadataConfig(hoodieMetadataConfig).build();
        // try-with-resources closes the client exactly once, whether or not the write succeeds.
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig)) {
            String instantTime = writeClient.createNewInstantTime();
            writeClient.startCommitWithTime(instantTime);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                    writeClient.upsert(jsc().parallelize(DATA_GENERATOR.generateInserts(instantTime, 100), 1), instantTime)
                            .collect());
        }
    }

    /**
     * Runs the indexer in "scheduleandexecute" mode for a single partition type, asserts that
     * it returns 0, and reloads {@code metaClient} afterwards.
     *
     * @param metadataPartitionType partition type to index
     * @param str                   table name
     * @param str2                  classpath location of the indexer properties file
     */
    private void scheduleAndExecuteIndexing(MetadataPartitionType metadataPartitionType, String str, String str2) {
        URL propsUrl = Objects.requireNonNull(getClass().getClassLoader().getResource(str2));

        HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
        indexerConfig.basePath = basePath();
        indexerConfig.tableName = str;
        indexerConfig.indexTypes = metadataPartitionType.name();
        indexerConfig.runningMode = "scheduleandexecute";
        indexerConfig.propsFilePath = propsUrl.getPath();

        // Only COLUMN_STATS indexing gets the explicit file group count override.
        boolean indexingColStats = metadataPartitionType.getPartitionPath()
                .equals(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
        if (indexingColStats) {
            indexerConfig.configs.add(
                    HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
        }

        HoodieIndexer indexer = new HoodieIndexer(jsc(), indexerConfig);
        Assertions.assertEquals(0, indexer.start(0));
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
    }

    /**
     * Indexes one partition type and then verifies the table state: the indexed type and all
     * of {@code list} must be listed in the table config and exist as metadata partitions,
     * while every type in {@code list2} must not be listed in the table config.
     *
     * @param metadataPartitionType partition type to index
     * @param list                  partition types expected to be present besides the indexed one
     * @param list2                 partition types expected to be absent from the table config
     * @param str                   table name
     * @param str2                  classpath location of the indexer properties file
     */
    private void indexMetadataPartitionsAndAssert(MetadataPartitionType metadataPartitionType, List<MetadataPartitionType> list, List<MetadataPartitionType> list2, String str, String str2) {
        scheduleAndExecuteIndexing(metadataPartitionType, str, str2);

        // Checks against the table config.
        Set<?> configuredPartitions = this.metaClient.getTableConfig().getMetadataPartitions();
        Assertions.assertTrue(configuredPartitions.contains(metadataPartitionType.getPartitionPath()));
        for (MetadataPartitionType expectedPresent : list) {
            Assertions.assertTrue(configuredPartitions.contains(expectedPresent.getPartitionPath()));
        }
        for (MetadataPartitionType expectedAbsent : list2) {
            Assertions.assertFalse(configuredPartitions.contains(expectedAbsent.getPartitionPath()));
        }

        // Checks via metadataPartitionExists for the partitions themselves.
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(
                basePath(), context(), metadataPartitionType.getPartitionPath()));
        for (MetadataPartitionType expectedPresent : list) {
            Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(
                    basePath(), context(), expectedPresent.getPartitionPath()));
        }
    }

    /**
     * Schedules COLUMN_STATS indexing (leaving a REQUESTED pending index instant), then runs the
     * indexer in "dropindex" mode and checks that the pending instant is gone, the COLUMN_STATS
     * partition does not exist, and FILES and BLOOM_FILTERS are still present.
     */
    @Test
    public void testIndexerDropPartitionDeletesInstantFromTimeline() {
        final String tableName = "indexer_test";
        HoodieWriteConfig writeConfig = getWriteConfigBuilder(basePath(), tableName)
                .withMetadataConfig(getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).build())
                .build();

        // The client is only needed for the write, so it is closed (exactly once) right after it;
        // a failing assertion further down no longer triggers a second close().
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig)) {
            String instantTime = writeClient.createNewInstantTime();
            writeClient.startCommitWithTime(instantTime);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                    writeClient.upsert(jsc().parallelize(DATA_GENERATOR.generateInserts(instantTime, 100), 1), instantTime)
                            .collect());
        }

        // FILES and BLOOM_FILTERS are available after the write.
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));

        // Schedule (but do not execute) COLUMN_STATS indexing.
        HoodieIndexer.Config config = new HoodieIndexer.Config();
        String propsPath = ((URL) Objects.requireNonNull(
                getClass().getClassLoader().getResource("streamer-config/indexer.properties"))).getPath();
        config.basePath = basePath();
        config.tableName = tableName;
        config.indexTypes = MetadataPartitionType.COLUMN_STATS.name();
        config.runningMode = "schedule";
        config.propsFilePath = propsPath;
        Assertions.assertEquals(0, new HoodieIndexer(jsc(), config).start(0));

        Option<HoodieInstant> pendingIndexInstant =
                this.metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
        Assertions.assertTrue(pendingIndexInstant.isPresent());
        Assertions.assertEquals(HoodieInstant.State.REQUESTED, pendingIndexInstant.get().getState());

        // Drop the index with the same config in "dropindex" mode.
        config.runningMode = "dropindex";
        Assertions.assertEquals(0, new HoodieIndexer(jsc(), config).start(0));

        Assertions.assertFalse(this.metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant().isPresent());
        Assertions.assertFalse(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.COLUMN_STATS.getPartitionPath()));

        // The other partitions must be unaffected by the drop.
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
    }

    /**
     * Builds BLOOM_FILTERS with one indexer run, schedules COLUMN_STATS with another, then drops
     * COLUMN_STATS and finally BLOOM_FILTERS. After the first drop FILES and BLOOM_FILTERS must
     * still be present; after the second drop the last completed index instant must be unchanged.
     */
    @Test
    public void testTwoIndexersOneCreateOneDropPartition() {
        final String tableName = "indexer_test";
        HoodieWriteConfig writeConfig = getWriteConfigBuilder(basePath(), tableName)
                .withMetadataConfig(getMetadataConfigBuilder(true, false).build())
                .build();

        // The client is only needed for the write, so it is closed (exactly once) right after it;
        // a failing assertion further down no longer triggers a second close().
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig)) {
            String instantTime = writeClient.createNewInstantTime();
            writeClient.startCommitWithTime(instantTime);
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(
                    writeClient.upsert(jsc().parallelize(DATA_GENERATOR.generateInserts(instantTime, 100), 1), instantTime)
                            .collect());
        }

        // FILES is available after the write.
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.FILES.getPartitionPath()));

        // First indexer: schedule and execute BLOOM_FILTERS.
        Assertions.assertEquals(0, new HoodieIndexer(jsc(), getHoodieIndexConfig(MetadataPartitionType.BLOOM_FILTERS.name(), "scheduleandexecute", "streamer-config/indexer-only-bloom.properties", tableName)).start(0));
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
        Option<HoodieInstant> completedBloomIndexInstant =
                this.metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant();
        Assertions.assertTrue(completedBloomIndexInstant.isPresent());

        // Second indexer: only schedule COLUMN_STATS, leaving a REQUESTED pending instant.
        Assertions.assertEquals(0, new HoodieIndexer(jsc(), getHoodieIndexConfig(MetadataPartitionType.COLUMN_STATS.name(), "schedule", "streamer-config/indexer.properties", tableName)).start(0));
        Option<HoodieInstant> pendingColStatsInstant =
                this.metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
        Assertions.assertTrue(pendingColStatsInstant.isPresent());
        Assertions.assertEquals(HoodieInstant.State.REQUESTED, pendingColStatsInstant.get().getState());

        // Drop COLUMN_STATS; FILES and BLOOM_FILTERS must be unaffected.
        dropIndexAndAssert(MetadataPartitionType.COLUMN_STATS, "streamer-config/indexer.properties", Option.empty(), tableName);
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.FILES.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetaClient.reload(this.metaClient).getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));
        Assertions.assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath(), context(), MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()));

        // Drop BLOOM_FILTERS and check the completed index instant recorded earlier is still the last one.
        dropIndexAndAssert(MetadataPartitionType.BLOOM_FILTERS, "streamer-config/indexer-only-bloom.properties", completedBloomIndexInstant, tableName);
    }

    /**
     * Runs the indexer in "dropindex" mode for one partition type and asserts that it returns 0,
     * that no pending index instant remains, and that the partition no longer exists. When
     * {@code option} holds an instant, it must equal the last completed index instant.
     *
     * @param metadataPartitionType partition type to drop
     * @param str                   classpath location of the indexer properties file
     * @param option                expected last completed index instant, or empty to skip that check
     * @param str2                  table name
     */
    private void dropIndexAndAssert(MetadataPartitionType metadataPartitionType, String str, Option<HoodieInstant> option, String str2) {
        HoodieIndexer.Config dropConfig = getHoodieIndexConfig(metadataPartitionType.name(), "dropindex", str, str2);
        HoodieIndexer dropIndexer = new HoodieIndexer(jsc(), dropConfig);
        Assertions.assertEquals(0, dropIndexer.start(0));

        Assertions.assertFalse(
                this.metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant().isPresent());
        Assertions.assertFalse(HoodieTableMetadataUtil.metadataPartitionExists(
                basePath(), context(), metadataPartitionType.getPartitionPath()));

        if (!option.isPresent()) {
            return;
        }
        Assertions.assertEquals(option, this.metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant());
    }

    /**
     * Creates an indexer config for the table at {@code basePath()}.
     *
     * @param str  value for {@code indexTypes}
     * @param str2 value for {@code runningMode}
     * @param str3 classpath location of the properties file; must resolve to a resource
     * @param str4 table name
     * @return the populated config
     */
    private HoodieIndexer.Config getHoodieIndexConfig(String str, String str2, String str3, String str4) {
        HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
        URL propsUrl = Objects.requireNonNull(getClass().getClassLoader().getResource(str3));
        String propsPath = propsUrl.getPath();

        indexerConfig.basePath = basePath();
        indexerConfig.tableName = str4;
        indexerConfig.indexTypes = str;
        indexerConfig.runningMode = str2;
        indexerConfig.propsFilePath = propsPath;
        return indexerConfig;
    }

    /**
     * Returns a write config builder for the given base path and table name. The builder is set
     * up with an inline JSON schema for a record named "triprec", a parallelism of 2 for every
     * parallelism setting configured here, and the current timeline layout version.
     *
     * @param str  base path of the table
     * @param str2 table name
     * @return the pre-populated builder; callers add further settings and call {@code build()}
     */
    private static HoodieWriteConfig.Builder getWriteConfigBuilder(String str, String str2) {
        return HoodieWriteConfig.newBuilder().withPath(str).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).forTable(str2);
    }

    /**
     * Returns a metadata config builder with the two given flags applied.
     *
     * @param z  value passed to {@code enable(...)}
     * @param z2 value passed to {@code withAsyncIndex(...)}
     * @return the builder; callers add further index settings and call {@code build()}
     */
    private static HoodieMetadataConfig.Builder getMetadataConfigBuilder(boolean z, boolean z2) {
        return HoodieMetadataConfig.newBuilder().enable(z).withAsyncIndex(z2);
    }
}
