package org.apache.flink.client.program;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/client/program/ClientTest.class */
public class ClientTest extends TestLogger {

    /** Mini cluster shared by all tests of this class; its lifecycle is driven by JUnit via {@code @ClassRule}. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().build());
    /** Program plan created in {@code setUp()} and handed to {@code TestExecutorServiceLoader} by the tests. */
    private Plan plan;
    /** Configuration created in {@code setUp()}; passed to the optimizer in {@code testGetExecutionPlan()}. */
    private Configuration config;
    /** Deployment target written by {@code fromPackagedProgram} and matched (case-insensitively) by the test executor factory. */
    private static final String TEST_EXECUTOR_NAME = "test_executor";
    /** Accumulator name queried by {@code TestGetAccumulator}. */
    private static final String ACCUMULATOR_NAME = "test_accumulator";
    /** Failure message used in {@code testDetachedMode()} when the expected ProgramInvocationException is not thrown. */
    private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";

    /** Job entry point that calls the eager {@code collect()} function on a two-element data set. */
    public static final class TestEager {
        public static void main(String[] strArr) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2).collect();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/client/program/ClientTest$TestExecutorServiceLoader.class */
    public static final class TestExecutorServiceLoader implements PipelineExecutorServiceLoader {
        private final ClusterClient<?> clusterClient;
        private final Plan plan;

        TestExecutorServiceLoader(ClusterClient<?> clusterClient, Plan plan) {
            this.clusterClient = (ClusterClient) Preconditions.checkNotNull(clusterClient);
            this.plan = (Plan) Preconditions.checkNotNull(plan);
        }

        public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
            return new PipelineExecutorFactory() { // from class: org.apache.flink.client.program.ClientTest.TestExecutorServiceLoader.1
                public String getName() {
                    return "my-name";
                }

                public boolean isCompatibleWith(@Nonnull Configuration configuration2) {
                    return ClientTest.TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration2.getString(DeploymentOptions.TARGET));
                }

                public PipelineExecutor getExecutor(@Nonnull Configuration configuration2) {
                    return (pipeline, configuration3, classLoader) -> {
                        JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(TestExecutorServiceLoader.this.plan, configuration3, configuration3.getInteger(CoreOptions.DEFAULT_PARALLELISM));
                        ExecutionConfigAccessor fromConfiguration = ExecutionConfigAccessor.fromConfiguration(configuration3);
                        jobGraph.addJars(fromConfiguration.getJars());
                        jobGraph.setClasspaths(fromConfiguration.getClasspaths());
                        return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter(() -> {
                            return TestExecutorServiceLoader.this.clusterClient;
                        }, (JobID) TestExecutorServiceLoader.this.clusterClient.submitJob(jobGraph).get(), classLoader));
                    };
                }
            };
        }

        public Stream<String> getExecutorNames() {
            throw new UnsupportedOperationException("not implemented");
        }
    }

    /** Job entry point that executes a trivial job and then asks the result for a single accumulator. */
    public static final class TestGetAccumulator {
        public static void main(String[] strArr) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
            env.execute().getAccumulatorResult(ClientTest.ACCUMULATOR_NAME);
        }
    }

    /** Job entry point that executes a trivial job and then asks the result for all accumulators. */
    public static final class TestGetAllAccumulator {
        public static void main(String[] strArr) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
            env.execute().getAllAccumulatorResults();
        }
    }

    /** Job entry point that executes a trivial job and then asks the result for its job ID. */
    public static final class TestGetJobID {
        public static void main(String[] strArr) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
            env.execute().getJobID();
        }
    }

    /** Job entry point that executes a trivial job and then asks the result for its net runtime. */
    public static final class TestGetRuntime {
        public static void main(String[] strArr) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
            env.execute().getNetRuntime();
        }
    }

    /** Job entry point that submits two jobs from the same environment, one after the other. */
    public static final class TestMultiExecute {
        public static void main(String[] strArr) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            int submissions = 0;
            while (submissions < 2) {
                env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
                env.executeAsync().getJobExecutionResult();
                submissions++;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/ClientTest$TestOptimizerPlan.class */
    public static class TestOptimizerPlan implements ProgramDescription {
        public static void main(String[] strArr) throws Exception {
            if (strArr.length < 2) {
                System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
                return;
            }
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.readCsvFile(strArr[0]).fieldDelimiter("\t").types(Long.class, Long.class).map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { // from class: org.apache.flink.client.program.ClientTest.TestOptimizerPlan.1
                public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) {
                    return new Tuple2<>(tuple2.f0, Long.valueOf(((Long) tuple2.f1).longValue() + 1));
                }
            }).writeAsCsv(strArr[1], "\n", "\t");
            executionEnvironment.execute();
        }

        public String getDescription() {
            return "TestOptimizerPlan <input-file-path> <output-file-path>";
        }
    }

    /**
     * Rebuilds the per-test fixtures: a plan for a 1..1000 sequence written to a discarding sink,
     * and a configuration pointing at {@code localhost} on a currently available port.
     */
    @Before
    public void setUp() throws Exception {
        final LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
        localEnv.generateSequence(1L, 1000L).output(new DiscardingOutputFormat<>());
        this.plan = localEnv.createProgramPlan();

        final int jobManagerPort = NetUtils.getAvailablePort();
        final Configuration clientConfig = new Configuration();
        this.config = clientConfig;
        clientConfig.setString(JobManagerOptions.ADDRESS, "localhost");
        clientConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
        clientConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
    }

    /**
     * Builds the execution configuration for running a packaged program with the test executor.
     *
     * @param packagedProgram program whose classpaths and jar files are encoded into the configuration
     * @param i default parallelism to set
     * @param z {@code true} for detached submission; the attached flag is set to the negation
     * @return a new configuration targeting {@code TEST_EXECUTOR_NAME}
     */
    private Configuration fromPackagedProgram(PackagedProgram packagedProgram, int i, boolean z) {
        final boolean attached = !z;

        final Configuration result = new Configuration();
        result.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
        result.set(CoreOptions.DEFAULT_PARALLELISM, i);
        result.set(DeploymentOptions.ATTACHED, attached);
        ConfigUtils.encodeCollectionToConfig(
                result, PipelineOptions.CLASSPATHS, packagedProgram.getClasspaths(), Object::toString);
        ConfigUtils.encodeCollectionToConfig(
                result, PipelineOptions.JARS, packagedProgram.getJobJarAndDependencies(), Object::toString);
        return result;
    }

    /**
     * Verifies that programs which need the job result (eager functions, runtime, accumulators)
     * fail with a {@link ProgramInvocationException} when submitted in detached mode, and that the
     * cause carries the expected explanation.
     */
    @Test
    public void testDetachedMode() throws Exception {
        final String detachedMessage = "Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. ";
        final String eagerMessage = detachedMessage + "Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]. ";

        // try-with-resources: the client was previously never closed.
        try (MiniClusterClient miniClusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            assertDetachedExecutionFails(miniClusterClient, TestEager.class, eagerMessage);
            assertDetachedExecutionFails(miniClusterClient, TestGetRuntime.class, detachedMessage);
            assertDetachedExecutionFails(miniClusterClient, TestGetAccumulator.class, eagerMessage);
            assertDetachedExecutionFails(miniClusterClient, TestGetAllAccumulator.class, detachedMessage);
        }
    }

    /**
     * Runs {@code entryPointClass} in detached mode and asserts that it fails with a
     * {@link ProgramInvocationException} whose cause has exactly {@code expectedCauseMessage}.
     */
    private void assertDetachedExecutionFails(ClusterClient<?> clusterClient, Class<?> entryPointClass, String expectedCauseMessage) throws Exception {
        try {
            final PackagedProgram program = PackagedProgram.newBuilder().setEntryPointClassName(entryPointClass.getName()).build();
            ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, this.plan), fromPackagedProgram(program, 1, true), program, false, false);
            Assert.fail(FAIL_MESSAGE);
        } catch (ProgramInvocationException e) {
            Assert.assertEquals(expectedCauseMessage, e.getCause().getMessage());
        }
    }

    /**
     * With single-job enforcement enabled, the multi-execute program must fail. The cause of the
     * resulting {@link ProgramInvocationException} is rethrown so that JUnit can match it against
     * the expected {@link FlinkRuntimeException}.
     */
    @Test(expected = FlinkRuntimeException.class)
    public void testMultiExecuteWithEnforcingSingleJobExecution() throws Throwable {
        try {
            launchMultiExecuteJob(true);
        } catch (ProgramInvocationException wrapped) {
            throw wrapped.getCause();
        } catch (Exception ignored) {
            // Any other exception is dropped here; the fail(...) below then reports the test as failed.
        }
        Assert.fail("Test should have failed due to multiple execute() calls.");
    }

    /** Without single-job enforcement, the multi-execute program is expected to run without throwing. */
    @Test
    public void testMultiExecuteWithoutEnforcingSingleJobExecution() throws ProgramInvocationException {
        final boolean enforceSingleJobExecution = false;
        launchMultiExecuteJob(enforceSingleJobExecution);
    }

    /**
     * Runs {@link TestMultiExecute} in attached mode against the shared mini cluster.
     *
     * @param z forwarded to {@code ClientUtils.executeProgram} as its fourth argument; the callers
     *     in this class use it to switch single-job-execution enforcement on or off
     * @throws ProgramInvocationException if building or executing the program fails
     */
    private void launchMultiExecuteJob(boolean z) throws ProgramInvocationException {
        // try-with-resources closes the client on every path and attaches a close() failure
        // as a suppressed exception, replacing the hand-expanded (decompiled) equivalent.
        try (MiniClusterClient miniClusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            final PackagedProgram program = PackagedProgram.newBuilder().setEntryPointClassName(TestMultiExecute.class.getName()).build();
            ClientUtils.executeProgram(new TestExecutorServiceLoader(miniClusterClient, this.plan), fromPackagedProgram(program, 1, false), program, z, false);
        }
    }

    /** Submits a job graph built from the fixture plan directly through the client and expects a non-null result. */
    @Test
    public void shouldSubmitToJobClient() throws Exception {
        // try-with-resources: the client was previously never closed.
        try (MiniClusterClient miniClusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(this.plan, new Configuration(), 1);
            jobGraph.addJars(Collections.emptyList());
            jobGraph.setClasspaths(Collections.emptyList());

            Assert.assertNotNull(miniClusterClient.submitJob(jobGraph).get());
        }
    }

    /**
     * Verifies that a program which tries to create a local execution environment while being run
     * through the client is rejected with an {@link InvalidProgramException}.
     */
    @Test
    public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
        final PackagedProgram packagedProgram = Mockito.mock(PackagedProgram.class);
        Mockito.when(packagedProgram.getUserCodeClassLoader()).thenReturn(packagedProgram.getClass().getClassLoader());
        // Simulate a user program whose main method creates a local environment.
        Mockito.doAnswer(new Answer<Void>() {
            // Must be named "answer" to implement Answer; the decompiler had renamed it.
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                ExecutionEnvironment.createLocalEnvironment();
                return null;
            }
        }).when(packagedProgram).invokeInteractiveModeForExecution();

        // try-with-resources: the client was previously never closed.
        try (MiniClusterClient miniClusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            ClientUtils.executeProgram(new TestExecutorServiceLoader(miniClusterClient, this.plan), fromPackagedProgram(packagedProgram, 1, true), packagedProgram, false, false);
            Assert.fail("Creating the local execution environment should not be possible");
        } catch (InvalidProgramException expected) {
            // Expected: this exception is the outcome the test is looking for.
        }
    }

    /**
     * Compiles the pipeline of {@link TestOptimizerPlan} and checks that the optimizer plan can be
     * dumped as JSON, and that the HTML-encoded dump contains no backslash.
     */
    @Test
    public void testGetExecutionPlan() throws ProgramInvocationException {
        final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), this.config);
        final PackagedProgram program = PackagedProgram.newBuilder()
                .setEntryPointClassName(TestOptimizerPlan.class.getName())
                .setArguments(new String[]{"/dev/random", "/tmp"})
                .build();

        // NOTE(review): the cast is a no-op if getPipelineFromProgram is already declared to
        // return Plan; it is needed if it returns a more general pipeline type - confirm.
        final Plan programPlan = (Plan) PackagedProgramUtils.getPipelineFromProgram(program, new Configuration(), 1, true);
        final OptimizedPlan optimizedPlan = optimizer.compile(programPlan);
        Assert.assertNotNull(optimizedPlan);

        Assert.assertNotNull(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan));

        // Use the SAME generator that was switched to HTML encoding; previously it was
        // discarded and an undefined variable was dereferenced instead.
        final PlanJSONDumpGenerator htmlDumper = new PlanJSONDumpGenerator();
        htmlDumper.setEncodeForHTML(true);
        final String htmlEscaped = htmlDumper.getOptimizerPlanAsJSON(optimizedPlan);
        Assert.assertEquals(-1, htmlEscaped.indexOf('\\'));
    }
}
