package org.apache.seatunnel.engine.e2e;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;

/**
 * End-to-end test checking that the {@code env} parameters of a SeaTunnel job config show up in
 * the Flink job, as reported by the JobManager REST API queried with {@code curl} from inside the
 * JobManager container.
 *
 * <p>Disabled for the SeaTunnel and Spark engines (see {@code disabledReason} below).
 */
@DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL, EngineType.SPARK}, disabledReason = "only flink adjusts the parameter configuration rules")
public class UnifyEnvParameterIT extends TestSuiteBase {
    private static final Logger log = LoggerFactory.getLogger(UnifyEnvParameterIT.class);

    /** Base URL of the JobManager REST API as seen from inside the JobManager container. */
    private static final String JOB_MANAGER_REST_URL = "http://localhost:8081";

    /** Maximum time to wait for the submitted job to appear in the jobs overview. */
    private static final long JOB_SUBMIT_TIMEOUT_MS = 300000L;

    /** Maximum time to wait for a REST-reported state (job RUNNING, checkpoint completed). */
    private static final long REST_POLL_TIMEOUT_MS = 60000L;

    /** Creates {@code /tmp/seatunnel} in the container and hands ownership to the flink user. */
    @TestContainerExtension
    protected final ContainerExtendedFactory extendedFactory = genericContainer -> {
        Container.ExecResult result = genericContainer.execInContainer(
                "bash", "-c", "mkdir -p /tmp/seatunnel && chown -R flink /tmp/seatunnel");
        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
    };

    @TestTemplate
    public void testUnifiedParam(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
        genericTest("/unify-env-param-test-resource/unify_env_param_fakesource_to_localfile.conf", container);
    }

    @TestTemplate
    public void testOutdatedParam(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
        genericTest("/unify-env-param-test-resource/outdated_env_param_fakesource_to_localfile.conf", container);
    }

    /**
     * Submits the table-env job and asserts that at least one node of the running job's plan has
     * parallelism 2.
     */
    @TestTemplate
    public void testUnifiedFlinkTableEnvParam(AbstractTestFlinkContainer container) {
        submitJobAsync(container, "/unify-env-param-test-resource/unify_flink_table_env_param_fakesource_to_console.conf");
        String jobId = awaitFirstJobId(container);
        Map<String, Object> jobInfo = awaitRunningJob(container, jobId);

        List<Map<String, Object>> planNodes = asMapList(asMap(jobInfo.get("plan")).get("nodes"));
        boolean anyNodeWithParallelismTwo = planNodes.stream()
                .anyMatch(node -> Integer.valueOf(2).equals(node.get("parallelism")));
        Assertions.assertTrue(anyNodeWithParallelismTwo);
    }

    /**
     * Submits the job described by {@code configFile}, waits until it is RUNNING and has a
     * completed checkpoint, then asserts the execution and checkpoint settings reported by the
     * JobManager REST API.
     *
     * @param configFile job config path passed to {@code executeJob}
     * @param container Flink test container the job is submitted to
     * @throws IOException if a command run in the JobManager container fails
     * @throws InterruptedException if interrupted while running a command in the container
     */
    public void genericTest(String configFile, AbstractTestFlinkContainer container) throws IOException, InterruptedException {
        submitJobAsync(container, configFile);
        String jobId = awaitFirstJobId(container);
        Map<String, Object> jobInfo = awaitRunningJob(container, jobId);

        Map<String, Object> executionConfig =
                asMap(restGet(container, "/jobs/" + jobId + "/config").get("execution-config"));
        Map<String, Object> checkpointConfig = restGet(container, "/jobs/" + jobId + "/checkpoints/config");
        Map<String, Object> completedCheckpoint = awaitCompletedCheckpoint(container, jobId);

        Assertions.assertEquals(1, intValue(executionConfig, "job-parallelism"));
        Assertions.assertEquals(5, intValue(jobInfo, "maxParallelism"));
        Assertions.assertEquals(10000, intValue(checkpointConfig, "interval"));
        Assertions.assertEquals("exactly_once", checkpointConfig.get("mode").toString());
        Assertions.assertEquals(600000, intValue(checkpointConfig, "timeout"));
        Assertions.assertTrue(completedCheckpoint.get("external_path").toString().startsWith("file:/tmp/seatunnel/flink/checkpoints"));
        Assertions.assertEquals(2, intValue(checkpointConfig, "max_concurrent"));
        Assertions.assertTrue((Boolean) asMap(checkpointConfig.get("externalization")).get("delete_on_cancellation"));
        Assertions.assertEquals(100, intValue(checkpointConfig, "min_pause"));
        Assertions.assertEquals(5, intValue(checkpointConfig, "tolerable_failed_checkpoints"));

        String restartStrategy = executionConfig.get("restart-strategy").toString();
        Assertions.assertTrue(restartStrategy.contains("fixed delay"));
        Assertions.assertTrue(restartStrategy.contains("2 restart attempts"));
        Assertions.assertTrue(restartStrategy.contains("fixed delay (1000 ms)"));
        Assertions.assertTrue(checkpointConfig.get("state_backend").toString().contains("RocksDBStateBackend"));
    }

    /**
     * Submits the job on a background thread. The returned future is deliberately not awaited
     * (presumably because {@code executeJob} blocks while the job runs - confirm); a submission
     * failure is logged with its stack trace.
     */
    private static void submitJobAsync(AbstractTestFlinkContainer container, String configFile) {
        CompletableFuture.supplyAsync(() -> {
            try {
                return container.executeJob(configFile);
            } catch (Exception e) {
                // Trailing Throwable argument makes SLF4J log the stack trace as well.
                log.error("Commit task exception :{}", e.getMessage(), e);
                throw new RuntimeException(e);
            }
        });
    }

    /** Polls {@code /jobs/overview} until it lists a job and returns the first job's id. */
    private static String awaitFirstJobId(AbstractTestFlinkContainer container) {
        AtomicReference<String> jobId = new AtomicReference<>();
        Awaitility.await().atMost(JOB_SUBMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Map<String, Object> overview = restGet(container, "/jobs/overview");
            // A null response is treated as "not ready yet" so the poll retries instead of aborting.
            List<Map<String, Object>> jobs = overview == null ? null : asMapList(overview.get("jobs"));
            if (!CollectionUtils.isEmpty(jobs)) {
                jobId.set(jobs.get(0).get("jid").toString());
            }
            Assertions.assertNotNull(jobId.get());
        });
        return jobId.get();
    }

    /** Polls {@code /jobs/<jobId>} until the state is RUNNING and returns that job detail map. */
    private static Map<String, Object> awaitRunningJob(AbstractTestFlinkContainer container, String jobId) {
        AtomicReference<Map<String, Object>> runningJob = new AtomicReference<>();
        Awaitility.await().atMost(REST_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Map<String, Object> jobInfo = restGet(container, "/jobs/" + jobId);
            if (jobInfo != null && "RUNNING".equals(jobInfo.get("state"))) {
                runningJob.set(jobInfo);
            }
            Assertions.assertNotNull(runningJob.get());
        });
        return runningJob.get();
    }

    /**
     * Polls {@code /jobs/<jobId>/checkpoints} until {@code latest.completed} is present and
     * returns it. The assertion runs on every poll, so a response without a {@code latest} entry
     * triggers a retry rather than ending the wait with nothing captured.
     */
    private static Map<String, Object> awaitCompletedCheckpoint(AbstractTestFlinkContainer container, String jobId) {
        AtomicReference<Map<String, Object>> completedCheckpoint = new AtomicReference<>();
        Awaitility.await().atMost(REST_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Map<String, Object> checkpoints = restGet(container, "/jobs/" + jobId + "/checkpoints");
            Map<String, Object> latest = checkpoints == null ? null : asMap(checkpoints.get("latest"));
            if (latest != null) {
                completedCheckpoint.set(asMap(latest.get("completed")));
            }
            Assertions.assertNotNull(completedCheckpoint.get());
        });
        return completedCheckpoint.get();
    }

    /** Runs {@code curl} against the given REST path inside the JobManager and parses the JSON body. */
    private static Map<String, Object> restGet(AbstractTestFlinkContainer container, String path) throws IOException, InterruptedException {
        String body = container.executeJobManagerInnerCommand("curl " + JOB_MANAGER_REST_URL + path);
        return JsonUtils.toMap(body, String.class, Object.class);
    }

    /** Reads a numeric field as an int, failing with a clear message when the field is absent. */
    private static int intValue(Map<String, Object> map, String key) {
        Object value = map.get(key);
        Assertions.assertNotNull(value, "Missing numeric field '" + key + "'");
        return ((Number) value).intValue();
    }

    /** Views a parsed JSON object node as a map; the cast is unchecked because the JSON is untyped. */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> asMap(Object value) {
        return (Map<String, Object>) value;
    }

    /** Views a parsed JSON array of objects as a list of maps; unchecked for the same reason. */
    @SuppressWarnings("unchecked")
    private static List<Map<String, Object>> asMapList(Object value) {
        return (List<Map<String, Object>>) value;
    }
}
