/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkJobServerDriverTest {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriverTest.class);

    @Test
    public void testConfigurationDefaults() {
        FlinkJobServerDriver.FlinkServerConfiguration config = new FlinkJobServerDriver.FlinkServerConfiguration();
        MatcherAssert.assertThat((Object)config.getHost(), (Matcher)Is.is((Object)"localhost"));
        MatcherAssert.assertThat((Object)config.getPort(), (Matcher)Is.is((Object)8099));
        MatcherAssert.assertThat((Object)config.getArtifactPort(), (Matcher)Is.is((Object)8098));
        MatcherAssert.assertThat((Object)config.getExpansionPort(), (Matcher)Is.is((Object)8097));
        MatcherAssert.assertThat((Object)config.getFlinkMaster(), (Matcher)Is.is((Object)"[auto]"));
        MatcherAssert.assertThat((Object)config.isCleanArtifactsPerJob(), (Matcher)Is.is((Object)true));
        FlinkJobServerDriver flinkJobServerDriver = FlinkJobServerDriver.fromConfig((FlinkJobServerDriver.FlinkServerConfiguration)config);
        MatcherAssert.assertThat((Object)flinkJobServerDriver, (Matcher)Is.is((Matcher)CoreMatchers.not((Matcher)CoreMatchers.nullValue())));
    }

    @Test
    public void testConfigurationFromArgs() {
        FlinkJobServerDriver.FlinkServerConfiguration config = FlinkJobServerDriver.parseArgs((String[])new String[]{"--job-host=test", "--job-port", "42", "--artifact-port", "43", "--expansion-port", "44", "--flink-master=jobmanager", "--clean-artifacts-per-job=false"});
        MatcherAssert.assertThat((Object)config.getHost(), (Matcher)Is.is((Object)"test"));
        MatcherAssert.assertThat((Object)config.getPort(), (Matcher)Is.is((Object)42));
        MatcherAssert.assertThat((Object)config.getArtifactPort(), (Matcher)Is.is((Object)43));
        MatcherAssert.assertThat((Object)config.getExpansionPort(), (Matcher)Is.is((Object)44));
        MatcherAssert.assertThat((Object)config.getFlinkMaster(), (Matcher)Is.is((Object)"jobmanager"));
        MatcherAssert.assertThat((Object)config.isCleanArtifactsPerJob(), (Matcher)Is.is((Object)false));
    }

    @Test
    public void testLegacyMasterUrlParameter() {
        FlinkJobServerDriver.FlinkServerConfiguration config = FlinkJobServerDriver.parseArgs((String[])new String[]{"--flink-master-url=jobmanager"});
        MatcherAssert.assertThat((Object)config.getFlinkMaster(), (Matcher)Is.is((Object)"jobmanager"));
    }

    @Test
    public void testConfigurationFromConfig() {
        FlinkJobServerDriver.FlinkServerConfiguration config = new FlinkJobServerDriver.FlinkServerConfiguration();
        FlinkJobServerDriver driver = FlinkJobServerDriver.fromConfig((FlinkJobServerDriver.FlinkServerConfiguration)config);
        MatcherAssert.assertThat((Object)driver.configuration, (Matcher)Is.is((Object)config));
    }

    @Test(timeout=30000L)
    public void testJobServerDriver() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkJobServerDriver driver = null;
        Thread driverThread = null;
        PrintStream oldErr = System.err;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream newErr = new PrintStream(baos);
        try {
            System.setErr(newErr);
            driver = FlinkJobServerDriver.fromParams((String[])new String[]{"--job-port=0", "--artifact-port=0", "--expansion-port=0"});
            driverThread = new Thread((Runnable)driver);
            driverThread.start();
            boolean success = false;
            while (!success) {
                newErr.flush();
                String output = baos.toString(Charsets.UTF_8.name());
                if (output.contains("JobService started on localhost:") && output.contains("ArtifactStagingService started on localhost:") && output.contains("ExpansionService started on localhost:")) {
                    success = true;
                    continue;
                }
                Thread.sleep(100L);
            }
            MatcherAssert.assertThat((Object)driver.getJobServerUrl(), (Matcher)Is.is((Matcher)CoreMatchers.not((Matcher)CoreMatchers.nullValue())));
            MatcherAssert.assertThat((Object)baos.toString(Charsets.UTF_8.name()), (Matcher)CoreMatchers.containsString((String)driver.getJobServerUrl()));
            MatcherAssert.assertThat((Object)driverThread.isAlive(), (Matcher)Is.is((Object)true));
        }
        catch (Throwable t) {
            System.setErr(oldErr);
            throw t;
        }
        finally {
            System.setErr(oldErr);
            if (driver != null) {
                driver.stop();
            }
            if (driverThread != null) {
                driverThread.interrupt();
                driverThread.join();
            }
        }
    }
}

