package org.apache.seatunnel.engine.e2e;

import io.restassured.RestAssured;
import io.restassured.response.Response;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.class */
/**
 * End-to-end test that starts two server containers on one Docker network and exercises the
 * {@code /hazelcast/rest} HTTP endpoints (cluster info, job submission, job stop and finished-job
 * listing) against each of them.
 */
public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
    private static final String JOB_NAME = "test测试";
    private static final String PARAM_JOB_NAME = "param_test测试";
    private static final String CONF_FILE = "/fakesource_to_console.conf";
    private static final String SERVER_SHELL = "seatunnel-cluster.sh";
    private static final String SEATUNNEL_HOME = "/tmp/seatunnel/";
    private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL);
    private static final Path CONFIG_PATH = Paths.get(SEATUNNEL_HOME, "config");
    private static final Path HADOOP_JAR_PATH =
            Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");

    /** The only container port exposed to the host; all REST calls go through its mapping. */
    private static final int EXPOSED_PORT = 5801;

    private static final String CLUSTER_PATH = "/hazelcast/rest/cluster";
    private static final String SUBMIT_JOB_PATH = "/hazelcast/rest/maps/submit-job";
    private static final String STOP_JOB_PATH = "/hazelcast/rest/maps/stop-job";
    private static final String RUNNING_JOB_PATH = "/hazelcast/rest/maps/running-job/";
    private static final String FINISHED_JOBS_PATH = "/hazelcast/rest/maps/finished-jobs";

    /** Network shared by both server containers; released again in {@link #tearDown()}. */
    private final Network network = Network.newNetwork();

    private GenericContainer<?> secondServer;

    /**
     * Starts both server containers and waits (at most two minutes) until the cluster endpoint of
     * the first one answers with HTTP 200 and lists exactly two members.
     */
    @Override // org.apache.seatunnel.engine.e2e.SeaTunnelContainer
    @BeforeEach
    public void startUp() throws Exception {
        this.server = createServer("server");
        this.secondServer = createServer("secondServer");
        Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
            Response response = RestAssured.given().get(baseUrl(this.server) + CLUSTER_PATH);
            response.then().statusCode(200);
            Assertions.assertEquals(2, response.jsonPath().getList("members").size());
        });
    }

    /**
     * Runs the parent tear-down and then closes the second server and the shared network. The
     * clean-up steps are chained with {@code finally} so that a failure in an earlier step does
     * not leave the later resources open.
     */
    @Override // org.apache.seatunnel.engine.e2e.SeaTunnelContainer
    @AfterEach
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                if (this.secondServer != null) {
                    this.secondServer.close();
                }
            } finally {
                // Closed last: both containers are attached to this network.
                this.network.close();
            }
        }
    }

    /**
     * Submits one batch job per node and checks that it shows up as finished. The job sent to the
     * first node overrides its name through the {@code jobName} query parameter; the job sent to
     * the second node keeps the name from the request body.
     */
    @Test
    public void testSubmitJob() {
        // The index is the position at which the job is expected in the finished-jobs listing.
        submitBatchJobAndVerifyFinished(this.server, 0, PARAM_JOB_NAME);
        submitBatchJobAndVerifyFinished(this.secondServer, 1, null);
    }

    /** Requesting a save-point start without a job id must be rejected with HTTP 400. */
    @Test
    public void testStartWithSavePointWithoutJobId() {
        Arrays.asList(this.server, this.secondServer).forEach(node ->
                submitJob("BATCH", node, true, JOB_NAME, PARAM_JOB_NAME)
                        .then()
                        .statusCode(400)
                        .body("message",
                                Matchers.equalTo("Please provide jobId when start with save point.")));
    }

    /**
     * On every node: starts a streaming job and stops it with a save point, then starts a second
     * streaming job and stops it without one.
     */
    @Test
    public void testStopJob() {
        Arrays.asList(this.server, this.secondServer).forEach(node -> {
            runStreamingJobAndStop(node, true, "SAVEPOINT_DONE", 6L);
            runStreamingJobAndStop(node, false, "CANCELED", 2L);
        });
    }

    /**
     * Submits a batch job to {@code node}, asserts the submit response and then waits (at most two
     * minutes) until both finished-jobs endpoints report the job at position {@code index}.
     *
     * @param node container that receives the requests
     * @param index position of the job in the finished-jobs listing
     * @param paramJobName job name passed as query parameter, or {@code null} to keep
     *     {@link #JOB_NAME} from the request body
     */
    private void submitBatchJobAndVerifyFinished(
            GenericContainer<?> node, int index, String paramJobName) {
        String expectedName = paramJobName != null ? paramJobName : JOB_NAME;
        Response submitted = submitJob(node, "BATCH", JOB_NAME, paramJobName);
        submitted.then().statusCode(200).body("jobName", Matchers.equalTo(expectedName));
        String jobId = submitted.getBody().jsonPath().getString("jobId");

        Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
            String finishedJobsUrl = baseUrl(node) + FINISHED_JOBS_PATH;
            assertFinishedJob(finishedJobsUrl + "/FINISHED", index, expectedName, jobId);
            assertFinishedJob(finishedJobsUrl, index, expectedName, jobId);
        });
    }

    /**
     * Fetches {@code url} and asserts that the entry at {@code index} describes the given job as
     * finished without error and with 100 rows read and written.
     */
    private void assertFinishedJob(String url, int index, String expectedName, String jobId) {
        String entry = "[" + index + "]";
        RestAssured.given()
                .get(url)
                .then()
                .statusCode(200)
                .body(entry + ".jobName", Matchers.equalTo(expectedName))
                .body(entry + ".errorMsg", Matchers.nullValue())
                .body(entry + ".jobDag.jobId", Matchers.equalTo(Long.parseLong(jobId)))
                .body(entry + ".metrics.SourceReceivedCount", Matchers.equalTo("100"))
                .body(entry + ".metrics.SinkWriteCount", Matchers.equalTo("100"))
                .body(entry + ".jobStatus", Matchers.equalTo("FINISHED"));
    }

    /**
     * Submits a streaming job to {@code node}, waits until it is reported as running, asks the
     * node to stop it and waits until it is the first entry of the finished-jobs listing for
     * {@code expectedFinalStatus}.
     *
     * @param node container that receives the requests
     * @param stopWithSavePoint value sent as {@code isStopWithSavePoint} in the stop request
     * @param expectedFinalStatus status segment appended to the finished-jobs URL
     * @param finishTimeoutMinutes how long to wait for the job to reach that listing
     */
    private void runStreamingJobAndStop(
            GenericContainer<?> node,
            boolean stopWithSavePoint,
            String expectedFinalStatus,
            long finishTimeoutMinutes) {
        String jobId = submitJob(node, "STREAMING", JOB_NAME, PARAM_JOB_NAME)
                .getBody()
                .jsonPath()
                .getString("jobId");

        Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
            RestAssured.given()
                    .get(baseUrl(node) + RUNNING_JOB_PATH + jobId)
                    .then()
                    .statusCode(200)
                    .body("jobStatus", Matchers.equalTo("RUNNING"));
        });

        RestAssured.given()
                .body("{\"jobId\":" + jobId + ",\"isStopWithSavePoint\":" + stopWithSavePoint + "}")
                .post(baseUrl(node) + STOP_JOB_PATH)
                .then()
                .statusCode(200)
                .body("jobId", Matchers.equalTo(jobId));

        Awaitility.await().atMost(finishTimeoutMinutes, TimeUnit.MINUTES).untilAsserted(() -> {
            RestAssured.given()
                    .get(baseUrl(node) + FINISHED_JOBS_PATH + "/" + expectedFinalStatus)
                    .then()
                    .statusCode(200)
                    .body("[0].jobId", Matchers.equalTo(jobId));
        });
    }

    /** Submits a job without requesting a save-point start. */
    private Response submitJob(
            GenericContainer<?> node, String jobMode, String jobName, String paramJobName) {
        return submitJob(jobMode, node, false, jobName, paramJobName);
    }

    /**
     * Posts a FakeSource-to-Console job definition (100 rows) to the submit-job endpoint of
     * {@code node}.
     *
     * @param jobMode value written to {@code job.mode} in the request body
     * @param node container that receives the request
     * @param startWithSavePoint when {@code true}, {@code isStartWithSavePoint=true} is added to
     *     the query string
     * @param jobName value written to {@code job.name} in the request body
     * @param paramJobName when not {@code null}, sent as the {@code jobName} query parameter
     * @return the raw HTTP response
     */
    private Response submitJob(
            String jobMode,
            GenericContainer<?> node,
            boolean startWithSavePoint,
            String jobName,
            String paramJobName) {
        String requestBody =
                "{\n"
                        + "    \"env\": {\n"
                        + "        \"job.name\": \"" + jobName + "\",\n"
                        + "        \"job.mode\": \"" + jobMode + "\"\n"
                        + "    },\n"
                        + "    \"source\": [\n"
                        + "        {\n"
                        + "            \"plugin_name\": \"FakeSource\",\n"
                        + "            \"result_table_name\": \"fake\",\n"
                        + "            \"row.num\": 100,\n"
                        + "            \"schema\": {\n"
                        + "                \"fields\": {\n"
                        + "                    \"name\": \"string\",\n"
                        + "                    \"age\": \"int\",\n"
                        + "                    \"card\": \"int\"\n"
                        + "                }\n"
                        + "            }\n"
                        + "        }\n"
                        + "    ],\n"
                        + "    \"transform\": [\n"
                        + "    ],\n"
                        + "    \"sink\": [\n"
                        + "        {\n"
                        + "            \"plugin_name\": \"Console\",\n"
                        + "            \"source_table_name\": [\"fake\"]\n"
                        + "        }\n"
                        + "    ]\n"
                        + "}";

        // Each parameter is appended with its own separator so that a missing job name can never
        // leak into the URL as the literal text "null".
        StringBuilder url = new StringBuilder(baseUrl(node)).append(SUBMIT_JOB_PATH);
        char separator = '?';
        if (paramJobName != null) {
            url.append(separator).append("jobName=").append(paramJobName);
            separator = '&';
        }
        if (startWithSavePoint) {
            url.append(separator).append("isStartWithSavePoint=true");
        }

        return RestAssured.given()
                .body(requestBody)
                .header("Content-Type", "application/json; charset=utf-8")
                .post(url.toString());
    }

    /** Returns {@code http://host:port} for the first mapped port of {@code node}. */
    private static String baseUrl(GenericContainer<?> node) {
        return "http://" + node.getHost() + ":" + node.getFirstMappedPort();
    }

    /**
     * Creates and starts one server container attached to the shared network, then runs the extra
     * commands and copies the connector jars into it.
     *
     * @param networkAlias alias under which the container is reachable on the shared network
     * @return the started container
     * @throws IOException if preparing the container fails
     * @throws InterruptedException if the thread is interrupted while preparing the container
     */
    private GenericContainer<?> createServer(String networkAlias)
            throws IOException, InterruptedException {
        GenericContainer<?> container = new GenericContainer<>(getDockerImage())
                .withNetwork(this.network)
                .withEnv("TZ", "UTC")
                .withCommand(ContainerUtil.adaptPathForWin(BIN_PATH.toString()))
                .withNetworkAliases(networkAlias)
                .withExposedPorts()
                .withLogConsumer(new Slf4jLogConsumer(
                        DockerLoggerFactory.getLogger("seatunnel-engine:" + JDK_DOCKER_IMAGE)))
                .waitingFor(Wait.forListeningPort());
        copySeaTunnelStarterToContainer(container);
        container.setExposedPorts(Collections.singletonList(EXPOSED_PORT));

        String testResources = ContainerUtil.PROJECT_ROOT_PATH
                + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/";
        container.withCopyFileToContainer(
                MountableFile.forHostPath(testResources), CONFIG_PATH.toString());
        // Copied second, into the same target directory as the general resources.
        container.withCopyFileToContainer(
                MountableFile.forHostPath(testResources + "cluster/"), CONFIG_PATH.toString());
        container.withCopyFileToContainer(
                MountableFile.forHostPath(ContainerUtil.PROJECT_ROOT_PATH
                        + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
                HADOOP_JAR_PATH.toString());

        boolean ready = false;
        try {
            container.start();
            executeExtraCommands(container);
            ContainerUtil.copyConnectorJarToContainer(
                    container,
                    CONF_FILE,
                    getConnectorModulePath(),
                    getConnectorNamePrefix(),
                    getConnectorType(),
                    SEATUNNEL_HOME);
            ready = true;
            return container;
        } finally {
            // The caller only gets a reference on success, so a half-initialised container has
            // to be released here.
            if (!ready) {
                container.close();
            }
        }
    }
}
