package org.apache.beam.runners.flink;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.powermock.reflect.Whitebox;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.class */
public class FlinkExecutionEnvironmentsTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Parameterized.Parameter
    public boolean useDataStreamForBatch;

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest$TestOptions.class */
    public interface TestOptions extends PipelineOptions {
        String getKey1();

        void setKey1(String str);

        Boolean getKey2();

        void setKey2(Boolean bool);

        String getKey3();

        void setKey3(String str);
    }

    @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
    public static Collection<Object[]> useDataStreamForBatchJobValues() {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
    }

    private FlinkPipelineOptions getDefaultPipelineOptions() {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setUseDataStreamForBatch(Boolean.valueOf(this.useDataStreamForBatch));
        return defaults;
    }

    @Test
    public void shouldSetParallelismBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setParallelism(42);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(42));
    }

    @Test
    public void shouldSetParallelismStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setParallelism(42);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(42));
    }

    @Test
    public void shouldSetMaxParallelismStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setMaxParallelism(42);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getMaxParallelism(), Is.is(42));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getMaxParallelism()), Is.is(42));
    }

    @Test
    public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
        String extractFlinkConfig = extractFlinkConfig();
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:80");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions, Collections.emptyList(), extractFlinkConfig);
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(23));
    }

    @Test
    public void shouldInferParallelismFromEnvironmentStreaming() throws IOException {
        String extractFlinkConfig = extractFlinkConfig();
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:80");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions, Collections.emptyList(), extractFlinkConfig);
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(23));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(23));
    }

    @Test
    public void shouldFallbackToDefaultParallelismBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:80");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(1));
    }

    @Test
    public void shouldFallbackToDefaultParallelismStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:80");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(1));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(1));
    }

    @Test
    public void useDefaultParallelismFromContextBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(LocalEnvironment.class));
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
        MatcherAssert.assertThat(Integer.valueOf(createBatchExecutionEnvironment.getParallelism()), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
    }

    @Test
    public void useDefaultParallelismFromContextStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(LocalStreamEnvironment.class));
        MatcherAssert.assertThat(defaultPipelineOptions.getParallelism(), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
        MatcherAssert.assertThat(Integer.valueOf(createStreamExecutionEnvironment.getParallelism()), Is.is(Integer.valueOf(LocalStreamEnvironment.getDefaultLocalParallelism())));
    }

    @Test
    public void shouldParsePortForRemoteEnvironmentBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:1234");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(RemoteEnvironment.class));
        checkHostAndPort(createBatchExecutionEnvironment, "host", 1234);
    }

    @Test
    public void shouldParsePortForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:1234");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        checkHostAndPort(createStreamExecutionEnvironment, "host", 1234);
    }

    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host");
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createBatchExecutionEnvironment, Matchers.instanceOf(RemoteEnvironment.class));
        checkHostAndPort(createBatchExecutionEnvironment, "host", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        checkHostAndPort(createStreamExecutionEnvironment, "host", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldTreatAutoAndEmptyHostTheSameBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        ExecutionEnvironment createBatchExecutionEnvironment = FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
        defaultPipelineOptions.setFlinkMaster("[auto]");
        Assert.assertEquals(createBatchExecutionEnvironment.getClass(), FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions).getClass());
    }

    @Test
    public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        defaultPipelineOptions.setFlinkMaster("[auto]");
        Assert.assertEquals(createStreamExecutionEnvironment.getClass(), FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions).getClass());
    }

    @Test
    public void shouldDetectMalformedPortBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:p0rt");
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions);
    }

    @Test
    public void shouldDetectMalformedPortStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:p0rt");
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Unparseable port number");
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
    }

    @Test
    public void shouldSupportIPv4Batch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("192.168.1.1:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "192.168.1.1", 1234);
        defaultPipelineOptions.setFlinkMaster("192.168.1.1");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "192.168.1.1", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldSupportIPv4Streaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("192.168.1.1:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "192.168.1.1", 1234);
        defaultPipelineOptions.setFlinkMaster("192.168.1.1");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "192.168.1.1", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldSupportIPv6Batch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", 1234);
        defaultPipelineOptions.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldSupportIPv6Streaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
        checkHostAndPort(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", 1234);
        defaultPipelineOptions.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
        checkHostAndPort(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions), "FE80:CD00:0000:0CDE:1257:0000:211E:729C", ((Integer) RestOptions.PORT.defaultValue()).intValue());
    }

    @Test
    public void shouldRemoveHttpProtocolFromHostBatch() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        for (String str : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            defaultPipelineOptions.setFlinkMaster(str);
            checkHostAndPort(FlinkExecutionEnvironments.createBatchExecutionEnvironment(defaultPipelineOptions), "host", 1234);
        }
    }

    @Test
    public void shouldRemoveHttpProtocolFromHostStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(FlinkRunner.class);
        for (String str : new String[]{"http://host:1234", " http://host:1234", "https://host:1234", " https://host:1234"}) {
            defaultPipelineOptions.setFlinkMaster(str);
            checkHostAndPort(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions), "host", 1234);
        }
    }

    private String extractFlinkConfig() throws IOException {
        InputStream resourceAsStream = getClass().getResourceAsStream("/flink-conf.yaml");
        File root = this.temporaryFolder.getRoot();
        Files.copy(resourceAsStream, new File(root, "flink-conf.yaml").toPath(), new CopyOption[0]);
        return root.getAbsolutePath();
    }

    @Test
    public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getShutdownSourcesAfterIdleMs(), Is.is(0L));
    }

    @Test
    public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setCheckpointingInterval(1000L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getShutdownSourcesAfterIdleMs(), Is.is(Long.MAX_VALUE));
    }

    @Test
    public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setShutdownSourcesAfterIdleMs(42L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getShutdownSourcesAfterIdleMs(), Is.is(42L));
    }

    @Test
    public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setCheckpointingInterval(1000L);
        defaultPipelineOptions.setShutdownSourcesAfterIdleMs(42L);
        FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(defaultPipelineOptions.getShutdownSourcesAfterIdleMs(), Is.is(42L));
    }

    @Test
    public void shouldSetSavepointRestoreForRemoteStreaming() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setRunner(TestFlinkRunner.class);
        defaultPipelineOptions.setFlinkMaster("host:80");
        defaultPipelineOptions.setSavepointPath("fakePath");
        StreamExecutionEnvironment createStreamExecutionEnvironment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        MatcherAssert.assertThat(createStreamExecutionEnvironment, Matchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat(getSavepointPath(createStreamExecutionEnvironment), Is.is("fakePath"));
    }

    @Test
    public void shouldFailOnUnknownStateBackend() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setStreaming(true);
        defaultPipelineOptions.setStateBackend("unknown");
        defaultPipelineOptions.setStateBackendStoragePath("/path");
        Assert.assertThrows("State backend was set to 'unknown' but no storage path was provided.", IllegalArgumentException.class, () -> {
            FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        });
    }

    @Test
    public void shouldFailOnNoStoragePathProvided() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setStreaming(true);
        defaultPipelineOptions.setStateBackend("unknown");
        Assert.assertThrows("State backend was set to 'unknown' but no storage path was provided.", IllegalArgumentException.class, () -> {
            FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions);
        });
    }

    @Test
    public void shouldCreateFileSystemStateBackend() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setStreaming(true);
        defaultPipelineOptions.setStateBackend("fileSystem");
        defaultPipelineOptions.setStateBackendStoragePath(this.temporaryFolder.getRoot().toURI().toString());
        MatcherAssert.assertThat(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions).getStateBackend(), Matchers.instanceOf(FsStateBackend.class));
    }

    @Test
    public void shouldCreateRocksDbStateBackend() {
        FlinkPipelineOptions defaultPipelineOptions = getDefaultPipelineOptions();
        defaultPipelineOptions.setStreaming(true);
        defaultPipelineOptions.setStateBackend("rocksDB");
        defaultPipelineOptions.setStateBackendStoragePath(this.temporaryFolder.getRoot().toURI().toString());
        MatcherAssert.assertThat(FlinkExecutionEnvironments.createStreamExecutionEnvironment(defaultPipelineOptions).getStateBackend(), Matchers.instanceOf(RocksDBStateBackend.class));
    }

    @Test
    public void shouldSetWebUIOptions() {
        PipelineOptionsFactory.register(TestOptions.class);
        PipelineOptionsFactory.register(FlinkPipelineOptions.class);
        Map map = FlinkExecutionEnvironments.createStreamExecutionEnvironment(PipelineOptionsFactory.fromArgs(new String[]{"--key1=value1", "--key2", "--key3=", "--parallelism=10", "--checkpointTimeoutMillis=500"}).as(FlinkPipelineOptions.class)).getConfig().getGlobalJobParameters().toMap();
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "true");
        hashMap.put("key3", "");
        hashMap.put("checkpointTimeoutMillis", "500");
        hashMap.put("parallelism", "10");
        Assert.assertTrue(hashMap.size() == ((Map) hashMap.entrySet().stream().filter(entry -> {
            return map.containsKey(entry.getKey()) && ((String) entry.getValue()).equals(map.get(entry.getKey()));
        }).collect(Collectors.toMap(entry2 -> {
            return (String) entry2.getKey();
        }, entry3 -> {
            return (String) entry3.getValue();
        }))).size());
    }

    private void checkHostAndPort(Object obj, String str, int i) {
        MatcherAssert.assertThat(new InetSocketAddress(((Configuration) Whitebox.getInternalState(obj, "configuration")).getString(RestOptions.ADDRESS), ((Configuration) Whitebox.getInternalState(obj, "configuration")).getInteger(RestOptions.PORT)), Is.is(new InetSocketAddress(str, i)));
    }

    private String getSavepointPath(Object obj) {
        return ((Configuration) Whitebox.getInternalState(obj, "configuration")).getString("execution.savepoint.path", (String) null);
    }
}
