package org.apache.hudi.table.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Functional tests for {@link MarkerBasedRollbackStrategy}.
 *
 * <p>Each test writes data and/or marker files for an inflight instant, asks the strategy for
 * rollback requests, and (in most tests) executes them through {@link BaseRollbackHelper} to
 * check the resulting {@link HoodieRollbackStat}s.
 */
@Tag("functional")
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
    private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with listing metadata enable={0}";
    private static final String COMMIT_ACTION = "commit";
    private static final String DELTA_COMMIT_ACTION = "deltacommit";

    /** Table type used by {@link #setUp()}; individual tests may switch it and re-initialize. */
    private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;

    /**
     * Supplies the arguments for the parameterized tests: the metadata-table flag enabled, then
     * disabled.
     *
     * @return one {@link Arguments} per flag value
     */
    public static Stream<Arguments> configParams() {
        return Stream.of(true, false).map(Arguments::of);
    }

    /**
     * Initializes path, Spark contexts, file system, meta client (for the current
     * {@link #tableType}) and the test data generator.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        initPath();
        initSparkContexts();
        initFileSystem();
        initMetaClient(this.tableType);
        initTestDataGenerator();
    }

    /** Releases everything acquired in {@link #setUp()}. */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    /** A single APPEND marker on an inflight commit must produce exactly one rollback request. */
    @Test
    public void testMarkerBasedRollbackAppend() throws Exception {
        HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
        // Resolve the file id BEFORE selecting commit "001": nesting this call inside the
        // withMarkerFile(...) arguments would run addRequestedCommit("000") after forCommit("001").
        // NOTE(review): assumes HoodieTestTable tracks a "current instant" - confirm.
        String f0 = testTable.addRequestedCommit("000")
            .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
        testTable.forCommit("001").withMarkerFile("partA", f0, IOType.APPEND);

        MarkerBasedRollbackStrategy strategy = new MarkerBasedRollbackStrategy(
            HoodieSparkTable.create(getConfig(), this.context, this.metaClient), this.context, getConfig(), "002");
        Assertions.assertEquals(1, strategy.getRollbackRequests(inflight(COMMIT_ACTION, "001")).size());
    }

    /**
     * Rolls back a copy-on-write commit built with {@link HoodieTestTable}: one MERGE marker and
     * two CREATE markers, one of which ("f2") has no base file created by this test.
     */
    @Test
    public void testCopyOnWriteRollbackWithTestTable() throws Exception {
        // given: base files and the corresponding markers
        HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
        String f0 = testTable.addRequestedCommit("000")
            .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
        // commit "001" rewrites the existing file in partA and adds a new file in partB
        String f1 = testTable.addCommit("001")
            .withBaseFilesInPartition("partA", f0).getLeft()
            .getFileIdsWithBaseFilesInPartitions("partB").get("partB");
        String f2 = "f2";
        testTable.forCommit("001")
            .withMarkerFile("partA", f0, IOType.MERGE)
            .withMarkerFile("partB", f1, IOType.CREATE)
            .withMarkerFile("partA", f2, IOType.CREATE);

        // when
        List<HoodieRollbackStat> stats = rollbackUsingMarkers(COMMIT_ACTION, "001", "002");

        // then: two rollback stats are returned
        Assertions.assertEquals(2, stats.size());
        FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
        FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
        Assertions.assertEquals(0, partBFiles.length);
        Assertions.assertEquals(1, partAFiles.length);
        Assertions.assertEquals(2, stats.stream().mapToInt(stat -> stat.getSuccessDeleteFiles().size()).sum());
        Assertions.assertEquals(1, stats.stream().mapToInt(stat -> stat.getFailedDeleteFiles().size()).sum());
    }

    /**
     * Copy-on-write: rolling back an inflight upsert deletes one file per rollback stat and
     * reports no command blocks.
     *
     * @param z whether the metadata table is enabled
     */
    @MethodSource("configParams")
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    public void testCopyOnWriteRollback(boolean z) throws Exception {
        HoodieWriteConfig writeConfig = markerRollbackConfig(z);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(this.jsc), writeConfig)) {
            List<HoodieRollbackStat> stats = testUpdateAndRollback(z, writeConfig, writeClient);
            Assertions.assertEquals(3, stats.size());
            for (HoodieRollbackStat stat : stats) {
                Assertions.assertEquals(1, stat.getSuccessDeleteFiles().size());
                Assertions.assertEquals(0, stat.getFailedDeleteFiles().size());
                Assertions.assertEquals(0, stat.getCommandBlocksCount().size());
            }
        }
    }

    /**
     * Merge-on-read: rolling back an inflight upsert deletes no files and reports one command
     * block per rollback stat, each on a log file.
     *
     * @param z whether the metadata table is enabled
     */
    @MethodSource("configParams")
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    public void testMergeOnReadRollback(boolean z) throws Exception {
        reinitAsMergeOnRead();
        HoodieWriteConfig writeConfig = markerRollbackConfig(z);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(this.jsc), writeConfig)) {
            List<HoodieRollbackStat> stats = testUpdateAndRollback(z, writeConfig, writeClient);
            Assertions.assertEquals(3, stats.size());
            for (HoodieRollbackStat stat : stats) {
                Assertions.assertEquals(0, stat.getSuccessDeleteFiles().size());
                Assertions.assertEquals(0, stat.getFailedDeleteFiles().size());
                Assertions.assertEquals(1, stat.getCommandBlocksCount().size());
                stat.getCommandBlocksCount().forEach((fileStatus, blockCount) ->
                    Assertions.assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
            }
        }
    }

    /**
     * Merge-on-read: rolling back the first insert deletes one file per rollback stat and reports
     * no command blocks.
     *
     * @param z whether the metadata table is enabled
     */
    @MethodSource("configParams")
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    public void testMergeOnReadRollbackDeletesFirstAppendFiles(boolean z) throws Exception {
        reinitAsMergeOnRead();
        HoodieWriteConfig writeConfig = markerRollbackConfig(z);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(this.jsc), writeConfig)) {
            List<HoodieRollbackStat> stats = testInsertAndRollback(writeClient);
            Assertions.assertEquals(3, stats.size());
            for (HoodieRollbackStat stat : stats) {
                Assertions.assertEquals(1, stat.getSuccessDeleteFiles().size());
                Assertions.assertEquals(0, stat.getFailedDeleteFiles().size());
                // The command-block map is asserted empty, so there are no log files to inspect.
                Assertions.assertEquals(0, stat.getCommandBlocksCount().size());
            }
        }
    }

    /**
     * Inserts 100 records as instant "001", commits them, then performs a marker-based rollback
     * of "001" treated as an inflight delta commit.
     *
     * @param sparkRDDWriteClient client used for the write
     * @return the stats produced by the rollback
     */
    private List<HoodieRollbackStat> testInsertAndRollback(SparkRDDWriteClient sparkRDDWriteClient) {
        String commitTime = "001";
        sparkRDDWriteClient.startCommitWithTime(commitTime);
        JavaRDD writeStatuses = sparkRDDWriteClient.insert(
            this.jsc.parallelize(this.dataGen.generateInserts(commitTime, 100), 1), commitTime);
        sparkRDDWriteClient.commit(commitTime, writeStatuses);
        writeStatuses.collect();

        return rollbackUsingMarkers(DELTA_COMMIT_ACTION, commitTime, "002");
    }

    /**
     * Inserts and commits 100 records as "001", upserts 50 updates as "002" without committing,
     * then performs a marker-based rollback of "002" treated as an inflight delta commit.
     *
     * @param z currently unused; kept so existing call sites stay unchanged
     * @param hoodieWriteConfig currently unused; the rollback runs with {@code getConfig()}
     * @param sparkRDDWriteClient client used for the writes
     * @return the stats produced by the rollback
     */
    private List<HoodieRollbackStat> testUpdateAndRollback(boolean z, HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient) {
        String insertTime = "001";
        sparkRDDWriteClient.startCommitWithTime(insertTime);
        sparkRDDWriteClient.commit(insertTime, sparkRDDWriteClient.insert(
            this.jsc.parallelize(this.dataGen.generateInserts(insertTime, 100), 1), insertTime));

        String updateTime = "002";
        sparkRDDWriteClient.startCommitWithTime(updateTime);
        // collect() forces the upsert to execute; "002" is deliberately left uncommitted.
        sparkRDDWriteClient.upsert(
            this.jsc.parallelize(this.dataGen.generateUniqueUpdates(updateTime, 50), 1), updateTime).collect();

        return rollbackUsingMarkers(DELTA_COMMIT_ACTION, updateTime, "003");
    }

    /**
     * Stubs a {@link DirectWriteMarkers} mock to fail on listing and checks that the strategy
     * still yields one rollback request.
     *
     * <p>NOTE(review): the mock is never handed to the strategy or the table in this test, so the
     * stubbed failure may not be exercised - confirm against the production wiring.
     */
    @Test
    public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception {
        HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
        // Resolve the file id before selecting commit "001" (see testMarkerBasedRollbackAppend).
        String f0 = testTable.addRequestedCommit("000")
            .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
        testTable.forCommit("001").withMarkerFile("partA", f0, IOType.APPEND);

        HoodieSparkTable table = HoodieSparkTable.create(getConfig(), this.context, this.metaClient);
        DirectWriteMarkers writeMarkers = Mockito.mock(DirectWriteMarkers.class);
        MockitoAnnotations.initMocks(this);
        Mockito.when(writeMarkers.allMarkerFilePaths()).thenThrow(new IOException("Markers.type file not present"));

        MarkerBasedRollbackStrategy strategy = new MarkerBasedRollbackStrategy(table, this.context, getConfig(), "002");
        Assertions.assertEquals(1, strategy.getRollbackRequests(inflight(COMMIT_ACTION, "001")).size());
    }

    /** Tears down the current fixtures and re-initializes them for a merge-on-read table. */
    private void reinitAsMergeOnRead() throws Exception {
        tearDown();
        this.tableType = HoodieTableType.MERGE_ON_READ;
        setUp();
    }

    /** Builds the write config shared by the parameterized tests: marker rollback on, auto-commit off. */
    private HoodieWriteConfig markerRollbackConfig(boolean enableMetadata) {
        return getConfigBuilder()
            .withRollbackUsingMarkers(true)
            .withAutoCommit(false)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
            .withPath(this.basePath)
            .build();
    }

    /** Creates a fresh INFLIGHT instant for the given action and timestamp. */
    private static HoodieInstant inflight(String action, String instantTime) {
        return new HoodieInstant(HoodieInstant.State.INFLIGHT, action, instantTime);
    }

    /**
     * Derives rollback requests for the given inflight instant with
     * {@link MarkerBasedRollbackStrategy} and executes them with {@link BaseRollbackHelper}.
     *
     * @param action timeline action of the instant being rolled back
     * @param instantTime timestamp of the instant being rolled back
     * @param rollbackInstantTime timestamp passed to the strategy for the rollback itself
     * @return the stats produced by the rollback
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the table and strategy are used as raw types here
    private List<HoodieRollbackStat> rollbackUsingMarkers(String action, String instantTime, String rollbackInstantTime) {
        HoodieSparkTable table = HoodieSparkTable.create(getConfig(), this.context, this.metaClient);
        BaseRollbackHelper rollbackHelper = new BaseRollbackHelper(table.getMetaClient(), getConfig());
        MarkerBasedRollbackStrategy strategy =
            new MarkerBasedRollbackStrategy(table, this.context, getConfig(), rollbackInstantTime);
        return rollbackHelper.performRollback(
            this.context,
            inflight(action, instantTime),
            strategy.getRollbackRequests(inflight(action, instantTime)));
    }
}
