package org.apache.seatunnel.engine.e2e;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;

@DisabledOnJre(value = {JRE.JAVA_11}, disabledReason = "slf4j jar conflict, we should fix it later")
/* loaded from: input_file:org/apache/seatunnel/engine/e2e/CheckpointEnableIT.class */
public class CheckpointEnableIT extends TestSuiteBase {
    private static final Logger log = LoggerFactory.getLogger(CheckpointEnableIT.class);

    @TestTemplate
    @DisabledOnContainer(value = {}, type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "depending on the engine, the logic for determining whether a checkpoint is enabled is different")
    public void testZetaBatchCheckpointEnable(TestContainer testContainer) throws IOException, InterruptedException {
        Container.ExecResult executeJob = testContainer.executeJob("/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf");
        Assertions.assertTrue(testContainer.getServerLogs().contains("checkpoint is disabled"));
        Assertions.assertEquals(0, executeJob.getExitCode());
        Assertions.assertEquals(0, testContainer.executeJob("/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf").getExitCode());
        Container.ExecResult executeJob2 = testContainer.executeJob("/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
        Assertions.assertTrue(testContainer.getServerLogs().contains("checkpoint is enabled"));
        Assertions.assertEquals(0, executeJob2.getExitCode());
        Assertions.assertEquals(0, testContainer.executeJob("/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf").getExitCode());
        Assertions.assertEquals(0, testContainer.executeJob("/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable_withtimeout.conf").getExitCode());
    }

    @TestTemplate
    @DisabledOnContainer(value = {}, type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "depending on the engine, the logic for determining whether a checkpoint is enabled is different")
    public void testZetaStreamingCheckpointInterval(TestContainer testContainer) throws IOException, InterruptedException, ExecutionException {
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
            try {
                return testContainer.executeJob("/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
            } catch (Exception e) {
                log.error("Commit task exception :" + e.getMessage());
                throw new RuntimeException(e);
            }
        });
        AtomicReference atomicReference = new AtomicReference();
        Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Matcher matcher = Pattern.compile(".*Init JobMaster for Job stream_fakesource_to_localfile_interval.conf \\(([0-9]*)\\).*", 32).matcher(testContainer.getServerLogs());
            if (matcher.matches()) {
                atomicReference.set(matcher.group(1));
            }
            Assertions.assertNotNull(atomicReference.get());
        });
        Thread.sleep(15000L);
        Assertions.assertTrue(testContainer.getServerLogs().contains("checkpoint is enabled"));
        Assertions.assertEquals(0, testContainer.savepointJob((String) atomicReference.get()).getExitCode());
        Assertions.assertEquals(0, ((Container.ExecResult) supplyAsync.get()).getExitCode());
        CompletableFuture.supplyAsync(() -> {
            try {
                return testContainer.restoreJob("/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf", (String) atomicReference.get());
            } catch (Exception e) {
                log.error("Commit task exception :" + e.getMessage());
                throw new RuntimeException(e);
            }
        });
        AtomicReference atomicReference2 = new AtomicReference(false);
        Awaitility.await().atMost(300000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Container.ExecResult executeJob = testContainer.executeJob("/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf");
            atomicReference2.set(Boolean.valueOf(0 == executeJob.getExitCode()));
            Assertions.assertEquals(0, executeJob.getExitCode());
        });
        Assertions.assertTrue(((Boolean) atomicReference2.get()).booleanValue());
    }

    @TestTemplate
    @DisabledOnContainer(value = {}, type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "depending on the engine, the logic for determining whether a checkpoint is enabled is different")
    public void testZetaStreamingCheckpointNoInterval(TestContainer testContainer) throws IOException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            try {
                return testContainer.executeJob("/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf");
            } catch (Exception e) {
                log.error("Commit task exception :" + e.getMessage());
                throw new RuntimeException(e);
            }
        });
        AtomicReference atomicReference = new AtomicReference();
        Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Matcher matcher = Pattern.compile(".*Init JobMaster for Job stream_fakesource_to_localfile.conf \\(([0-9]*)\\).*", 32).matcher(testContainer.getServerLogs());
            if (matcher.matches()) {
                atomicReference.set(matcher.group(1));
            }
            Assertions.assertNotNull(atomicReference.get());
        });
        Thread.sleep(15000L);
        Assertions.assertTrue(testContainer.getServerLogs().contains("checkpoint is enabled"));
        Assertions.assertEquals(0, testContainer.savepointJob((String) atomicReference.get()).getExitCode());
        CompletableFuture.supplyAsync(() -> {
            try {
                return Integer.valueOf(testContainer.restoreJob("/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf", (String) atomicReference.get()).getExitCode());
            } catch (Exception e) {
                log.error("Commit task exception :" + e.getMessage());
                throw new RuntimeException(e);
            }
        });
        AtomicReference atomicReference2 = new AtomicReference(false);
        Awaitility.await().atMost(300000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Container.ExecResult executeJob = testContainer.executeJob("/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf");
            atomicReference2.set(Boolean.valueOf(0 == executeJob.getExitCode()));
            Assertions.assertEquals(0, executeJob.getExitCode());
        });
        Assertions.assertTrue(((Boolean) atomicReference2.get()).booleanValue());
    }

    @TestTemplate
    @DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL, EngineType.SPARK}, disabledReason = "depending on the engine, the logic for determining whether a checkpoint is enabled is different")
    public void testFlinkCheckpointEnable(AbstractTestFlinkContainer abstractTestFlinkContainer) throws IOException, InterruptedException {
        Container.ExecResult executeJob = abstractTestFlinkContainer.executeJob("/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
        Matcher matcher = Pattern.compile("JobID\\s([a-fA-F0-9]+)").matcher(executeJob.getStdout());
        Assertions.assertTrue(matcher.find());
        Assertions.assertEquals(Long.MAX_VALUE, JsonUtils.toMap(abstractTestFlinkContainer.executeJobManagerInnerCommand(String.format("curl http://localhost:8081/jobs/%s/checkpoints/config", matcher.group(1))), String.class, Object.class).getOrDefault("interval", 0L));
        Assertions.assertEquals(0, executeJob.getExitCode());
    }

    @TestTemplate
    @DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL, EngineType.FLINK}, disabledReason = "depending on the engine, the logic for determining whether a checkpoint is enabled is different")
    public void testSparkCheckpointEnable(TestContainer testContainer) throws IOException, InterruptedException {
        Container.ExecResult executeJob = testContainer.executeJob("/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
        Assertions.assertTrue(executeJob.getStderr().contains("Ignoring non-Spark config property: checkpoint.interval"));
        Assertions.assertEquals(0, executeJob.getExitCode());
    }
}
