package org.apache.beam.runners.flink;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Collections;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.powermock.reflect.Whitebox;
import org.powermock.reflect.exceptions.FieldNotFoundException;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.class */
public class FlinkExecutionEnvironmentsTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void shouldSetParallelismBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setParallelism(42);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(42));
    }

    @Test
    public void shouldSetParallelismStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setParallelism(42);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(42));
    }

    @Test
    public void shouldSetMaxParallelismStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setMaxParallelism(42);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getMaxParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getMaxParallelism()), Is.is(42));
    }

    @Test
    public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
        String extractFlinkConfig = extractFlinkConfig();
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setFlinkMaster("host:80");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList(), extractFlinkConfig);
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(23));
    }

    @Test
    public void shouldInferParallelismFromEnvironmentStreaming() throws IOException {
        String extractFlinkConfig = extractFlinkConfig();
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setFlinkMaster("host:80");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList(), extractFlinkConfig);
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(23));
    }

    @Test
    public void shouldFallbackToDefaultParallelismBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setFlinkMaster("host:80");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(1));
    }

    @Test
    public void shouldFallbackToDefaultParallelismStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setFlinkMaster("host:80");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(1));
    }

    @Test
    public void useDefaultParallelismFromContextBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(LocalEnvironment.class));
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
    }

    @Test
    public void useDefaultParallelismFromContextStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(LocalStreamEnvironment.class));
        MatcherAssert.assertThat(defaults.getParallelism(), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
    }

    @Test
    public void shouldParsePortForRemoteEnvironmentBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("host:1234");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(RemoteEnvironment.class));
        checkHostAndPort(createBatchExecutionEnvironment, "host", 1234);
    }

    @Test
    public void shouldParsePortForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("host:1234");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        checkHostAndPort(createStreamExecutionEnvironment, "host", 1234);
    }

    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("host");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(RemoteEnvironment.class));
        checkHostAndPort(createBatchExecutionEnvironment, "host", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("host");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        checkHostAndPort(createStreamExecutionEnvironment, "host", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldTreatAutoAndEmptyHostTheSameBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
        defaults.setFlinkMaster("[auto]");
        Assert.assertEquals(createBatchExecutionEnvironment.getClass(), FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()).getClass());
    }

    @Test
    public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        defaults.setFlinkMaster("[auto]");
        Assert.assertEquals(createStreamExecutionEnvironment.getClass(), FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList()).getClass());
    }

    @Test
    public void shouldDetectMalformedPortBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("host:p0rt");
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList());
    }

    @Test
    public void shouldDetectMalformedPortStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("host:p0rt");
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
    }

    @Test
    public void shouldSupportIPv4Batch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("192.168.1.1:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "192.168.1.1", 1234);
        defaults.setFlinkMaster("192.168.1.1");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "192.168.1.1", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldSupportIPv4Streaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("192.168.1.1:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "192.168.1.1", 1234);
        defaults.setFlinkMaster("192.168.1.1");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "192.168.1.1", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldSupportIPv6Batch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", 1234);
        defaults.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldSupportIPv6Streaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList()), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", 1234);
        defaults.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        checkHostAndPort(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList()), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldRemoveHttpProtocolFromHostBatch() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        for (String str : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            defaults.setFlinkMaster(str);
            checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaults, Collections.emptyList()), "host", 1234);
        }
    }

    @Test
    public void shouldRemoveHttpProtocolFromHostStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        for (String str : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            defaults.setFlinkMaster(str);
            checkHostAndPort(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList()), "host", 1234);
        }
    }

    private String extractFlinkConfig() throws IOException {
        InputStream resourceAsStream = getClass().getResourceAsStream("/flink-conf.yaml");
        File root = this.temporaryFolder.getRoot();
        Files.copy(resourceAsStream, new File(root, "flink-conf.yaml").toPath(), new CopyOption[0]);
        return root.getAbsolutePath();
    }

    @Test
    public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getShutdownSourcesAfterIdleMs(), Is.is(0L));
    }

    @Test
    public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setCheckpointingInterval(1000L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getShutdownSourcesAfterIdleMs(), Is.is(Long.MAX_VALUE));
    }

    @Test
    public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setShutdownSourcesAfterIdleMs(42L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getShutdownSourcesAfterIdleMs(), Is.is(42L));
    }

    @Test
    public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setCheckpointingInterval(1000L);
        defaults.setShutdownSourcesAfterIdleMs(42L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(defaults.getShutdownSourcesAfterIdleMs(), Is.is(42L));
    }

    @Test
    public void shouldSetSavepointRestoreForRemoteStreaming() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(TestFlinkRunner.class);
        defaults.setFlinkMaster("host:80");
        defaults.setSavepointPath("fakePath");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaults, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat(getSavepointPath(createStreamExecutionEnvironment), Is.is("fakePath"));
    }

    private void checkHostAndPort(Object obj, String str, int i) {
        try {
            MatcherAssert.assertThat((String) Whitebox.getInternalState(obj, "host"), Is.is(str));
            MatcherAssert.assertThat((Integer) Whitebox.getInternalState(obj, "port"), Is.is(Integer.valueOf(i)));
        } catch (FieldNotFoundException e) {
            MatcherAssert.assertThat(new InetSocketAddress(((Configuration) Whitebox.getInternalState(obj, "configuration")).getString(RestOptions.ADDRESS), ((Configuration) Whitebox.getInternalState(obj, "configuration")).getInteger(RestOptions.PORT)), Is.is(new InetSocketAddress(str, i)));
        }
    }

    private String getSavepointPath(Object obj) {
        try {
            return ((SavepointRestoreSettings) Whitebox.getInternalState(obj, "savepointRestoreSettings")).getRestorePath();
        } catch (FieldNotFoundException e) {
            return ((Configuration) Whitebox.getInternalState(obj, "configuration")).getString("execution.savepoint.path", (String) null);
        }
    }
}
