package org.apache.beam.runners.flink;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobServerDriverTest.class */
public class FlinkJobServerDriverTest {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriverTest.class);

    @Test
    public void testConfigurationDefaults() {
        FlinkJobServerDriver.ServerConfiguration serverConfiguration = new FlinkJobServerDriver.ServerConfiguration();
        Assert.assertThat(serverConfiguration.host, Is.is("localhost"));
        Assert.assertThat(Integer.valueOf(serverConfiguration.port), Is.is(8099));
        Assert.assertThat(Integer.valueOf(serverConfiguration.artifactPort), Is.is(8098));
        Assert.assertThat(serverConfiguration.flinkMasterUrl, Is.is("[auto]"));
        Assert.assertThat(serverConfiguration.sdkWorkerParallelism, Is.is(1L));
        Assert.assertThat(Boolean.valueOf(serverConfiguration.cleanArtifactsPerJob), Is.is(false));
        Assert.assertThat(FlinkJobServerDriver.fromConfig(serverConfiguration), Is.is(CoreMatchers.not(CoreMatchers.nullValue())));
    }

    @Test
    public void testConfigurationFromArgs() {
        FlinkJobServerDriver fromParams = FlinkJobServerDriver.fromParams(new String[]{"--job-host=test", "--job-port", "42", "--artifact-port", "43", "--flink-master-url=jobmanager", "--sdk-worker-parallelism=4", "--clean-artifacts-per-job"});
        Assert.assertThat(fromParams.configuration.host, Is.is("test"));
        Assert.assertThat(Integer.valueOf(fromParams.configuration.port), Is.is(42));
        Assert.assertThat(Integer.valueOf(fromParams.configuration.artifactPort), Is.is(43));
        Assert.assertThat(fromParams.configuration.flinkMasterUrl, Is.is("jobmanager"));
        Assert.assertThat(fromParams.configuration.sdkWorkerParallelism, Is.is(4L));
        Assert.assertThat(Boolean.valueOf(fromParams.configuration.cleanArtifactsPerJob), Is.is(true));
    }

    @Test
    public void testConfigurationFromConfig() {
        FlinkJobServerDriver.ServerConfiguration serverConfiguration = new FlinkJobServerDriver.ServerConfiguration();
        Assert.assertThat(FlinkJobServerDriver.fromConfig(serverConfiguration).configuration, Is.is(serverConfiguration));
    }

    @Test(timeout = 30000)
    public void testJobServerDriver() throws Exception {
        FlinkJobServerDriver flinkJobServerDriver = null;
        Thread thread = null;
        PrintStream printStream = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintStream printStream2 = new PrintStream(byteArrayOutputStream);
        try {
            try {
                System.setErr(printStream2);
                flinkJobServerDriver = FlinkJobServerDriver.fromParams(new String[]{"--job-port=0", "--artifact-port=0"});
                thread = new Thread((Runnable) flinkJobServerDriver);
                thread.start();
                boolean z = false;
                while (!z) {
                    printStream2.flush();
                    String byteArrayOutputStream2 = byteArrayOutputStream.toString(Charsets.UTF_8.name());
                    if (byteArrayOutputStream2.contains("JobService started on localhost:") && byteArrayOutputStream2.contains("ArtifactStagingService started on localhost:")) {
                        z = true;
                    } else {
                        Thread.sleep(100L);
                    }
                }
                Assert.assertThat(Boolean.valueOf(thread.isAlive()), Is.is(true));
                if (flinkJobServerDriver != null) {
                    flinkJobServerDriver.stop();
                }
                if (thread != null) {
                    thread.interrupt();
                    thread.join();
                }
            } finally {
                System.setErr(printStream);
            }
        } catch (Throwable th) {
            System.setErr(printStream);
            if (flinkJobServerDriver != null) {
                flinkJobServerDriver.stop();
            }
            if (thread != null) {
                thread.interrupt();
                thread.join();
            }
            throw th;
        }
    }
}
