/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utils;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Test cases for {@link CompactionUtil}.
 *
 * <p>The table under test is not created by a JUnit lifecycle callback: every test
 * calls one of the {@code beforeEach} helpers explicitly so that it can pass its own
 * option overrides before the table is initialized.
 */
public class TestCompactionUtil {
    private HoodieFlinkTable<?> table;
    private HoodieTableMetaClient metaClient;
    private Configuration conf;
    @TempDir
    File tempFile;

    /**
     * Initializes the table under test with the default test configuration.
     *
     * @throws IOException propagated from {@link #beforeEach(Map)}
     */
    void beforeEach() throws IOException {
        this.beforeEach(Collections.emptyMap());
    }

    /**
     * Builds the configuration rooted at the temporary directory, forces the
     * MERGE_ON_READ table type, applies the given overrides and then creates the
     * table and its meta client.
     *
     * @param options extra configuration entries (key to string value) applied on
     *                top of the defaults before the table is initialized
     * @throws IOException declared for the table initialization calls
     */
    void beforeEach(Map<String, String> options) throws IOException {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        options.forEach((k, v) -> this.conf.setString(k, v));
        StreamerUtil.initTableIfNotExists(this.conf);
        this.table = FlinkTables.createTable(this.conf);
        this.metaClient = this.table.getMetaClient();
        if (this.conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
            // The returned writer is intentionally discarded; the call is made only for
            // its side effect. NOTE(review): presumably this bootstraps the metadata
            // table -- confirm against FlinkHoodieBackedTableMetadataWriter.
            FlinkHoodieBackedTableMetadataWriter.create(
                this.table.getHadoopConf(),
                this.table.getConfig(),
                this.table.getContext(),
                Option.empty(),
                Option.empty());
        }
    }

    /**
     * Generates three inflight compaction instants and verifies that
     * {@link CompactionUtil#rollbackCompaction} moves all of them back to the
     * REQUESTED state while keeping their instant times.
     */
    @Test
    void rollbackCompaction() throws Exception {
        this.beforeEach();
        List<String> oriInstants = IntStream.range(0, 3)
            .mapToObj(i -> this.generateCompactionPlan())
            .collect(Collectors.toList());
        List<HoodieInstant> instants = this.metaClient.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
            .getInstants()
            .collect(Collectors.toList());
        MatcherAssert.assertThat("all the instants should be in pending state", instants.size(), CoreMatchers.is(3));

        CompactionUtil.rollbackCompaction(this.table);

        boolean allRolledBack = this.metaClient.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .getInstants()
            .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
        Assertions.assertTrue(allRolledBack, "all the instants should be rolled back");
        // allMatch is vacuously true on an empty timeline, so also compare the instant times.
        List<String> actualInstants = this.metaClient.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .getInstants()
            .map(HoodieInstant::getTimestamp)
            .collect(Collectors.toList());
        MatcherAssert.assertThat(actualInstants, CoreMatchers.is(oriInstants));
    }

    /**
     * Generates three inflight compaction instants and verifies that, with the
     * compaction timeout set to zero seconds,
     * {@link CompactionUtil#rollbackEarliestCompaction} leaves exactly one REQUESTED
     * instant and that it is the first one generated.
     */
    @Test
    void rollbackEarliestCompaction() throws Exception {
        this.beforeEach();
        // Set after table creation: the value is read from the conf passed to the call below.
        this.conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
        List<String> oriInstants = IntStream.range(0, 3)
            .mapToObj(i -> this.generateCompactionPlan())
            .collect(Collectors.toList());
        List<HoodieInstant> instants = this.metaClient.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
            .getInstants()
            .collect(Collectors.toList());
        MatcherAssert.assertThat("all the instants should be in pending state", instants.size(), CoreMatchers.is(3));

        CompactionUtil.rollbackEarliestCompaction(this.table, this.conf);

        long requestedCnt = this.metaClient.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .getInstants()
            .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
            .count();
        MatcherAssert.assertThat("Only the first instant expects to be rolled back", requestedCnt, CoreMatchers.is(1L));

        String instantTime = this.metaClient.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
            .firstInstant()
            .get()
            .getTimestamp();
        MatcherAssert.assertThat(instantTime, CoreMatchers.is(oriInstants.get(0)));
    }

    /**
     * Writes data, schedules a compaction, writes more data, and schedules again,
     * expecting two pending compaction instants on the timeline at the end.
     */
    @Test
    void testScheduleCompaction() throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "time_elapsed");
        options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
        this.beforeEach(options);

        // Write a first batch before scheduling.
        TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, this.conf);

        // try-with-resources so the write client is released even when an assertion fails.
        try (HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(this.conf)) {
            CompactionUtil.scheduleCompaction(this.metaClient, writeClient, true, true);

            Option<HoodieInstant> pendingCompactionInstant = this.metaClient.reloadActiveTimeline()
                .filterPendingCompactionTimeline()
                .lastInstant();
            Assertions.assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan expects to be scheduled");

            // Write a second batch and open a new instant before scheduling again.
            TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, this.conf);
            // NOTE(review): presumably spaces out the generated instant times -- confirm.
            TimeUnit.SECONDS.sleep(3L);
            writeClient.startCommit();

            CompactionUtil.scheduleCompaction(this.metaClient, writeClient, true, false);
            int numCompactionCommits = this.metaClient.reloadActiveTimeline()
                .filterPendingCompactionTimeline()
                .countInstants();
            MatcherAssert.assertThat("Two compaction plan expects to be scheduled", numCompactionCommits, CoreMatchers.is(2));
        }
    }

    /**
     * Saves a compaction plan holding a single empty operation as a REQUESTED
     * compaction instant, transitions that instant to inflight and reloads the
     * active timeline of the meta client.
     *
     * @return the instant time of the generated compaction
     * @throws HoodieIOException if saving or transitioning the instant fails with an
     *                           {@link IOException}
     */
    private String generateCompactionPlan() {
        HoodieCompactionOperation operation = new HoodieCompactionOperation();
        HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1);
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", instantTime);
        try {
            this.metaClient.getActiveTimeline()
                .saveToCompactionRequested(compactionInstant, TimelineMetadataUtils.serializeCompactionPlan(plan));
            this.table.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
        } catch (IOException ioe) {
            throw new HoodieIOException("Exception scheduling compaction", ioe);
        }
        // Make the new instant visible to later timeline reads through the meta client.
        this.metaClient.reloadActiveTimeline();
        return instantTime;
    }
}

