/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.e2e.common.container.flink;

import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

/**
 * Base class for e2e test containers that run jobs on a two-container Flink setup:
 * one container started with the {@code jobmanager} command and one started with the
 * {@code taskmanager} command, both on the shared {@code NETWORK}.
 *
 * <p>Subclasses may override {@link #getDockerImage()} and {@link #getFlinkProperties()}
 * to select a different image or configuration.
 */
public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
    private static final Logger log = LoggerFactory.getLogger(AbstractTestFlinkContainer.class);

    /** Directory inside both containers to which {@code HOST_VOLUME_MOUNT_PATH} is bound. */
    private static final String CONTAINER_MOUNT_DIR = "/tmp/seatunnel_mnt";

    /** Lines joined with newlines and passed to both containers as {@code FLINK_PROPERTIES}. */
    protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList("jobmanager.rpc.address: jobmanager", "taskmanager.numberOfTaskSlots: 10", "parallelism.default: 4", "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");

    /** Image returned by {@link #getDockerImage()} unless a subclass overrides it. */
    protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";

    /** Job manager container; {@code null} until {@link #startUp()} has created it. */
    protected GenericContainer<?> jobManager;

    /** Task manager container; {@code null} until {@link #startUp()} has created it. */
    protected GenericContainer<?> taskManager;

    /**
     * Returns the Docker image used for both the job manager and the task manager.
     *
     * @return {@link #DEFAULT_DOCKER_IMAGE} by default
     */
    @Override
    protected String getDockerImage() {
        return DEFAULT_DOCKER_IMAGE;
    }

    /**
     * Creates the host mount directory, then configures and starts the job manager followed
     * by the task manager. Each container waits (up to two minutes) for a specific log line
     * before it is considered started. Host port 8081 is bound to job manager port 8081.
     *
     * @throws Exception if directory creation, container start-up or the extra commands fail
     */
    @Override
    public void startUp() throws Exception {
        FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
        final String dockerImage = getDockerImage();
        final String properties = String.join("\n", getFlinkProperties());

        jobManager = new GenericContainer<>(dockerImage)
                .withCommand("jobmanager")
                .withNetwork(NETWORK)
                .withNetworkAliases("jobmanager")
                .withExposedPorts()
                .withEnv("FLINK_PROPERTIES", properties)
                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(dockerImage + ":jobmanager")))
                .waitingFor(new LogMessageWaitStrategy()
                        .withRegEx(".*Starting the resource manager.*")
                        .withStartupTimeout(Duration.ofMinutes(2L)))
                .withFileSystemBind(HOST_VOLUME_MOUNT_PATH, CONTAINER_MOUNT_DIR, BindMode.READ_WRITE);
        // Inherited helpers; presumably stage the SeaTunnel starter and its logging config
        // in the job manager before it starts - confirm against AbstractTestContainer.
        copySeaTunnelStarterToContainer(jobManager);
        copySeaTunnelStarterLoggingToContainer(jobManager);
        jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8081)));

        taskManager = new GenericContainer<>(dockerImage)
                .withCommand("taskmanager")
                .withNetwork(NETWORK)
                .withNetworkAliases("taskmanager")
                .withEnv("FLINK_PROPERTIES", properties)
                .dependsOn(jobManager)
                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(dockerImage + ":taskmanager")))
                .waitingFor(new LogMessageWaitStrategy()
                        .withRegEx(".*Successful registration at resource manager.*")
                        .withStartupTimeout(Duration.ofMinutes(2L)))
                .withFileSystemBind(HOST_VOLUME_MOUNT_PATH, CONTAINER_MOUNT_DIR, BindMode.READ_WRITE);

        Startables.deepStart(Stream.of(jobManager)).join();
        Startables.deepStart(Stream.of(taskManager)).join();
        executeExtraCommands(jobManager);
    }

    /**
     * Returns the Flink configuration lines handed to both containers.
     *
     * @return {@link #DEFAULT_FLINK_PROPERTIES} by default
     */
    protected List<String> getFlinkProperties() {
        return DEFAULT_FLINK_PROPERTIES;
    }

    /**
     * Removes the mounted directory inside each container, stops both containers and deletes
     * the host mount directory.
     *
     * <p>Every step is attempted even if an earlier one throws: a failure while cleaning up
     * the task manager must not leave the job manager running or the host directory behind.
     * If several steps fail, the exception from the last failing step propagates.
     *
     * @throws Exception if any clean-up step fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            cleanUpAndStop(taskManager);
        } finally {
            try {
                cleanUpAndStop(jobManager);
            } finally {
                FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
            }
        }
    }

    /**
     * Deletes the mounted directory inside {@code container} when it is running, then stops it.
     * The container is stopped even if the delete command fails.
     *
     * @param container container to clean up; {@code null} is ignored
     */
    private static void cleanUpAndStop(GenericContainer<?> container) throws IOException, InterruptedException {
        if (container == null) {
            return;
        }
        try {
            // Exec is only possible on a running container; after a failed start-up the
            // container may exist without running, in which case only stop() is needed.
            if (container.isRunning()) {
                container.execInContainer("rm", "-rf", CONTAINER_MOUNT_DIR);
            }
        } finally {
            container.stop();
        }
    }

    /**
     * Not supported by this base class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected String getSavePointCommand() {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Not supported by this base class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected String getCancelJobCommand() {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Not supported by this base class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected String getRestoreCommand() {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Returns the extra shell commands for this container type.
     *
     * @return an empty list
     */
    @Override
    protected List<String> getExtraStartShellCommands() {
        return Collections.emptyList();
    }

    /**
     * Applies {@code extendedFactory} to the job manager and then to the task manager.
     *
     * @param extendedFactory extension to apply to both containers
     */
    @Override
    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) throws IOException, InterruptedException {
        extendedFactory.extend(jobManager);
        extendedFactory.extend(taskManager);
    }

    /**
     * Runs the job described by {@code confFile} with no variables.
     *
     * @param confFile job configuration file
     * @return result of the execution inside the job manager container
     */
    @Override
    public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException {
        return executeJob(confFile, Collections.emptyList());
    }

    /**
     * Runs the job described by {@code confFile} in the job manager container.
     *
     * @param confFile job configuration file
     * @param variables variables forwarded to the inherited job runner
     * @return result of the execution inside the job manager container
     */
    @Override
    public Container.ExecResult executeJob(String confFile, List<String> variables) throws IOException, InterruptedException {
        log.info("test in container: {}", identifier());
        return executeJob(jobManager, confFile, null, variables);
    }

    /**
     * Returns the job manager logs followed by the task manager logs, separated by a newline.
     * Both containers must have been created by {@link #startUp()}.
     */
    @Override
    public String getServerLogs() {
        return jobManager.getLogs() + "\n" + taskManager.getLogs();
    }

    /**
     * Runs {@code command} through {@code bash -c} in the job manager container.
     *
     * @param command shell command line to run
     * @return the command's standard output
     */
    public String executeJobManagerInnerCommand(String command) throws IOException, InterruptedException {
        return jobManager.execInContainer("bash", "-c", command).getStdout();
    }

    /**
     * Copies a file, located via {@code ContainerUtil.getResourcesFile}, into the job manager.
     *
     * @param path path handed to {@code ContainerUtil.getResourcesFile}
     * @param targetPath destination path inside the container
     */
    @Override
    public void copyFileToContainer(String path, String targetPath) {
        ContainerUtil.copyFileIntoContainers(ContainerUtil.getResourcesFile(path).toPath(), targetPath, jobManager);
    }

    /**
     * Copies the file at the given host path into the job manager.
     *
     * @param path host file system path
     * @param targetPath destination path inside the container
     */
    @Override
    public void copyAbsolutePathToContainer(String path, String targetPath) {
        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, jobManager);
    }
}

