package org.apache.hudi.utilities.deltastreamer;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

@Disabled("HUDI-6505")
/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.class */
public class TestHoodieDeltaStreamerDAGExecution extends HoodieDeltaStreamerTestBase {

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution$StageListener.class */
    private static class StageListener extends SparkListener {
        int triggerCount;
        private final String eventToTrack;

        private StageListener(String str) {
            this.triggerCount = 0;
            this.eventToTrack = str;
        }

        public void onStageCompleted(SparkListenerStageCompleted sparkListenerStageCompleted) {
            System.out.println("stage details: " + sparkListenerStageCompleted.stageInfo().details());
            if (sparkListenerStageCompleted.stageInfo().details().contains(this.eventToTrack)) {
                this.triggerCount++;
            }
        }
    }

    @ParameterizedTest
    @CsvSource({"upsert", "insert", "bulk_insert"})
    public void testWriteOperationDoesNotTriggerRepeatedDAG(String str) throws Exception {
        StageListener stageListener = new StageListener("org.apache.hudi.client.BaseHoodieClient.finalizeWrite");
        sparkSession.sparkContext().addSparkListener(stageListener);
        runDeltaStreamer(WriteOperationType.fromValue(str), false, Option.empty());
        Assertions.assertEquals(1, stageListener.triggerCount);
    }

    @Test
    public void testClusteringDoesNotTriggerRepeatedDAG() throws Exception {
        StageListener stageListener = new StageListener("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering");
        sparkSession.sparkContext().addSparkListener(stageListener);
        runDeltaStreamer(WriteOperationType.UPSERT, false, Option.of(getTableServicesConfigs(100, "false", "true", "1", "", "")));
        Assertions.assertEquals(1, stageListener.triggerCount);
    }

    @Test
    public void testCompactionDoesNotTriggerRepeatedDAG() throws Exception {
        StageListener stageListener = new StageListener("org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute");
        sparkSession.sparkContext().addSparkListener(stageListener);
        runDeltaStreamer(WriteOperationType.UPSERT, true, Option.of(Arrays.asList("hoodie.compact.inline.max.delta.commits=1", "hoodie.compact.inline=true")));
        Assertions.assertEquals(1, stageListener.triggerCount);
    }

    private void runDeltaStreamer(WriteOperationType writeOperationType, boolean z, Option<List<String>> option) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        HoodieTestDataGenerator prepareParquetDFSFiles = prepareParquetDFSFiles(10, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(true, true, "source.avsc", "source.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");
        String str = basePath + "/runDeltaStreamer" + testNum;
        FileIOUtils.deleteDirectory(new File(str));
        HoodieDeltaStreamer.Config makeConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, writeOperationType, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, true, 100000, false, null, HoodieTableType.MERGE_ON_READ.name(), "timestamp", null);
        option.ifPresent(list -> {
            makeConfig.configs.addAll(list);
        });
        new HoodieDeltaStreamer(makeConfig, jsc).sync();
        assertRecordCount(10, str, sqlContext);
        testNum++;
        if (z) {
            prepareParquetDFSUpdates(10, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null, prepareParquetDFSFiles, "001");
            new HoodieDeltaStreamer(makeConfig, jsc).sync();
        }
    }
}
