/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.execution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
import org.apache.flink.core.execution.JobExecutionStatusEvent;
import org.apache.flink.core.execution.JobStatusChangedEvent;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageGraph;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
import org.apache.flink.streaming.runtime.execution.JobCreatedEvent;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.MethodSorters;

/**
 * Integration test that registers {@link TestingJobStatusChangedListenerFactory} through {@link
 * DeploymentOptions#JOB_STATUS_CHANGED_LISTENERS} and checks the events the listener records for
 * jobs that finish, fail, or get cancelled.
 *
 * <p>Events are collected in a static list that is cleared before every test.
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class JobStatusChangedListenerITCase extends TestLogger {
    private static final int PARALLELISM = 4;

    /** Pause between two job-status polls so the wait loops do not poll back-to-back. */
    private static final long STATUS_POLL_INTERVAL_MS = 10L;

    private static final String SOURCE_DATASET_NAME = "LineageSource";
    private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource";
    private static final String SINK_DATASET_NAME = "LineageSink";
    private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";

    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(createConfiguration())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .build());

    /**
     * Events recorded by {@link TestingJobStatusChangedListener}. The listener is invoked by the
     * framework (presumably from threads other than the test thread), so the list is wrapped in a
     * synchronized view; {@code forEach} on that view holds the list's lock while iterating.
     */
    private static final List<JobStatusChangedEvent> statusChangedEvents =
            Collections.synchronizedList(new ArrayList<>());

    /** Drops events recorded by a previous test. */
    @Before
    public void setup() {
        statusChangedEvents.clear();
    }

    /**
     * Runs a bounded collection job to completion via {@code executeAndCollect()} and expects the
     * recorded events to be either CREATED -> RUNNING -> FINISHED status transitions or a job
     * created event reporting {@link RuntimeExecutionMode#STREAMING}.
     */
    @Test
    public void testJobStatusChangedForSucceededApplication() throws Exception {
        Configuration configuration = createConfiguration();
        try (StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
            List<String> sourceValues = Arrays.asList("a", "b", "c");
            List<String> resultValues = new ArrayList<>();
            try (CloseableIterator<String> iterator =
                    env.fromCollection(sourceValues).executeAndCollect()) {
                while (iterator.hasNext()) {
                    resultValues.add(iterator.next());
                }
            }
            Assertions.assertThat(resultValues)
                    .containsExactlyInAnyOrder(sourceValues.toArray(new String[0]));
        }

        verifyEventMetaData();
        statusChangedEvents.forEach(
                event -> {
                    if (event instanceof DefaultJobExecutionStatusEvent) {
                        assertStatusTransition(
                                event, JobStatus.CREATED, JobStatus.RUNNING, JobStatus.FINISHED);
                    } else {
                        DefaultJobCreatedEvent createdEvent = (DefaultJobCreatedEvent) event;
                        Assertions.assertThat(createdEvent.executionMode())
                                .isEqualTo(RuntimeExecutionMode.STREAMING);
                    }
                });
    }

    /**
     * Submits a job whose source throws immediately, waits until the client reports {@link
     * JobStatus#FAILED}, and expects only CREATED -> RUNNING -> FAILING -> FAILED transitions.
     */
    @Test
    public void testJobStatusChangedForFailedApplication() throws Exception {
        Configuration configuration = createConfiguration();
        try (StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
            env.setParallelism(PARALLELISM);
            RestartStrategyUtils.configureNoRestartStrategy(env);
            env.addSource(new FastFailureSourceFunction()).addSink(new SleepingSink());

            StreamGraph streamGraph = env.getStreamGraph();
            JobGraph jobGraph = streamGraph.getJobGraph();

            ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
            JobID jobID = client.submitJob(jobGraph).get();
            waitForJobStatus(client, jobID, JobStatus.FAILED);
        } catch (InterruptedException e) {
            // Never swallow an interrupt: restore the flag and let the test abort.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception ignored) {
            // Deliberately tolerated, as in the original test: a problem while submitting or
            // polling still surfaces through the event assertions below.
            // NOTE(review): consider removing this catch so the root cause is reported directly.
        }

        verifyEventMetaData();
        statusChangedEvents.forEach(
                event ->
                        assertStatusTransition(
                                event,
                                JobStatus.CREATED,
                                JobStatus.RUNNING,
                                JobStatus.FAILING,
                                JobStatus.FAILED));
    }

    /**
     * Submits an unbounded job, verifies the lineage graph of its stream graph, cancels the job
     * once all tasks run, and expects CREATED -> RUNNING -> CANCELLING -> CANCELED transitions. A
     * {@link JobCreatedEvent}, if one is recorded, must carry one lineage source and one sink.
     */
    @Test
    public void testJobStatusChangedForCancelledApplication() throws Exception {
        Configuration configuration = createConfiguration();
        try (StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
            env.addSource(new InfiniteLongSourceFunction()).addSink(new SleepingSink());

            StreamGraph streamGraph = env.getStreamGraph();
            JobGraph jobGraph = streamGraph.getJobGraph();
            verifyLineageGraph(streamGraph.getLineageGraph());

            ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
            JobID jobID = client.submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
            Thread.sleep(100L);
            client.cancel(jobID).get();
            waitForJobStatus(client, jobID, JobStatus.CANCELED);
        }

        verifyEventMetaData();
        statusChangedEvents.forEach(
                event -> {
                    // Decide on the event type first; casting before this check would make the
                    // created-event branch unreachable.
                    if (event instanceof JobCreatedEvent) {
                        LineageGraph lineageGraph = ((JobCreatedEvent) event).lineageGraph();
                        Assertions.assertThat(lineageGraph.sources()).hasSize(1);
                        Assertions.assertThat(lineageGraph.sinks()).hasSize(1);
                    } else {
                        assertStatusTransition(
                                event,
                                JobStatus.CREATED,
                                JobStatus.RUNNING,
                                JobStatus.CANCELLING,
                                JobStatus.CANCELED);
                    }
                });
    }

    /**
     * Asserts that the lineage graph has exactly one unbounded source and one sink, each with the
     * single dataset declared by this test's source/sink functions, and exactly one relation.
     *
     * @param lineageGraph the graph to inspect
     */
    void verifyLineageGraph(LineageGraph lineageGraph) {
        Assertions.assertThat(lineageGraph.sources()).hasSize(1);
        SourceLineageVertex source = lineageGraph.sources().get(0);
        Assertions.assertThat(source.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat(source.datasets()).hasSize(1);
        LineageDataset sourceDataset = source.datasets().get(0);
        Assertions.assertThat(sourceDataset.name()).isEqualTo(SOURCE_DATASET_NAME);
        Assertions.assertThat(sourceDataset.namespace()).isEqualTo(SOURCE_DATASET_NAMESPACE);

        Assertions.assertThat(lineageGraph.sinks()).hasSize(1);
        LineageVertex sink = lineageGraph.sinks().get(0);
        Assertions.assertThat(sink.datasets()).hasSize(1);
        LineageDataset sinkDataset = sink.datasets().get(0);
        Assertions.assertThat(sinkDataset.name()).isEqualTo(SINK_DATASET_NAME);
        Assertions.assertThat(sinkDataset.namespace()).isEqualTo(SINK_DATASET_NAMESPACE);

        Assertions.assertThat(lineageGraph.relations()).hasSize(1);
    }

    /**
     * Asserts that exactly three events were recorded and that neighbouring events carry the same
     * job id and job name.
     */
    void verifyEventMetaData() {
        Assertions.assertThat(statusChangedEvents).hasSize(3);
        for (int i = 0; i + 1 < statusChangedEvents.size(); i++) {
            JobStatusChangedEvent current = statusChangedEvents.get(i);
            JobStatusChangedEvent next = statusChangedEvents.get(i + 1);
            Assertions.assertThat(current.jobId()).isEqualTo(next.jobId());
            Assertions.assertThat(current.jobName()).isEqualTo(next.jobName());
        }
    }

    /**
     * Asserts that {@code event} is a {@link JobExecutionStatusEvent} whose old/new status form
     * two neighbouring entries of {@code lifecycle}.
     *
     * @param event the recorded event
     * @param lifecycle the allowed statuses in the order they may follow each other
     */
    private static void assertStatusTransition(
            JobStatusChangedEvent event, JobStatus... lifecycle) {
        Assertions.assertThat(event).isInstanceOf(JobExecutionStatusEvent.class);
        JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
        JobStatus oldStatus = status.oldStatus();
        JobStatus newStatus = status.newStatus();

        boolean allowed = false;
        for (int i = 0; i + 1 < lifecycle.length; i++) {
            if (lifecycle[i] == oldStatus && lifecycle[i + 1] == newStatus) {
                allowed = true;
                break;
            }
        }
        Assertions.assertThat(allowed)
                .as("unexpected job status transition %s -> %s", oldStatus, newStatus)
                .isTrue();
    }

    /**
     * Polls the client until the job reports {@code expected}, pausing {@link
     * #STATUS_POLL_INTERVAL_MS} milliseconds between polls. There is no upper bound on the wait.
     *
     * @param client client used to query the job status
     * @param jobID id of the job to watch
     * @param expected status to wait for
     * @throws Exception if querying the status fails or the thread is interrupted
     */
    private static void waitForJobStatus(ClusterClient<?> client, JobID jobID, JobStatus expected)
            throws Exception {
        while (!expected.equals(client.getJobStatus(jobID).get())) {
            Thread.sleep(STATUS_POLL_INTERVAL_MS);
        }
    }

    /** Builds a configuration that registers {@link TestingJobStatusChangedListenerFactory}. */
    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(
                DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS,
                Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
        return configuration;
    }

    /** Sink that sleeps one second per record and exposes the sink lineage dataset. */
    private static class SleepingSink implements SinkFunction<Long>, LineageVertexProvider {
        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            Thread.sleep(1000L);
        }

        @Override
        public LineageVertex getLineageVertex() {
            DefaultLineageDataset lineageDataset =
                    new DefaultLineageDataset(
                            SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
            lineageVertex.addLineageDataset(lineageDataset);
            return lineageVertex;
        }
    }

    /**
     * Source that emits increasing longs until cancelled and exposes the source lineage dataset
     * with {@link Boundedness#CONTINUOUS_UNBOUNDED}.
     */
    private static class InfiniteLongSourceFunction
            implements SourceFunction<Long>, LineageVertexProvider {
        /** Cleared by {@link #cancel()}; volatile because {@code run} reads it in its loop. */
        private volatile boolean running = true;

        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            long next = 0L;
            while (running) {
                // Each record is emitted while holding the context's checkpoint lock.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(next++);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public LineageVertex getLineageVertex() {
            DefaultLineageDataset lineageDataset =
                    new DefaultLineageDataset(
                            SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
            DefaultSourceLineageVertex lineageVertex =
                    new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
            lineageVertex.addDataset(lineageDataset);
            return lineageVertex;
        }
    }

    /** Source that throws as soon as it is run. */
    private static class FastFailureSourceFunction implements SourceFunction<Long> {
        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            throw new RuntimeException("Failed to execute.");
        }

        @Override
        public void cancel() {
            // Nothing to stop: run() never loops.
        }
    }

    /** Listener that appends every received event to the shared event list. */
    private static class TestingJobStatusChangedListener implements JobStatusChangedListener {
        @Override
        public void onEvent(JobStatusChangedEvent event) {
            statusChangedEvents.add(event);
        }
    }

    /**
     * Factory referenced by class name in {@link #createConfiguration()}; it must stay public so
     * it can be created from that name.
     */
    public static class TestingJobStatusChangedListenerFactory
            implements JobStatusChangedListenerFactory {
        @Override
        public JobStatusChangedListener createListener(
                JobStatusChangedListenerFactory.Context context) {
            return new TestingJobStatusChangedListener();
        }
    }
}

