/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.flink.CreateStreamingFlinkView;
import org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.runners.flink.FlinkTransformOverrides;
import org.apache.beam.runners.flink.TestFlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.RegexMatcher;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Every;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.powermock.reflect.exceptions.FieldNotFoundException;

@RunWith(value=JUnit4.class)
public class FlinkPipelineExecutionEnvironmentTest
implements Serializable {
    /**
     * JUnit rule that supplies the temporary folders used by the resource-staging helper in this class.
     * Declared {@code transient} so it is excluded from Java serialization of this {@code Serializable}
     * test class.
     */
    @Rule
    public transient @UnknownKeyFor @NonNull @Initialized TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Builds a pipeline that reads from a rate-limited {@code GenerateSequence} (one element per second,
     * starting at 0), converts each {@code Long} to its string form, applies one-hour fixed windows and
     * writes the result with a single-shard windowed {@code TextIO} write, then asks the Flink
     * environment to translate it.
     *
     * <p>There is no explicit assertion: the test passes when {@code translate} completes without
     * throwing.
     */
    @Test
    public void shouldRecognizeAndTranslateStreamingPipeline() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(TestFlinkRunner.class);
        options.setFlinkMaster("[auto]");
        FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
        Pipeline pipeline = Pipeline.create();
        // Single fluent chain: source -> Long-to-String ParDo -> 1h fixed windows -> windowed text write.
        ((PCollection)((PCollection)((PCollection)pipeline.apply((PTransform)GenerateSequence.from((long)0L).withRate(1L, Duration.standardSeconds((long)1L)))).apply((PTransform)ParDo.of((DoFn)new DoFn<Long, String>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
                // Emit the decimal string form of the incoming Long element.
                c.output((Object)Long.toString((Long)c.element()));
            }
        }))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardHours((long)1L))))).apply((PTransform)TextIO.write().withNumShards(1).withWindowedWrites().to("/dummy/path"));
        // Translation only; the pipeline is never executed, so "/dummy/path" is not written to here.
        flinkEnv.translate(pipeline);
    }

    /**
     * With an explicit Flink master address ({@code localhost:8081}), a staged directory plus an
     * individually staged file must yield exactly two entries in {@code filesToStage}, the first of
     * which matches {@code .*\.jar}.
     */
    @Test
    public void shouldPrepareFilesToStageWhenFlinkMasterIsSetExplicitly() throws @UnknownKeyFor @NonNull @Initialized IOException {
        FlinkPipelineOptions translatedOptions = this.testPreparingResourcesToStage("localhost:8081", true, false);
        int stagedCount = translatedOptions.getFilesToStage().size();
        MatcherAssert.assertThat(stagedCount, CoreMatchers.is(2));
        String firstStagedEntry = translatedOptions.getFilesToStage().get(0);
        MatcherAssert.assertThat(firstStagedEntry, RegexMatcher.matches(".*\\.jar"));
    }

    /**
     * Translation must fail with an {@link IllegalStateException} when {@code filesToStage} contains a
     * path that does not exist and an explicit Flink master address is configured.
     *
     * <p>The three-argument {@code Assert.assertThrows(String, Class, ThrowingRunnable)} overload uses
     * its first argument only as the assertion-failure message; it never inspects the thrown
     * exception's text. The exception is therefore captured and its message checked explicitly.
     */
    @Test
    public void shouldFailWhenFileDoesNotExistAndFlinkMasterIsSetExplicitly() {
        IllegalStateException thrown = Assert.assertThrows(IllegalStateException.class, () -> this.testPreparingResourcesToStage("localhost:8081", true, true));
        // NOTE(review): assumes the staging code reports the missing file with this prefix - confirm
        // against the resource-preparation implementation.
        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.containsString("To-be-staged file does not exist: "));
    }

    /**
     * With the Flink master set to {@code [auto]}, the two configured entries must still be present
     * after translation and none of them may match {@code .*\.jar}.
     */
    @Test
    public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws @UnknownKeyFor @NonNull @Initialized IOException {
        FlinkPipelineOptions translatedOptions = this.testPreparingResourcesToStage("[auto]");
        int stagedCount = translatedOptions.getFilesToStage().size();
        MatcherAssert.assertThat(stagedCount, CoreMatchers.is(2));
        MatcherAssert.assertThat(translatedOptions.getFilesToStage(), Every.everyItem(CoreMatchers.not(RegexMatcher.matches(".*\\.jar"))));
    }

    /**
     * With the Flink master set to {@code [collection]}, the two configured entries must still be
     * present after translation and none of them may match {@code .*\.jar}.
     */
    @Test
    public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws @UnknownKeyFor @NonNull @Initialized IOException {
        FlinkPipelineOptions translatedOptions = this.testPreparingResourcesToStage("[collection]");
        int stagedCount = translatedOptions.getFilesToStage().size();
        MatcherAssert.assertThat(stagedCount, CoreMatchers.is(2));
        MatcherAssert.assertThat(translatedOptions.getFilesToStage(), Every.everyItem(CoreMatchers.not(RegexMatcher.matches(".*\\.jar"))));
    }

    /**
     * With the Flink master set to {@code [local]}, the two configured entries must still be present
     * after translation and none of them may match {@code .*\.jar}.
     */
    @Test
    public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws @UnknownKeyFor @NonNull @Initialized IOException {
        FlinkPipelineOptions translatedOptions = this.testPreparingResourcesToStage("[local]");
        int stagedCount = translatedOptions.getFilesToStage().size();
        MatcherAssert.assertThat(stagedCount, CoreMatchers.is(2));
        MatcherAssert.assertThat(translatedOptions.getFilesToStage(), Every.everyItem(CoreMatchers.not(RegexMatcher.matches(".*\\.jar"))));
    }

    /**
     * When no temp location is configured and the Flink master is {@code clusterAddress}, at least
     * one entry of {@code filesToStage} after translation must start with the JVM's
     * {@code java.io.tmpdir}.
     */
    @Test
    public void shouldUseDefaultTempLocationIfNoneSet() {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setRunner(TestFlinkRunner.class);
        pipelineOptions.setFlinkMaster("clusterAddress");

        FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
        environment.translate(Pipeline.create(pipelineOptions));

        String systemTmpDir = System.getProperty("java.io.tmpdir");
        MatcherAssert.assertThat(pipelineOptions.getFilesToStage(), Matchers.hasItem(CoreMatchers.startsWith(systemTmpDir)));
    }

    /**
     * For a batch pipeline with the Flink master {@code clusterAddress}, the batch execution
     * environment must be a {@code RemoteEnvironment} whose jar list equals {@code filesToStage}
     * converted to URLs.
     */
    @Test
    public void shouldUsePreparedFilesOnRemoteEnvironment() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setRunner(TestFlinkRunner.class);
        pipelineOptions.setFlinkMaster("clusterAddress");

        FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
        environment.translate(Pipeline.create(pipelineOptions));

        ExecutionEnvironment batchEnv = environment.getBatchExecutionEnvironment();
        MatcherAssert.assertThat(batchEnv, CoreMatchers.instanceOf(RemoteEnvironment.class));

        List<URL> actualJars = this.getJars(batchEnv);
        List<URL> expectedJars = FlinkPipelineExecutionEnvironmentTest.convertFilesToURLs(pipelineOptions.getFilesToStage());
        MatcherAssert.assertThat(actualJars, CoreMatchers.is(expectedJars));
    }

    /**
     * For a streaming pipeline with the Flink master {@code clusterAddress}, the stream execution
     * environment must be a {@code RemoteStreamEnvironment} whose jar list equals
     * {@code filesToStage} converted to URLs.
     */
    @Test
    public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setRunner(TestFlinkRunner.class);
        pipelineOptions.setFlinkMaster("clusterAddress");
        pipelineOptions.setStreaming(true);

        FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
        environment.translate(Pipeline.create(pipelineOptions));

        StreamExecutionEnvironment streamEnv = environment.getStreamExecutionEnvironment();
        MatcherAssert.assertThat(streamEnv, CoreMatchers.instanceOf(RemoteStreamEnvironment.class));

        List<URL> actualJars = this.getJars(streamEnv);
        List<URL> expectedJars = FlinkPipelineExecutionEnvironmentTest.convertFilesToURLs(pipelineOptions.getFilesToStage());
        MatcherAssert.assertThat(actualJars, CoreMatchers.is(expectedJars));
    }

    /**
     * For both streaming and batch mode, translation must call {@code Pipeline.replaceAll} with a
     * non-empty override list whose size equals that of
     * {@code FlinkTransformOverrides.getDefaultOverrides(options)}.
     */
    @Test
    public void shouldUseTransformOverrides() {
        for (boolean streamingMode : new boolean[] {true, false}) {
            FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
            pipelineOptions.setStreaming(streamingMode);
            pipelineOptions.setRunner(FlinkRunner.class);

            FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
            Pipeline spiedPipeline = Mockito.spy(Pipeline.create(pipelineOptions));
            environment.translate(spiedPipeline);

            // Capture whatever override list translation handed to the spied pipeline.
            ArgumentCaptor<ImmutableList> overridesCaptor = ArgumentCaptor.forClass(ImmutableList.class);
            Mockito.verify(spiedPipeline).replaceAll(overridesCaptor.capture());
            ImmutableList<PTransformOverride> capturedOverrides = overridesCaptor.getValue();

            MatcherAssert.assertThat(capturedOverrides.isEmpty(), CoreMatchers.is(false));
            int expectedSize = FlinkTransformOverrides.getDefaultOverrides(pipelineOptions).size();
            MatcherAssert.assertThat(capturedOverrides.size(), CoreMatchers.is(expectedSize));
        }
    }

    /**
     * In streaming mode, the override list passed to {@code Pipeline.replaceAll} must contain an
     * override whose factory is a {@code FlinkStreamingPipelineTranslator.StreamingShardedWriteFactory}
     * carrying options with a parallelism greater than zero.
     *
     * <p>The custom matcher describes itself so that a failing assertion prints a meaningful
     * expectation instead of an empty one.
     */
    @Test
    public void shouldProvideParallelismToTransformOverrides() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("test")).apply(TextIO.write().to("/tmp"));
        // Spy only after the pipeline is assembled, so just the replaceAll call made by translate is verified.
        p = Mockito.spy(p);
        flinkEnv.translate(p);
        ArgumentCaptor<ImmutableList> captor = ArgumentCaptor.forClass(ImmutableList.class);
        Mockito.verify(p).replaceAll(captor.capture());
        ImmutableList<PTransformOverride> overridesList = captor.getValue();
        MatcherAssert.assertThat(overridesList, Matchers.hasItem(new BaseMatcher<PTransformOverride>(){

            @Override
            public void describeTo(@UnknownKeyFor @NonNull @Initialized Description description) {
                description.appendText("a PTransformOverride whose StreamingShardedWriteFactory options have parallelism > 0");
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized boolean matches(@UnknownKeyFor @NonNull @Initialized Object actual) {
                if (!(actual instanceof PTransformOverride)) {
                    return false;
                }
                PTransformOverrideFactory overrideFactory = ((PTransformOverride)actual).getOverrideFactory();
                if (!(overrideFactory instanceof FlinkStreamingPipelineTranslator.StreamingShardedWriteFactory)) {
                    return false;
                }
                FlinkStreamingPipelineTranslator.StreamingShardedWriteFactory factory = (FlinkStreamingPipelineTranslator.StreamingShardedWriteFactory)overrideFactory;
                return factory.options.getParallelism() > 0;
            }
        }));
    }

    /**
     * With streaming left at its default and a {@code GenerateSequence.from(0)} source applied, the
     * override list passed to {@code Pipeline.replaceAll} must contain the create-view override
     * backed by {@code CreateStreamingFlinkView.Factory.INSTANCE}.
     */
    @Test
    public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setRunner(FlinkRunner.class);

        FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
        Pipeline spiedPipeline = Mockito.spy(Pipeline.create(pipelineOptions));
        spiedPipeline.apply(GenerateSequence.from(0L));
        environment.translate(spiedPipeline);

        ArgumentCaptor<ImmutableList> overridesCaptor = ArgumentCaptor.forClass(ImmutableList.class);
        Mockito.verify(spiedPipeline).replaceAll(overridesCaptor.capture());
        ImmutableList<PTransformOverride> capturedOverrides = overridesCaptor.getValue();

        PTransformOverride expectedOverride = PTransformOverride.of(PTransformMatchers.urnEqualTo("beam:transform:create_view:v1"), CreateStreamingFlinkView.Factory.INSTANCE);
        MatcherAssert.assertThat(capturedOverrides, Matchers.hasItem(expectedOverride));
    }

    /**
     * Starting with streaming explicitly disabled and a {@code GenerateSequence.from(0)} source
     * applied, translation is expected to leave {@code options.isStreaming()} set to {@code true}.
     */
    @Test
    public void testTranslationModeOverrideWithUnboundedSources() {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setRunner(FlinkRunner.class);
        pipelineOptions.setStreaming(false);

        FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
        Pipeline unboundedPipeline = Pipeline.create(pipelineOptions);
        unboundedPipeline.apply(GenerateSequence.from(0L));
        environment.translate(unboundedPipeline);

        boolean streamingAfterTranslation = pipelineOptions.isStreaming();
        MatcherAssert.assertThat(streamingAfterTranslation, Matchers.is(true));
    }

    /**
     * For a {@code GenerateSequence} limited to the range 0..10, translation must leave the
     * streaming flag exactly as it was configured, for both {@code true} and {@code false}.
     */
    @Test
    public void testTranslationModeNoOverrideWithoutUnboundedSources() {
        for (boolean configuredStreaming : new boolean[] {true, false}) {
            FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
            pipelineOptions.setRunner(FlinkRunner.class);
            pipelineOptions.setStreaming(configuredStreaming);

            FlinkPipelineExecutionEnvironment environment = new FlinkPipelineExecutionEnvironment(pipelineOptions);
            Pipeline boundedPipeline = Pipeline.create(pipelineOptions);
            boundedPipeline.apply(GenerateSequence.from(0L).to(10L));
            environment.translate(boundedPipeline);

            boolean streamingAfterTranslation = pipelineOptions.isStreaming();
            MatcherAssert.assertThat(streamingAfterTranslation, Matchers.is(configuredStreaming));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a pipeline consisting of {@code GenerateSequence.from(0)} followed by a {@code DoFn} that
     * always throws, while {@code System.err} is redirected into an in-memory buffer, and then
     * asserts that the captured output contains the "checkpointing is disabled" warning.
     *
     * <p>The run is expected to fail; the resulting exception is deliberately ignored. The original
     * {@code System.err} is restored in the {@code finally} block before the captured text is
     * inspected.
     */
    @Test
    public void shouldLogWarningWhenCheckpointingIsDisabled() {
        Pipeline pipeline = Pipeline.create();
        pipeline.getOptions().setRunner(TestFlinkRunner.class);
        ((PCollection)pipeline.apply((PTransform)GenerateSequence.from((long)0L))).apply((PTransform)ParDo.of((DoFn)new DoFn<Long, Void>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext ctx) {
                // Fail on the first element so that pipeline.run() terminates with an exception.
                throw new RuntimeException("Failing here is ok.");
            }
        }));
        // Keep a handle on the real stderr so it can be restored whatever happens below.
        PrintStream oldErr = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintStream replacementStdErr = new PrintStream(byteArrayOutputStream);
        try {
            System.setErr(replacementStdErr);
            pipeline.run();
            // Reached only if run() returned normally. Assert.fail raises an AssertionError, which is
            // not an Exception, so the catch below does not swallow it.
            Assert.fail((String)"Should have failed");
        }
        catch (Exception exception) {
            // Expected: the DoFn above always throws, so the run fails. Only the log output matters.
        }
        finally {
            System.setErr(oldErr);
        }
        // Push any buffered bytes into the ByteArrayOutputStream before reading it.
        replacementStdErr.flush();
        MatcherAssert.assertThat((Object)new String(byteArrayOutputStream.toByteArray(), Charsets.UTF_8), (Matcher)CoreMatchers.containsString((String)"UnboundedSources present which rely on checkpointing, but checkpointing is disabled."));
    }

    /**
     * Convenience overload: stages only the directory (no individually prepared file) and also adds a
     * non-existing path to {@code filesToStage}.
     *
     * @param flinkMaster value to set as the Flink master option
     * @return the options after the pipeline has been translated
     * @throws IOException if a temporary folder or file cannot be created
     */
    private @UnknownKeyFor @NonNull @Initialized FlinkPipelineOptions testPreparingResourcesToStage(@UnknownKeyFor @NonNull @Initialized String flinkMaster) throws @UnknownKeyFor @NonNull @Initialized IOException {
        final boolean withIndividualFile = false;
        final boolean withNonExistingPath = true;
        return this.testPreparingResourcesToStage(flinkMaster, withIndividualFile, withNonExistingPath);
    }

    /**
     * Creates a temporary staging directory containing one file, assembles a {@code filesToStage}
     * list from it, and translates an empty pipeline with options built from that list.
     *
     * @param flinkMaster value to set as the Flink master option
     * @param includeIndividualFile when {@code true}, the staging directory is additionally passed
     *     through {@code PipelineResources.prepareFilesForStaging} and the first resulting path is
     *     appended to the list
     * @param includeNonExisting when {@code true}, the path {@code /path/to/not/existing/dir} is
     *     appended to the list
     * @return the options after translation
     * @throws IOException if a temporary folder or file cannot be created
     */
    private @UnknownKeyFor @NonNull @Initialized FlinkPipelineOptions testPreparingResourcesToStage(@UnknownKeyFor @NonNull @Initialized String flinkMaster, @UnknownKeyFor @NonNull @Initialized boolean includeIndividualFile, @UnknownKeyFor @NonNull @Initialized boolean includeNonExisting) throws @UnknownKeyFor @NonNull @Initialized IOException {
        Pipeline emptyPipeline = Pipeline.create();
        String tempLocation = this.tmpFolder.newFolder().getAbsolutePath();

        // A directory holding a single file is always the first entry to stage.
        File stagingDir = this.tmpFolder.newFolder();
        new File(stagingDir, "stage").createNewFile();
        String stagingDirPath = stagingDir.getAbsolutePath();

        List<String> filesToStage = new ArrayList<>();
        filesToStage.add(stagingDirPath);

        if (includeIndividualFile) {
            String packagingLocation = this.tmpFolder.newFolder().getAbsolutePath();
            List<String> directoriesToPackage = new ArrayList<>();
            directoriesToPackage.add(stagingDirPath);
            String preparedPath = PipelineResources.prepareFilesForStaging(directoriesToPackage, packagingLocation).get(0);
            filesToStage.add(new File(preparedPath).getAbsolutePath());
        }
        if (includeNonExisting) {
            filesToStage.add("/path/to/not/existing/dir");
        }

        FlinkPipelineOptions pipelineOptions = this.setPipelineOptions(flinkMaster, tempLocation, filesToStage);
        new FlinkPipelineExecutionEnvironment(pipelineOptions).translate(emptyPipeline);
        return pipelineOptions;
    }

    /**
     * Builds default Flink pipeline options configured with {@code TestFlinkRunner} and the supplied
     * values.
     *
     * @param flinkMaster value for the Flink master option
     * @param tempLocation value for the temp location option
     * @param filesToStage value for the files-to-stage option
     * @return the configured options
     */
    private @UnknownKeyFor @NonNull @Initialized FlinkPipelineOptions setPipelineOptions(@UnknownKeyFor @NonNull @Initialized String flinkMaster, @UnknownKeyFor @NonNull @Initialized String tempLocation, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> filesToStage) {
        FlinkPipelineOptions configured = FlinkPipelineOptions.defaults();
        configured.setRunner(TestFlinkRunner.class);
        configured.setFlinkMaster(flinkMaster);
        configured.setTempLocation(tempLocation);
        configured.setFilesToStage(filesToStage);
        return configured;
    }

    /**
     * Converts each file path to the URL of its absolute file, preserving order.
     *
     * @param filePaths paths to convert
     * @return a new list with one URL per input path
     * @throws RuntimeException wrapping the {@link MalformedURLException} if a path cannot be
     *     converted
     */
    private static @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized URL> convertFilesToURLs(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> filePaths) {
        List<URL> urls = new ArrayList<>();
        for (String path : filePaths) {
            URL url;
            try {
                url = new File(path).getAbsoluteFile().toURI().toURL();
            }
            catch (MalformedURLException e) {
                throw new RuntimeException("Failed to convert to URL", e);
            }
            urls.add(url);
        }
        return urls;
    }

    /**
     * Reads the jar URLs configured on a Flink execution environment.
     *
     * <p>First tries the internal {@code jarFiles} field. If that field does not exist, falls back
     * to reading the internal {@code configuration} field and querying it reflectively through
     * {@code org.apache.flink.client.cli.ExecutionConfigAccessor}.
     *
     * @param env the environment object to inspect
     * @return the jar URLs found on the environment
     * @throws Exception if the reflective fallback fails
     */
    private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized URL> getJars(@UnknownKeyFor @NonNull @Initialized Object env) throws @UnknownKeyFor @NonNull @Initialized Exception {
        try {
            List<URL> jarsFromField = Whitebox.getInternalState(env, "jarFiles");
            return jarsFromField;
        }
        catch (FieldNotFoundException fieldMissing) {
            // No "jarFiles" field on this environment: go through the configuration accessor instead.
            Configuration flinkConfig = Whitebox.getInternalState(env, "configuration");
            Class<?> accessorType = Class.forName("org.apache.flink.client.cli.ExecutionConfigAccessor");
            Object accessor = accessorType.getDeclaredMethod("fromConfiguration", Configuration.class).invoke(null, flinkConfig);
            Method jarsGetter = accessorType.getDeclaredMethod("getJars");
            @SuppressWarnings("unchecked")
            List<URL> jarsFromConfig = (List<URL>)jarsGetter.invoke(accessor);
            return jarsFromConfig;
        }
    }
}

