package org.apache.flink.test.cancelling;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.operators.GenericDataSink;
import org.apache.flink.api.java.record.operators.GenericDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.util.DiscardingOutputFormat;
import org.apache.flink.test.recordJobs.util.InfiniteIntegerInputFormat;
import org.apache.flink.test.recordJobs.util.InfiniteIntegerInputFormatWithDelay;
import org.apache.flink.test.recordJobs.util.UniformIntInput;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/test/cancelling/MatchJoinCancelingITCase.class */
public class MatchJoinCancelingITCase extends CancellingTestBase {
    private static final int parallelism = 4;

    /* loaded from: input_file:org/apache/flink/test/cancelling/MatchJoinCancelingITCase$DelayingMatcher.class */
    public static final class DelayingMatcher extends JoinFunction {
        private static final long serialVersionUID = 1;
        private static final int WAIT_TIME_PER_RECORD = 10000;

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            Thread.sleep(10000L);
            record.setField(1, record2.getField(0, IntValue.class));
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/MatchJoinCancelingITCase$LongCancelTimeMatcher.class */
    public static final class LongCancelTimeMatcher extends JoinFunction {
        private static final long serialVersionUID = 1;
        private static final int WAIT_TIME_PER_RECORD = 5000;

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            long currentTimeMillis;
            record.setField(1, record2.getField(0, IntValue.class));
            long currentTimeMillis2 = System.currentTimeMillis();
            long j = 5000;
            do {
                try {
                    Thread.sleep(j);
                } catch (InterruptedException e) {
                }
                currentTimeMillis = (5000 - System.currentTimeMillis()) + currentTimeMillis2;
                j = currentTimeMillis;
            } while (currentTimeMillis > 0);
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/MatchJoinCancelingITCase$SimpleMatcher.class */
    public static final class SimpleMatcher extends JoinFunction {
        private static final long serialVersionUID = 1;

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            record.setField(1, record2.getField(0, IntValue.class));
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/MatchJoinCancelingITCase$StuckInOpenMatcher.class */
    public static final class StuckInOpenMatcher extends JoinFunction {
        private static final long serialVersionUID = 1;

        public void open(Configuration configuration) throws Exception {
            synchronized (this) {
                wait();
            }
        }

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            record.setField(1, record2.getField(0, IntValue.class));
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    public MatchJoinCancelingITCase() {
        setTaskManagerNumSlots(parallelism);
    }

    public void testCancelSortMatchWhileReadingSlowInputs() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new InfiniteIntegerInputFormatWithDelay(), "Source 1");
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(SimpleMatcher.class, IntValue.class, 0, 0).input1(genericDataSource).input2(new GenericDataSource(new InfiniteIntegerInputFormatWithDelay(), "Source 2")).name("Sort Join").build(), "Sink"));
        plan.setDefaultParallelism(parallelism);
        runAndCancelJob(plan, 3000, 10000);
    }

    public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source 1");
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(SimpleMatcher.class, IntValue.class, 0, 0).input1(genericDataSource).input2(new GenericDataSource(new InfiniteIntegerInputFormat(), "Source 2")).name("Sort Join").build(), "Sink"));
        plan.setDefaultParallelism(parallelism);
        runAndCancelJob(plan, 5000, 10000);
    }

    public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source 1");
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(StuckInOpenMatcher.class, IntValue.class, 0, 0).input1(genericDataSource).input2(new GenericDataSource(new InfiniteIntegerInputFormat(), "Source 2")).name("Stuc-In-Open Match").build(), "Sink"));
        plan.setDefaultParallelism(parallelism);
        runAndCancelJob(plan, 5000);
        runAndCancelJob(plan, 10000, 10000);
    }

    public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new UniformIntInput(), "Source 1");
        genericDataSource.setParameter(UniformIntInput.NUM_KEYS_KEY, 50000);
        genericDataSource.setParameter(UniformIntInput.NUM_VALUES_KEY, 100);
        GenericDataSource genericDataSource2 = new GenericDataSource(new UniformIntInput(), "Source 2");
        genericDataSource2.setParameter(UniformIntInput.NUM_KEYS_KEY, 50000);
        genericDataSource2.setParameter(UniformIntInput.NUM_VALUES_KEY, 100);
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(SimpleMatcher.class, IntValue.class, 0, 0).input1(genericDataSource).input2(genericDataSource2).name("Long Cancelling Sort Join").build(), "Sink"));
        plan.setDefaultParallelism(parallelism);
        runAndCancelJob(plan, 30000, 30000);
    }

    public void testCancelSortMatchWhileJoining() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new UniformIntInput(), "Source 1");
        genericDataSource.setParameter(UniformIntInput.NUM_KEYS_KEY, 500);
        genericDataSource.setParameter(UniformIntInput.NUM_VALUES_KEY, 3);
        GenericDataSource genericDataSource2 = new GenericDataSource(new UniformIntInput(), "Source 2");
        genericDataSource2.setParameter(UniformIntInput.NUM_KEYS_KEY, 500);
        genericDataSource2.setParameter(UniformIntInput.NUM_VALUES_KEY, 3);
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(DelayingMatcher.class, IntValue.class, 0, 0).input1(genericDataSource).input2(genericDataSource2).name("Long Cancelling Sort Join").build(), "Sink"));
        plan.setDefaultParallelism(parallelism);
        runAndCancelJob(plan, 10000, 20000);
    }

    public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new UniformIntInput(), "Source 1");
        genericDataSource.setParameter(UniformIntInput.NUM_KEYS_KEY, 500);
        genericDataSource.setParameter(UniformIntInput.NUM_VALUES_KEY, 3);
        GenericDataSource genericDataSource2 = new GenericDataSource(new UniformIntInput(), "Source 2");
        genericDataSource2.setParameter(UniformIntInput.NUM_KEYS_KEY, 500);
        genericDataSource2.setParameter(UniformIntInput.NUM_VALUES_KEY, 3);
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(LongCancelTimeMatcher.class, IntValue.class, 0, 0).input1(genericDataSource).input2(genericDataSource2).name("Long Cancelling Sort Join").build(), "Sink"));
        plan.setDefaultParallelism(parallelism);
        runAndCancelJob(plan, 10000, 10000);
    }

    public void testCancelSortMatchWithHighparallelism() throws Exception {
        GenericDataSource genericDataSource = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source 1");
        Plan plan = new Plan(new GenericDataSink(new DiscardingOutputFormat(), JoinOperator.builder(new SimpleMatcher(), IntValue.class, 0, 0).input1(genericDataSource).input2(new GenericDataSource(new InfiniteIntegerInputFormat(), "Source 2")).name("Sort Join").build(), "Sink"));
        plan.setDefaultParallelism(64);
        runAndCancelJob(plan, 3000, 20000);
    }
}
