package org.apache.flink.test.cancelling;

import java.time.Duration;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/cancelling/JobCancelingITCase.class */
public class JobCancelingITCase extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).build());

    /* loaded from: input_file:org/apache/flink/test/cancelling/JobCancelingITCase$ExplodingFlatMapFunction.class */
    private static class ExplodingFlatMapFunction implements CoFlatMapFunction<Long, Long, Long> {
        private ExplodingFlatMapFunction() {
        }

        public void flatMap1(Long l, Collector<Long> collector) throws Exception {
            emit(l.longValue(), collector);
        }

        public void flatMap2(Long l, Collector<Long> collector) throws Exception {
            emit(l.longValue(), collector);
        }

        private void emit(long j, Collector<Long> collector) {
            long j2 = 0;
            while (true) {
                long j3 = j2;
                if (j3 > j) {
                    return;
                }
                collector.collect(Long.valueOf(j3));
                j2 = j3 + 1;
            }
        }

        public /* bridge */ /* synthetic */ void flatMap2(Object obj, Collector collector) throws Exception {
            flatMap2((Long) obj, (Collector<Long>) collector);
        }

        public /* bridge */ /* synthetic */ void flatMap1(Object obj, Collector collector) throws Exception {
            flatMap1((Long) obj, (Collector<Long>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/JobCancelingITCase$InfiniteLongSourceFunction.class */
    private static class InfiniteLongSourceFunction implements SourceFunction<Long> {
        private volatile boolean running;

        private InfiniteLongSourceFunction() {
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            long j = 0;
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    long j2 = j;
                    j = j2 + 1;
                    sourceContext.collect(Long.valueOf(j2));
                }
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/JobCancelingITCase$SleepingSink.class */
    private static class SleepingSink implements SinkFunction<Long> {
        private SleepingSink() {
        }

        public void invoke(Long l, SinkFunction.Context context) throws Exception {
            Thread.sleep(1000L);
        }
    }

    @Test
    public void testCancelingWhileBackPressured() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().enableObjectReuse();
        executionEnvironment.getConfig().setTaskCancellationTimeout(Duration.ofDays(1L).toMillis());
        executionEnvironment.getConfig().setTaskCancellationInterval(Duration.ofDays(1L).toMillis());
        DataStreamSource fromSource = executionEnvironment.fromSource(new NumberSequenceSource(1L, Long.MAX_VALUE), WatermarkStrategy.noWatermarks(), "source-1");
        fromSource.setParallelism(1);
        fromSource.connect(executionEnvironment.addSource(new InfiniteLongSourceFunction())).flatMap(new ExplodingFlatMapFunction()).startNewChain().addSink(new SleepingSink());
        JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
        ClusterClient clusterClient = MINI_CLUSTER.getClusterClient();
        JobID jobID = (JobID) clusterClient.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
        Thread.sleep(100L);
        clusterClient.cancel(jobID).get();
        do {
        } while (!((JobStatus) clusterClient.getJobStatus(jobID).get()).isTerminalState());
        TestCase.assertEquals(JobStatus.CANCELED, clusterClient.getJobStatus(jobID).get());
    }
}
