package org.apache.hudi.table.action.compact;

import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/table/action/compact/TestHoodieCompactor.class */
public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
    private HoodieTableMetaClient metaClient;

    @BeforeEach
    public void setUp() throws Exception {
        initSparkContexts();
        initPath();
        this.storage = HoodieStorageUtils.getStorage(this.basePath, this.storageConf);
        this.metaClient = HoodieTestUtils.init(this.storageConf, this.basePath, HoodieTableType.MERGE_ON_READ);
        initTestDataGenerator();
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    public HoodieWriteConfig getConfig() {
        return getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetricsConfig(getMetricsConfig()).build();
    }

    private static HoodieMetricsConfig getMetricsConfig() {
        return HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build();
    }

    private long getCompactionMetricCount(String str) {
        HoodieMetrics metrics = this.writeClient.getMetrics();
        String metricsName = metrics.getMetricsName("counter", str);
        SortedMap counters = metrics.getMetrics().getRegistry().getCounters();
        if (counters.containsKey(metricsName)) {
            return ((Counter) counters.get(metricsName)).getCount();
        }
        return 0L;
    }

    public HoodieWriteConfig.Builder getConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1048576L).withInlineCompaction(false).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1048576L).parquetMaxFileSize(1048576L).orcMaxFileSize(1048576L).build()).withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1048576).build()).forTable("test-trip-table").withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
    }

    @Test
    public void testCompactionOnCopyOnWriteFail() throws Exception {
        this.metaClient = HoodieTestUtils.init(this.storageConf, this.basePath, HoodieTableType.COPY_ON_WRITE);
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfig());
        Throwable th = null;
        try {
            HoodieSparkTable create = HoodieSparkTable.create(getConfig(), this.context, this.metaClient);
            String createNewInstantTime = HoodieActiveTimeline.createNewInstantTime();
            Assertions.assertThrows(HoodieNotSupportedException.class, () -> {
                create.scheduleCompaction(this.context, createNewInstantTime, Option.empty());
                create.compact(this.context, createNewInstantTime);
            });
            Assertions.assertEquals(0L, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
            Assertions.assertEquals(0L, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCompactionEmpty() {
        HoodieWriteConfig config = getConfig();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable create = HoodieSparkTable.create(getConfig(), this.context, this.metaClient);
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(config);
        Throwable th = null;
        try {
            try {
                String startCommit = hoodieWriteClient.startCommit();
                hoodieWriteClient.insert(this.jsc.parallelize(this.dataGen.generateInserts(startCommit, 100), 1), startCommit).collect();
                Assertions.assertFalse(create.scheduleCompaction(this.context, HoodieActiveTimeline.createNewInstantTime(), Option.empty()).isPresent(), "If there is nothing to compact, result will be empty");
                Assertions.assertEquals(0L, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
                Assertions.assertEquals(0L, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testScheduleCompactionWithInflightInstant() {
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfig());
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("100");
                hoodieWriteClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("100", 100), 1), "100").collect();
                hoodieWriteClient.startCommitWithTime("102");
                this.metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, "deltacommit", "102"), Option.empty());
                hoodieWriteClient.scheduleCompactionAtInstant("101", Option.empty());
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testWriteStatusContentsAfterCompaction() throws Exception {
        HoodieWriteConfig build = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetricsConfig(getMetricsConfig()).build();
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("100");
                List<HoodieRecord> generateInserts = this.dataGen.generateInserts("100", 1000);
                hoodieWriteClient.insert(this.jsc.parallelize(generateInserts, 1), "100").collect();
                int i = 1;
                while (i < 5) {
                    updateRecords(build, String.format("10%s", Integer.valueOf(i)), generateInserts);
                    assertLogFilesNumEqualsTo(build, i);
                    i++;
                }
                verifyCompaction(compact(hoodieWriteClient, String.format("10%s", Integer.valueOf(i))));
                Assertions.assertEquals(1L, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
                Assertions.assertEquals(1L, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSpillingWhenCompaction() throws Exception {
        HoodieWriteConfig build = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxMemoryMaxSize(1L, 1L).build()).withMetricsConfig(getMetricsConfig()).build();
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("100");
                List<HoodieRecord> generateInserts = this.dataGen.generateInserts("100", 100);
                hoodieWriteClient.insert(this.jsc.parallelize(generateInserts, 1), "100").collect();
                for (int i = 1; i < 5; i += 2) {
                    updateRecords(build, "10" + i, generateInserts);
                    assertLogFilesNumEqualsTo(build, 1);
                    verifyCompaction(compact(hoodieWriteClient, "10" + (i + 1)));
                    Assertions.assertEquals((i / 2) + 1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
                    Assertions.assertEquals((i / 2) + 1, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
                }
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }

    private void updateRecords(HoodieWriteConfig hoodieWriteConfig, String str, List<HoodieRecord> list) throws IOException {
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, this.context);
        JavaRDD<HoodieRecord> tagLocation = tagLocation(new HoodieBloomIndex(hoodieWriteConfig, SparkHoodieBloomIndexHelper.getInstance()), this.jsc.parallelize(this.dataGen.generateUpdates(str, list), 1), create);
        this.writeClient.startCommitWithTime(str);
        this.writeClient.upsertPreppedRecords(tagLocation, str).collect();
        this.metaClient.reloadActiveTimeline();
    }

    private void assertLogFilesNumEqualsTo(HoodieWriteConfig hoodieWriteConfig, int i) {
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, this.context);
        for (String str : this.dataGen.getPartitionPaths()) {
            Iterator it = ((List) create.getSliceView().getLatestFileSlices(str).collect(Collectors.toList())).iterator();
            while (it.hasNext()) {
                Assertions.assertEquals(i, ((FileSlice) it.next()).getLogFiles().count(), "There should be " + i + " log file written for every data file");
            }
        }
    }

    private HoodieData<WriteStatus> compact(SparkRDDWriteClient sparkRDDWriteClient, String str) {
        sparkRDDWriteClient.scheduleCompactionAtInstant(str, Option.empty());
        return HoodieListData.eager(((JavaRDD) sparkRDDWriteClient.compact(str).getWriteStatuses()).collect());
    }

    private void verifyCompaction(HoodieData<WriteStatus> hoodieData) {
        List collectAsList = hoodieData.collectAsList();
        for (String str : this.dataGen.getPartitionPaths()) {
            Assertions.assertTrue(collectAsList.stream().anyMatch(writeStatus -> {
                return writeStatus.getStat().getPartitionPath().contentEquals(str);
            }));
        }
        collectAsList.forEach(writeStatus2 -> {
            HoodieWriteStat.RuntimeStats runtimeStats = writeStatus2.getStat().getRuntimeStats();
            Assertions.assertNotNull(runtimeStats);
            Assertions.assertEquals(runtimeStats.getTotalCreateTime(), 0L);
            Assertions.assertTrue(runtimeStats.getTotalUpsertTime() > 0);
            Assertions.assertTrue(runtimeStats.getTotalScanTime() > 0);
        });
    }
}
