package org.apache.seatunnel.e2e.common.container.flink;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.class */
public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
    protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList("jobmanager.rpc.address: jobmanager", "taskmanager.numberOfTaskSlots: 10", "parallelism.default: 4", "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
    protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
    protected GenericContainer<?> jobManager;
    protected GenericContainer<?> taskManager;

    @Override // org.apache.seatunnel.e2e.common.container.AbstractTestContainer
    protected String getDockerImage() {
        return DEFAULT_DOCKER_IMAGE;
    }

    @Override // org.apache.seatunnel.e2e.common.TestResource
    public void startUp() throws Exception {
        String dockerImage = getDockerImage();
        String join = String.join("\n", getFlinkProperties());
        this.jobManager = new GenericContainer(dockerImage).withCommand("jobmanager").withNetwork(NETWORK).withNetworkAliases(new String[]{"jobmanager"}).withExposedPorts(new Integer[0]).withEnv("FLINK_PROPERTIES", join).withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(dockerImage + ":jobmanager"))).waitingFor(new LogMessageWaitStrategy().withRegEx(".*Starting the resource manager.*").withStartupTimeout(Duration.ofMinutes(2L)));
        copySeaTunnelStarterToContainer(this.jobManager);
        copySeaTunnelStarterLoggingToContainer(this.jobManager);
        this.taskManager = new GenericContainer(dockerImage).withCommand("taskmanager").withNetwork(NETWORK).withNetworkAliases(new String[]{"taskmanager"}).withEnv("FLINK_PROPERTIES", join).dependsOn(new Startable[]{this.jobManager}).withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(dockerImage + ":taskmanager"))).waitingFor(new LogMessageWaitStrategy().withRegEx(".*Successful registration at resource manager.*").withStartupTimeout(Duration.ofMinutes(2L)));
        Startables.deepStart(Stream.of(this.jobManager)).join();
        Startables.deepStart(Stream.of(this.taskManager)).join();
        executeExtraCommands(this.jobManager);
    }

    protected List<String> getFlinkProperties() {
        return DEFAULT_FLINK_PROPERTIES;
    }

    @Override // org.apache.seatunnel.e2e.common.TestResource
    public void tearDown() throws Exception {
        if (this.taskManager != null) {
            this.taskManager.stop();
        }
        if (this.jobManager != null) {
            this.jobManager.stop();
        }
    }

    @Override // org.apache.seatunnel.e2e.common.container.AbstractTestContainer
    protected List<String> getExtraStartShellCommands() {
        return Collections.emptyList();
    }

    @Override // org.apache.seatunnel.e2e.common.container.TestContainer
    public void executeExtraCommands(ContainerExtendedFactory containerExtendedFactory) throws IOException, InterruptedException {
        containerExtendedFactory.extend(this.jobManager);
        containerExtendedFactory.extend(this.taskManager);
    }

    @Override // org.apache.seatunnel.e2e.common.container.TestContainer
    public Container.ExecResult executeJob(String str) throws IOException, InterruptedException {
        return executeJob(this.jobManager, str);
    }
}
