package org.apache.beam.runners.flink;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Collections;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.powermock.reflect.Whitebox;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.class */
public class FlinkExecutionEnvironmentsTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void shouldSetParallelismBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setParallelism(42);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(as.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(42));
    }

    @Test
    public void shouldSetParallelismStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setParallelism(42);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(as.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(42));
    }

    @Test
    public void shouldSetMaxParallelismStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setMaxParallelism(42);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(as.getMaxParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getMaxParallelism()), Is.is(42));
    }

    @Test
    public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
        String extractFlinkConfig = extractFlinkConfig();
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("host:80");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList(), extractFlinkConfig);
        MatcherAssert.assertThat(as.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(23));
    }

    @Test
    public void shouldInferParallelismFromEnvironmentStreaming() throws IOException {
        String extractFlinkConfig = extractFlinkConfig();
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("host:80");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList(), extractFlinkConfig);
        MatcherAssert.assertThat(as.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(23));
    }

    @Test
    public void shouldFallbackToDefaultParallelismBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("host:80");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(as.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(1));
    }

    @Test
    public void shouldFallbackToDefaultParallelismStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("host:80");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(as.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(1));
    }

    @Test
    public void useDefaultParallelismFromContextBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(LocalEnvironment.class));
        MatcherAssert.assertThat(as.getParallelism(), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
    }

    @Test
    public void useDefaultParallelismFromContextStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(LocalStreamEnvironment.class));
        MatcherAssert.assertThat(as.getParallelism(), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
    }

    @Test
    public void shouldParsePortForRemoteEnvironmentBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("host:1234");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(RemoteEnvironment.class));
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment, "host"), Is.is("host"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment, "port"), Is.is(1234));
    }

    @Test
    public void shouldParsePortForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("host:1234");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createStreamExecutionEnvironment, "host"), Is.is("host"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createStreamExecutionEnvironment, "port"), Is.is(1234));
    }

    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("host");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(RemoteEnvironment.class));
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment, "host"), Is.is("host"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment, "port"), Is.is((Integer) RestOptions.PORT.defaultValue()));
    }

    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("host");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createStreamExecutionEnvironment, "host"), Is.is("host"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createStreamExecutionEnvironment, "port"), Is.is((Integer) RestOptions.PORT.defaultValue()));
    }

    @Test
    public void shouldTreatAutoAndEmptyHostTheSameBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        as.setFlinkMaster("[auto]");
        Assert.assertEquals(createBatchExecutionEnvironment.getClass(), FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList()).getClass());
    }

    @Test
    public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        as.setFlinkMaster("[auto]");
        Assert.assertEquals(createStreamExecutionEnvironment.getClass(), FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList()).getClass());
    }

    @Test
    public void shouldDetectMalformedPortBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("host:p0rt");
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
    }

    @Test
    public void shouldDetectMalformedPortStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("host:p0rt");
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
    }

    @Test
    public void shouldSupportIPv4Batch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("192.168.1.1:1234");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment, "host"), Is.is("192.168.1.1"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment, "port"), Is.is(1234));
        as.setFlinkMaster("192.168.1.1");
        ExecutionEnvironment createBatchExecutionEnvironment2 = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment2, "host"), Is.is("192.168.1.1"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment2, "port"), Is.is((Integer) RestOptions.PORT.defaultValue()));
    }

    @Test
    public void shouldSupportIPv4Streaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("192.168.1.1:1234");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment, "host"), Is.is("192.168.1.1"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment, "port"), Is.is(1234));
        as.setFlinkMaster("192.168.1.1");
        ExecutionEnvironment createBatchExecutionEnvironment2 = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment2, "host"), Is.is("192.168.1.1"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment2, "port"), Is.is((Integer) RestOptions.PORT.defaultValue()));
    }

    @Test
    public void shouldSupportIPv6Batch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment, "host"), Is.is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment, "port"), Is.is(1234));
        as.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        ExecutionEnvironment createBatchExecutionEnvironment2 = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment2, "host"), Is.is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment2, "port"), Is.is((Integer) RestOptions.PORT.defaultValue()));
    }

    @Test
    public void shouldSupportIPv6Streaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createStreamExecutionEnvironment, "host"), Is.is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createStreamExecutionEnvironment, "port"), Is.is(1234));
        as.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        StreamExecutionEnvironment createStreamExecutionEnvironment2 = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat((String) Whitebox.getInternalState(createStreamExecutionEnvironment2, "host"), Is.is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
        MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createStreamExecutionEnvironment2, "port"), Is.is((Integer) RestOptions.PORT.defaultValue()));
    }

    @Test
    public void shouldRemoveHttpProtocolFromHostBatch() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        for (String str : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            as.setFlinkMaster(str);
            ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(as, Collections.emptyList());
            MatcherAssert.assertThat((String) Whitebox.getInternalState(createBatchExecutionEnvironment, "host"), Is.is("host"));
            MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createBatchExecutionEnvironment, "port"), Is.is(1234));
        }
    }

    @Test
    public void shouldRemoveHttpProtocolFromHostStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        for (String str : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            as.setFlinkMaster(str);
            StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
            MatcherAssert.assertThat((String) Whitebox.getInternalState(createStreamExecutionEnvironment, "host"), Is.is("host"));
            MatcherAssert.assertThat((Integer) Whitebox.getInternalState(createStreamExecutionEnvironment, "port"), Is.is(1234));
        }
    }

    private String extractFlinkConfig() throws IOException {
        InputStream resourceAsStream = getClass().getResourceAsStream("/flink-conf.yaml");
        File root = this.temporaryFolder.getRoot();
        Files.copy(resourceAsStream, new File(root, "flink-conf.yaml").toPath(), new CopyOption[0]);
        return root.getAbsolutePath();
    }

    @Test
    public void shouldSetSavepointRestoreForRemoteStreaming() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("host:80");
        as.setSavepointPath("fakePath");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(as, Collections.emptyList());
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat((SavepointRestoreSettings) Whitebox.getInternalState(createStreamExecutionEnvironment, "restoreSettings"), Is.is(SavepointRestoreSettings.forPath("fakePath")));
    }
}
