package org.apache.flink.test.execution;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/execution/JobListenerITCase.class */
public class JobListenerITCase extends TestLogger {

    @ClassRule
    public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().build());

    private static Configuration getClientConfiguration() {
        Configuration configuration = new Configuration(miniClusterResource.getClientConfiguration());
        configuration.set(DeploymentOptions.TARGET, "remote");
        return configuration;
    }

    @Test
    public void testExecuteCallsJobListenerOnBatchEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final OneShotLatch oneShotLatch2 = new OneShotLatch();
        ExecutionEnvironment executionEnvironment = new ExecutionEnvironment(getClientConfiguration());
        executionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.1
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(jobClient.getJobID());
                oneShotLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
                oneShotLatch2.trigger();
            }
        });
        executionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).output(new DiscardingOutputFormat());
        JobExecutionResult execute = executionEnvironment.execute();
        oneShotLatch.await(2000L, TimeUnit.MILLISECONDS);
        oneShotLatch2.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(execute.getJobID(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnBatchEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        final OneShotLatch oneShotLatch = new OneShotLatch();
        ExecutionEnvironment executionEnvironment = new ExecutionEnvironment(getClientConfiguration());
        executionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.2
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(jobClient.getJobID());
                oneShotLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
            }
        });
        executionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).output(new DiscardingOutputFormat());
        JobClient executeAsync = executionEnvironment.executeAsync();
        oneShotLatch.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(executeAsync.getJobID(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteCallsJobListenerOnMainThreadOnBatchEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        ExecutionEnvironment executionEnvironment = new ExecutionEnvironment(getClientConfiguration());
        executionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.3
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
            }
        });
        executionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).output(new DiscardingOutputFormat());
        executionEnvironment.execute();
        MatcherAssert.assertThat(Thread.currentThread(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnMainThreadOnBatchEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        ExecutionEnvironment executionEnvironment = new ExecutionEnvironment(getClientConfiguration());
        executionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.4
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
            }
        });
        executionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).output(new DiscardingOutputFormat());
        executionEnvironment.executeAsync();
        MatcherAssert.assertThat(Thread.currentThread(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteCallsJobListenerOnStreamingEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final OneShotLatch oneShotLatch2 = new OneShotLatch();
        StreamExecutionEnvironment streamExecutionEnvironment = new StreamExecutionEnvironment(getClientConfiguration());
        streamExecutionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.5
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(jobClient.getJobID());
                oneShotLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
                oneShotLatch2.trigger();
            }
        });
        streamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).addSink(new DiscardingSink());
        JobExecutionResult execute = streamExecutionEnvironment.execute();
        oneShotLatch.await(2000L, TimeUnit.MILLISECONDS);
        oneShotLatch2.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(execute.getJobID(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnStreamingEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        final OneShotLatch oneShotLatch = new OneShotLatch();
        StreamExecutionEnvironment streamExecutionEnvironment = new StreamExecutionEnvironment(getClientConfiguration());
        streamExecutionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.6
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(jobClient.getJobID());
                oneShotLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
            }
        });
        streamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).addSink(new DiscardingSink());
        JobClient executeAsync = streamExecutionEnvironment.executeAsync();
        oneShotLatch.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(executeAsync.getJobID(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        StreamExecutionEnvironment streamExecutionEnvironment = new StreamExecutionEnvironment(getClientConfiguration());
        streamExecutionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.7
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
            }
        });
        streamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).addSink(new DiscardingSink());
        streamExecutionEnvironment.execute();
        MatcherAssert.assertThat(Thread.currentThread(), CoreMatchers.is(atomicReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        StreamExecutionEnvironment streamExecutionEnvironment = new StreamExecutionEnvironment(getClientConfiguration());
        streamExecutionEnvironment.registerJobListener(new JobListener() { // from class: org.apache.flink.test.execution.JobListenerITCase.8
            public void onJobSubmitted(JobClient jobClient, Throwable th) {
                atomicReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable th) {
            }
        });
        streamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3, 4, 5}).addSink(new DiscardingSink());
        streamExecutionEnvironment.executeAsync();
        MatcherAssert.assertThat(Thread.currentThread(), CoreMatchers.is(atomicReference.get()));
    }
}
