/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Collections;
import org.apache.beam.runners.flink.FlinkExecutionEnvironments;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.TestFlinkRunner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.powermock.reflect.Whitebox;
import org.powermock.reflect.exceptions.FieldNotFoundException;

/**
 * Tests for {@link FlinkExecutionEnvironments}.
 *
 * <p>Each scenario is exercised twice where applicable: once through
 * {@code createBatchExecutionEnvironment} and once through
 * {@code createStreamExecutionEnvironment}. Tests that need to look at the
 * host/port or savepoint settings of the created environment read them
 * reflectively via {@link Whitebox}, see {@link #checkHostAndPort} and
 * {@link #getSavepointPath}.
 */
public class FlinkExecutionEnvironmentsTest {
    /** Scratch directory that receives the extracted {@code flink-conf.yaml}. */
    @Rule
    public @UnknownKeyFor @NonNull @Initialized TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Used by the malformed-port tests to declare the expected failure. */
    @Rule
    public @UnknownKeyFor @NonNull @Initialized ExpectedException expectedException = ExpectedException.none();

    /** An explicitly configured parallelism is applied to the batch environment. */
    @Test
    public void shouldSetParallelismBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setParallelism(42);
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(bev.getParallelism(), Is.is(42));
    }

    /** An explicitly configured parallelism is applied to the streaming environment. */
    @Test
    public void shouldSetParallelismStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setParallelism(42);
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(sev.getParallelism(), Is.is(42));
    }

    /** An explicitly configured max parallelism is applied to the streaming environment. */
    @Test
    public void shouldSetMaxParallelismStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setMaxParallelism(42);
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getMaxParallelism(), Is.is(42));
        MatcherAssert.assertThat(sev.getMaxParallelism(), Is.is(42));
    }

    /**
     * Without an explicit parallelism, the batch environment takes it from the
     * Flink configuration directory passed to the factory.
     *
     * <p>Assumes the {@code /flink-conf.yaml} test resource configures a default
     * parallelism of 23 (the resource is not part of this file).
     */
    @Test
    public void shouldInferParallelismFromEnvironmentBatch() throws @UnknownKeyFor @NonNull @Initialized IOException {
        String flinkConfDir = this.extractFlinkConfig();
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setFlinkMaster("host:80");
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList(), flinkConfDir);
        MatcherAssert.assertThat(options.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(bev.getParallelism(), Is.is(23));
    }

    /**
     * Streaming counterpart of {@link #shouldInferParallelismFromEnvironmentBatch()};
     * the same assumption about the test resource applies.
     */
    @Test
    public void shouldInferParallelismFromEnvironmentStreaming() throws @UnknownKeyFor @NonNull @Initialized IOException {
        String flinkConfDir = this.extractFlinkConfig();
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setFlinkMaster("host:80");
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList(), flinkConfDir);
        MatcherAssert.assertThat(options.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(sev.getParallelism(), Is.is(23));
    }

    /** With a remote master and no configuration directory, batch parallelism ends up as 1. */
    @Test
    public void shouldFallbackToDefaultParallelismBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setFlinkMaster("host:80");
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(bev.getParallelism(), Is.is(1));
    }

    /** With a remote master and no configuration directory, streaming parallelism ends up as 1. */
    @Test
    public void shouldFallbackToDefaultParallelismStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setFlinkMaster("host:80");
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(sev.getParallelism(), Is.is(1));
    }

    /** With default options a local batch environment with the default local parallelism is created. */
    @Test
    public void useDefaultParallelismFromContextBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(bev, Matchers.instanceOf(LocalEnvironment.class));
        MatcherAssert.assertThat(options.getParallelism(), Is.is(LocalStreamEnvironment.getDefaultLocalParallelism()));
        MatcherAssert.assertThat(bev.getParallelism(), Is.is(LocalStreamEnvironment.getDefaultLocalParallelism()));
    }

    /** With default options a local streaming environment with the default local parallelism is created. */
    @Test
    public void useDefaultParallelismFromContextStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(sev, Matchers.instanceOf(LocalStreamEnvironment.class));
        MatcherAssert.assertThat(options.getParallelism(), Is.is(LocalStreamEnvironment.getDefaultLocalParallelism()));
        MatcherAssert.assertThat(sev.getParallelism(), Is.is(LocalStreamEnvironment.getDefaultLocalParallelism()));
    }

    /** A {@code host:port} master yields a remote batch environment with that host and port. */
    @Test
    public void shouldParsePortForRemoteEnvironmentBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("host:1234");
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(bev, Matchers.instanceOf(RemoteEnvironment.class));
        this.checkHostAndPort(bev, "host", 1234);
    }

    /** A {@code host:port} master yields a remote streaming environment with that host and port. */
    @Test
    public void shouldParsePortForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("host:1234");
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(sev, Matchers.instanceOf(RemoteStreamEnvironment.class));
        this.checkHostAndPort(sev, "host", 1234);
    }

    /** A master without a port falls back to the default of {@link RestOptions#PORT} (batch). */
    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("host");
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(bev, Matchers.instanceOf(RemoteEnvironment.class));
        this.checkHostAndPort(bev, "host", RestOptions.PORT.defaultValue());
    }

    /** A master without a port falls back to the default of {@link RestOptions#PORT} (streaming). */
    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("host");
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(sev, Matchers.instanceOf(RemoteStreamEnvironment.class));
        this.checkHostAndPort(sev, "host", RestOptions.PORT.defaultValue());
    }

    /** The default master setting and an explicit {@code [auto]} produce the same batch environment class. */
    @Test
    public void shouldTreatAutoAndEmptyHostTheSameBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        ExecutionEnvironment bevDefault = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        options.setFlinkMaster("[auto]");
        ExecutionEnvironment bevAuto = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        Assert.assertEquals(bevDefault.getClass(), bevAuto.getClass());
    }

    /** The default master setting and an explicit {@code [auto]} produce the same streaming environment class. */
    @Test
    public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        StreamExecutionEnvironment sevDefault = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        options.setFlinkMaster("[auto]");
        StreamExecutionEnvironment sevAuto = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        Assert.assertEquals(sevDefault.getClass(), sevAuto.getClass());
    }

    /** A non-numeric port is rejected with an {@link IllegalArgumentException} (batch). */
    @Test
    public void shouldDetectMalformedPortBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("host:p0rt");
        // Expectations must be registered before the call that is supposed to throw.
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
    }

    /** A non-numeric port is rejected with an {@link IllegalArgumentException} (streaming). */
    @Test
    public void shouldDetectMalformedPortStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("host:p0rt");
        // Expectations must be registered before the call that is supposed to throw.
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
    }

    /** IPv4 masters are accepted with and without an explicit port (batch). */
    @Test
    public void shouldSupportIPv4Batch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("192.168.1.1:1234");
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(bev, "192.168.1.1", 1234);
        options.setFlinkMaster("192.168.1.1");
        bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(bev, "192.168.1.1", RestOptions.PORT.defaultValue());
    }

    /** IPv4 masters are accepted with and without an explicit port (streaming). */
    @Test
    public void shouldSupportIPv4Streaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("192.168.1.1:1234");
        // This test previously built a batch environment, so the streaming factory
        // was never exercised with an IPv4 address.
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(sev, "192.168.1.1", 1234);
        options.setFlinkMaster("192.168.1.1");
        sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(sev, "192.168.1.1", RestOptions.PORT.defaultValue());
    }

    /** Bracketed IPv6 with a port and bare IPv6 without a port are both accepted (batch). */
    @Test
    public void shouldSupportIPv6Batch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(bev, "FE80:CD00:0000:0CDE:1257:0000:211E:729C", 1234);
        options.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(bev, "FE80:CD00:0000:0CDE:1257:0000:211E:729C", RestOptions.PORT.defaultValue());
    }

    /** Bracketed IPv6 with a port and bare IPv6 without a port are both accepted (streaming). */
    @Test
    public void shouldSupportIPv6Streaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(sev, "FE80:CD00:0000:0CDE:1257:0000:211E:729C", 1234);
        options.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        this.checkHostAndPort(sev, "FE80:CD00:0000:0CDE:1257:0000:211E:729C", RestOptions.PORT.defaultValue());
    }

    /** A leading {@code http://} or {@code https://} (optionally preceded by a space) is stripped (batch). */
    @Test
    public void shouldRemoveHttpProtocolFromHostBatch() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        for (String flinkMaster : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            options.setFlinkMaster(flinkMaster);
            ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, Collections.emptyList());
            this.checkHostAndPort(bev, "host", 1234);
        }
    }

    /** A leading {@code http://} or {@code https://} (optionally preceded by a space) is stripped (streaming). */
    @Test
    public void shouldRemoveHttpProtocolFromHostStreaming() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        for (String flinkMaster : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            options.setFlinkMaster(flinkMaster);
            StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
            this.checkHostAndPort(sev, "host", 1234);
        }
    }

    /**
     * Copies the {@code /flink-conf.yaml} classpath resource into the temporary folder.
     *
     * @return absolute path of the directory that now contains {@code flink-conf.yaml}
     * @throws IOException if the resource cannot be copied
     */
    private @UnknownKeyFor @NonNull @Initialized String extractFlinkConfig() throws @UnknownKeyFor @NonNull @Initialized IOException {
        File root = this.temporaryFolder.getRoot();
        // try-with-resources closes the classpath stream even when the copy fails.
        try (InputStream inputStream = this.getClass().getResourceAsStream("/flink-conf.yaml")) {
            // getResourceAsStream returns null for a missing resource; fail with a
            // readable message instead of a NullPointerException from Files.copy.
            Assert.assertNotNull("Test resource /flink-conf.yaml not found on the classpath", inputStream);
            Files.copy(inputStream, new File(root, "flink-conf.yaml").toPath());
        }
        return root.getAbsolutePath();
    }

    /** Without checkpointing, the idle-source shutdown option is set to 0 by the factory. */
    @Test
    public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getShutdownSourcesAfterIdleMs(), Is.is(0L));
    }

    /** With checkpointing enabled, the idle-source shutdown option is set to {@link Long#MAX_VALUE}. */
    @Test
    public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setCheckpointingInterval(1000L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getShutdownSourcesAfterIdleMs(), Is.is(Long.MAX_VALUE));
    }

    /** An explicitly set idle-source shutdown value is left untouched without checkpointing. */
    @Test
    public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setShutdownSourcesAfterIdleMs(42L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getShutdownSourcesAfterIdleMs(), Is.is(42L));
    }

    /** An explicitly set idle-source shutdown value is left untouched with checkpointing enabled. */
    @Test
    public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setCheckpointingInterval(1000L);
        options.setShutdownSourcesAfterIdleMs(42L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(options.getShutdownSourcesAfterIdleMs(), Is.is(42L));
    }

    /** The configured savepoint path is carried over to a remote streaming environment. */
    @Test
    public void shouldSetSavepointRestoreForRemoteStreaming() {
        String path = "fakePath";
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setFlinkMaster("host:80");
        options.setSavepointPath(path);
        StreamExecutionEnvironment sev = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, Collections.emptyList());
        MatcherAssert.assertThat(sev, Matchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat(this.getSavepointPath(sev), Is.is(path));
    }

    /**
     * Asserts that {@code env} targets the expected host and port.
     *
     * <p>The private {@code host} and {@code port} fields are read first. If the
     * environment has no such field, the values are taken from its
     * {@code configuration} field ({@link RestOptions#ADDRESS} and
     * {@link RestOptions#PORT}) and compared as {@link InetSocketAddress}es.
     *
     * @param env the batch or streaming environment to inspect
     * @param expectedHost host the environment should point at
     * @param expectedPort port the environment should point at
     */
    private void checkHostAndPort(@UnknownKeyFor @NonNull @Initialized Object env, @UnknownKeyFor @NonNull @Initialized String expectedHost, @UnknownKeyFor @NonNull @Initialized int expectedPort) {
        try {
            String actualHost = Whitebox.getInternalState(env, "host");
            MatcherAssert.assertThat(actualHost, Is.is(expectedHost));
            Integer actualPort = Whitebox.getInternalState(env, "port");
            MatcherAssert.assertThat(actualPort, Is.is(expectedPort));
        }
        catch (FieldNotFoundException t) {
            // No host/port field on this environment class: fall back to its configuration.
            Configuration configuration = Whitebox.getInternalState(env, "configuration");
            String host = configuration.getString(RestOptions.ADDRESS);
            int port = configuration.getInteger(RestOptions.PORT);
            MatcherAssert.assertThat(new InetSocketAddress(host, port), Is.is(new InetSocketAddress(expectedHost, expectedPort)));
        }
    }

    /**
     * Reads the savepoint restore path out of {@code env}.
     *
     * <p>The private {@code savepointRestoreSettings} field is tried first. If it
     * does not exist, the {@code execution.savepoint.path} entry of the
     * {@code configuration} field is returned instead.
     *
     * @param env the environment to inspect
     * @return the savepoint path; may be {@code null} on the configuration
     *     fallback when no path is set
     */
    private @UnknownKeyFor @NonNull @Initialized String getSavepointPath(@UnknownKeyFor @NonNull @Initialized Object env) {
        try {
            SavepointRestoreSettings restoreSettings = Whitebox.getInternalState(env, "savepointRestoreSettings");
            return restoreSettings.getRestorePath();
        }
        catch (FieldNotFoundException t) {
            // No savepointRestoreSettings field on this environment class: fall back to its configuration.
            Configuration configuration = Whitebox.getInternalState(env, "configuration");
            return configuration.getString("execution.savepoint.path", null);
        }
    }
}

