package org.apache.flink.test.cancelling;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.junit.Ignore;
import org.junit.Test;

@Ignore("Takes too long.")
/* Tests that build sort-merge join jobs (JoinHint.REPARTITION_SORT_MERGE) and pass them to runAndCancelJob. */
public class JoinCancelingITCase extends CancelingTestBase {

    /**
     * Join function that sleeps for {@link #WAIT_TIME_PER_RECORD} milliseconds on every
     * invocation and then emits a tuple built from the first field of each input.
     *
     * <p>The sleep is not guarded: an {@link InterruptedException} raised while sleeping
     * propagates out of {@link #join(Tuple2, Tuple2)}.
     *
     * @param <IN> type of both fields of the input and output tuples
     */
    private static final class DelayingMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1;

        /** Time to sleep for every joined pair, in milliseconds. */
        private static final int WAIT_TIME_PER_RECORD = 10000;

        private DelayingMatcher() {
        }

        /**
         * Sleeps for {@link #WAIT_TIME_PER_RECORD} milliseconds, then pairs the first fields.
         *
         * @param first record from the first join input
         * @param second record from the second join input
         * @return a new tuple {@code (first.f0, second.f0)}
         * @throws Exception if the sleeping thread is interrupted
         */
        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            // Use the declared constant instead of repeating the literal.
            Thread.sleep(WAIT_TIME_PER_RECORD);
            return new Tuple2<>(first.f0, second.f0);
        }
    }

    /**
     * Join function that blocks for at least {@link #WAIT_TIME_PER_RECORD} milliseconds per
     * invocation and does not give up early when its thread is interrupted.
     *
     * <p>NOTE(review): judging by the class name, this is presumably meant to model user code
     * that reacts slowly to cancellation; confirm against the test base class.
     *
     * @param <IN> type of both fields of the input and output tuples
     */
    private static final class LongCancelTimeMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1;

        /** Minimum time spent in every {@code join} call, in milliseconds. */
        private static final int WAIT_TIME_PER_RECORD = 5000;

        private LongCancelTimeMatcher() {
        }

        /**
         * Waits until {@link #WAIT_TIME_PER_RECORD} milliseconds have elapsed since entry
         * (measured with {@link System#currentTimeMillis()}), then pairs the first fields.
         *
         * @param first record from the first join input
         * @param second record from the second join input
         * @return a new tuple {@code (first.f0, second.f0)}
         * @throws Exception declared by the interface; interrupts are swallowed here
         */
        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            final long start = System.currentTimeMillis();
            long remaining = WAIT_TIME_PER_RECORD;
            do {
                try {
                    Thread.sleep(remaining);
                } catch (InterruptedException ignored) {
                    // Deliberately swallowed: the full wait time must be served even if the
                    // thread is interrupted, so the loop simply sleeps for the remainder.
                }
                remaining = WAIT_TIME_PER_RECORD - (System.currentTimeMillis() - start);
            } while (remaining > 0);
            return new Tuple2<>(first.f0, second.f0);
        }
    }

    /**
     * Join function without any delay: it only combines the first field of each input
     * record into a new tuple.
     *
     * @param <IN> type of both fields of the input and output tuples
     */
    private static final class SimpleMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1;

        private SimpleMatcher() {
        }

        /**
         * Pairs the first fields of the two joined records.
         *
         * @param left record from the first join input
         * @param right record from the second join input
         * @return a new tuple {@code (left.f0, right.f0)}
         */
        public Tuple2<IN, IN> join(Tuple2<IN, IN> left, Tuple2<IN, IN> right) throws Exception {
            final IN leftFirst = left.f0;
            final IN rightFirst = right.f0;
            return new Tuple2<>(leftFirst, rightFirst);
        }
    }

    /**
     * Rich join function whose {@code open} method never returns normally: it waits on its
     * own monitor, and nothing in this file ever notifies that monitor.
     *
     * @param <IN> type of both fields of the input and output tuples
     */
    private static final class StuckInOpenMatcher<IN> extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1;

        private StuckInOpenMatcher() {
        }

        /**
         * Blocks indefinitely on this object's monitor.
         *
         * @param openContext unused
         * @throws Exception the {@link InterruptedException} thrown by {@link Object#wait()}
         *     when the waiting thread is interrupted; this is the only way out of the method
         */
        public void open(OpenContext openContext) throws Exception {
            synchronized (this) {
                // Object.wait() may return without notify/interrupt (spurious wakeup), so a
                // single call is not enough to stay blocked; keep waiting in a loop.
                while (true) {
                    wait();
                }
            }
        }

        /**
         * Pairs the first fields of the two joined records.
         *
         * @param first record from the first join input
         * @param second record from the second join input
         * @return a new tuple {@code (first.f0, second.f0)}
         */
        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            return new Tuple2<>(first.f0, second.f0);
        }
    }

    /**
     * Convenience overload of {@link #executeTask(JoinFunction, boolean, int)} that runs the
     * job with a parallelism of 4.
     *
     * @param joiner join function under test
     * @param inputFlag flag forwarded unchanged to the three-argument overload
     * @throws Exception whatever the three-argument overload throws
     */
    private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean inputFlag) throws Exception {
        final int parallelism = 4;
        executeTask(joiner, inputFlag, parallelism);
    }

    /**
     * Builds a sort-merge join of two {@link InfiniteIntegerTupleInputFormat} sources on
     * field 0, discards the join output, and hands the resulting plan to
     * {@code runAndCancelJob}.
     *
     * @param joinFunction join function under test
     * @param inputFlag boolean passed to both {@link InfiniteIntegerTupleInputFormat}
     *     constructors (the tests in this class pass {@code true} for the "slow inputs" case
     *     and {@code false} otherwise; the exact effect is defined by that input format)
     * @param parallelism parallelism set on the execution environment
     * @throws Exception if building or running the job fails
     */
    private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joinFunction, boolean inputFlag, int parallelism) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput(new InfiniteIntegerTupleInputFormat(inputFlag))
                .join(env.createInput(new InfiniteIntegerTupleInputFormat(inputFlag)), JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
                .where(0)
                .equalTo(0)
                .with(joinFunction)
                // Parameterized instead of raw to avoid an unchecked conversion.
                .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
        env.setParallelism(parallelism);
        // NOTE(review): 5000 / 10000 are presumably the delay before cancelling and the
        // maximum time allowed for the cancel to complete, in ms; confirm in CancelingTestBase.
        runAndCancelJob(env.createProgramPlan(), 5000, 10000);
    }

    /**
     * Runs the infinite-input join job with a {@link SimpleMatcher} and the input flag set
     * to {@code true}.
     */
    @Test
    public void testCancelSortMatchWhileReadingSlowInputs() throws Exception {
        // Explicit type argument instead of a raw SimpleMatcher (no unchecked conversion).
        executeTask(new SimpleMatcher<Integer>(), true);
    }

    /**
     * Runs the infinite-input join job with a {@link SimpleMatcher} and the input flag set
     * to {@code false}.
     */
    @Test
    public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
        // Explicit type argument instead of a raw SimpleMatcher (no unchecked conversion).
        executeTask(new SimpleMatcher<Integer>(), false);
    }

    /**
     * Runs the infinite-input join job with a {@link StuckInOpenMatcher}, whose {@code open}
     * method blocks, and the input flag set to {@code false}.
     */
    @Test
    public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
        // Explicit type argument instead of a raw StuckInOpenMatcher (no unchecked conversion).
        executeTask(new StuckInOpenMatcher<Integer>(), false);
    }

    /**
     * Builds a sort-merge join of two {@link UniformIntTupleGeneratorInputFormat} sources on
     * field 0 with parallelism 4, discards the join output, and hands the resulting plan to
     * {@code runAndCancelJob}.
     *
     * @param joinFunction join function under test
     * @param generatorArg1 first constructor argument of both generator input formats
     *     (presumably the number of keys; confirm against that class)
     * @param generatorArg2 second constructor argument of both generator input formats
     *     (presumably the number of values per key; confirm against that class)
     * @param cancelArg1 second argument of {@code runAndCancelJob} (presumably the delay
     *     before cancelling, in ms; confirm in CancelingTestBase)
     * @param cancelArg2 third argument of {@code runAndCancelJob} (presumably the maximum
     *     time allowed for the cancel to complete, in ms; confirm in CancelingTestBase)
     * @throws Exception if building or running the job fails
     */
    private void executeTaskWithGenerator(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joinFunction, int generatorArg1, int generatorArg2, int cancelArg1, int cancelArg2) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput(new UniformIntTupleGeneratorInputFormat(generatorArg1, generatorArg2))
                .join(env.createInput(new UniformIntTupleGeneratorInputFormat(generatorArg1, generatorArg2)), JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
                .where(0)
                .equalTo(0)
                .with(joinFunction)
                // Parameterized instead of raw to avoid an unchecked conversion.
                .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
        env.setParallelism(4);
        runAndCancelJob(env.createProgramPlan(), cancelArg1, cancelArg2);
    }

    /**
     * Joins a {@code HeavyCompareGeneratorInputFormat} source with itself using a sort-merge
     * join on field 0 and hands the plan to {@code runAndCancelJob}. The join function throws
     * if it is ever invoked, because the job is expected to be cancelled before the join
     * produces any pair.
     */
    @Test
    public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The same input format instance backs both sides of the join.
        HeavyCompareGeneratorInputFormat input = new HeavyCompareGeneratorInputFormat(100);
        env.createInput(input)
                .join(env.createInput(input), JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
                    public Tuple2<HeavyCompare, Integer> join(Tuple2<HeavyCompare, Integer> first, Tuple2<HeavyCompare, Integer> second) throws Exception {
                        throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
                    }
                })
                // Parameterized instead of raw to avoid an unchecked conversion.
                .output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());
        runAndCancelJob(env.createProgramPlan(), 30000, 60000);
    }

    /**
     * Runs the generator-input join job with a {@link DelayingMatcher}, which sleeps on
     * every joined pair, using generator arguments (500, 3) and cancel arguments
     * (10000, 20000).
     */
    @Test
    public void testCancelSortMatchWhileJoining() throws Exception {
        // Explicit type argument instead of a raw DelayingMatcher (no unchecked conversion).
        executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10000, 20000);
    }

    /**
     * Runs the generator-input join job with a {@link LongCancelTimeMatcher}, which ignores
     * interrupts while it waits, using generator arguments (500, 3) and cancel arguments
     * (10000, 10000).
     */
    @Test
    public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
        // Explicit type argument instead of a raw LongCancelTimeMatcher (no unchecked conversion).
        executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10000, 10000);
    }

    /**
     * Runs the infinite-input join job with a {@link SimpleMatcher}, the input flag set to
     * {@code false}, and a parallelism of 64 instead of the default 4.
     */
    @Test
    public void testCancelSortMatchWithHighparallelism() throws Exception {
        // Explicit type argument instead of a raw SimpleMatcher (no unchecked conversion).
        executeTask(new SimpleMatcher<Integer>(), false, 64);
    }
}
