/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.cancelling;

import java.time.Duration;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobCancelingITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());

    @Test
    public void testCancelingWhileBackPressured() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.getConfig().enableObjectReuse();
        env.getConfig().setTaskCancellationTimeout(Duration.ofDays(1L).toMillis());
        env.getConfig().setTaskCancellationInterval(Duration.ofDays(1L).toMillis());
        DataStreamSource source1 = env.fromSource((Source)new NumberSequenceSource(1L, Long.MAX_VALUE), WatermarkStrategy.noWatermarks(), "source-1");
        source1.setParallelism(1);
        DataStreamSource source2 = env.addSource((SourceFunction)new InfiniteLongSourceFunction());
        source1.connect((DataStream)source2).flatMap((CoFlatMapFunction)new ExplodingFlatMapFunction()).startNewChain().addSink((SinkFunction)new SleepingSink());
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        ClusterClient client = MINI_CLUSTER.getClusterClient();
        JobID jobID = (JobID)client.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)MINI_CLUSTER.getMiniCluster(), (JobID)jobID, (boolean)false);
        Thread.sleep(100L);
        client.cancel(jobID).get();
        while (!((JobStatus)client.getJobStatus(jobID).get()).isTerminalState()) {
        }
        TestCase.assertEquals((Object)JobStatus.CANCELED, client.getJobStatus(jobID).get());
    }

    private static class SleepingSink
    implements SinkFunction<Long> {
        private SleepingSink() {
        }

        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            Thread.sleep(1000L);
        }
    }

    private static class ExplodingFlatMapFunction
    implements CoFlatMapFunction<Long, Long, Long> {
        private ExplodingFlatMapFunction() {
        }

        public void flatMap1(Long value, Collector<Long> out) throws Exception {
            this.emit(value, out);
        }

        public void flatMap2(Long value, Collector<Long> out) throws Exception {
            this.emit(value, out);
        }

        private void emit(long value, Collector<Long> out) {
            for (long i = 0L; i <= value; ++i) {
                out.collect((Object)i);
            }
        }
    }

    private static class InfiniteLongSourceFunction
    implements SourceFunction<Long> {
        private volatile boolean running = true;

        private InfiniteLongSourceFunction() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            long next = 0L;
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)next++);
                }
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

