/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.cancelling;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.operators.GenericDataSink;
import org.apache.flink.api.java.record.operators.GenericDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.cancelling.CancellingTestBase;
import org.apache.flink.test.recordJobs.util.DiscardingOutputFormat;
import org.apache.flink.test.recordJobs.util.InfiniteIntegerInputFormat;
import org.apache.flink.test.recordJobs.util.InfiniteIntegerInputFormatWithDelay;
import org.apache.flink.test.recordJobs.util.UniformIntInput;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;

public class MatchJoinCancelingITCase
extends CancellingTestBase {
    private static final int parallelism = 4;

    public MatchJoinCancelingITCase() {
        this.setTaskManagerNumSlots(4);
    }

    public void testCancelSortMatchWhileReadingSlowInputs() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormatWithDelay(), "Source 1");
        GenericDataSource source2 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormatWithDelay(), "Source 2");
        JoinOperator matcher = JoinOperator.builder(SimpleMatcher.class, IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Sort Join").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(4);
        this.runAndCancelJob(p, 3000, 10000);
    }

    public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormat(), "Source 1");
        GenericDataSource source2 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormat(), "Source 2");
        JoinOperator matcher = JoinOperator.builder(SimpleMatcher.class, IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Sort Join").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(4);
        this.runAndCancelJob(p, 5000, 10000);
    }

    public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormat(), "Source 1");
        GenericDataSource source2 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormat(), "Source 2");
        JoinOperator matcher = JoinOperator.builder(StuckInOpenMatcher.class, IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Stuc-In-Open Match").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(4);
        this.runAndCancelJob(p, 5000);
        this.runAndCancelJob(p, 10000, 10000);
    }

    public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new UniformIntInput(), "Source 1");
        source1.setParameter("testfomat.numkeys", 50000);
        source1.setParameter("testfomat.numvalues", 100);
        GenericDataSource source2 = new GenericDataSource((InputFormat)new UniformIntInput(), "Source 2");
        source2.setParameter("testfomat.numkeys", 50000);
        source2.setParameter("testfomat.numvalues", 100);
        JoinOperator matcher = JoinOperator.builder(SimpleMatcher.class, IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Long Cancelling Sort Join").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(4);
        this.runAndCancelJob(p, 30000, 30000);
    }

    public void testCancelSortMatchWhileJoining() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new UniformIntInput(), "Source 1");
        source1.setParameter("testfomat.numkeys", 500);
        source1.setParameter("testfomat.numvalues", 3);
        GenericDataSource source2 = new GenericDataSource((InputFormat)new UniformIntInput(), "Source 2");
        source2.setParameter("testfomat.numkeys", 500);
        source2.setParameter("testfomat.numvalues", 3);
        JoinOperator matcher = JoinOperator.builder(DelayingMatcher.class, IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Long Cancelling Sort Join").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(4);
        this.runAndCancelJob(p, 10000, 20000);
    }

    public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new UniformIntInput(), "Source 1");
        source1.setParameter("testfomat.numkeys", 500);
        source1.setParameter("testfomat.numvalues", 3);
        GenericDataSource source2 = new GenericDataSource((InputFormat)new UniformIntInput(), "Source 2");
        source2.setParameter("testfomat.numkeys", 500);
        source2.setParameter("testfomat.numvalues", 3);
        JoinOperator matcher = JoinOperator.builder(LongCancelTimeMatcher.class, IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Long Cancelling Sort Join").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(4);
        this.runAndCancelJob(p, 10000, 10000);
    }

    public void testCancelSortMatchWithHighparallelism() throws Exception {
        GenericDataSource source1 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormat(), "Source 1");
        GenericDataSource source2 = new GenericDataSource((InputFormat)new InfiniteIntegerInputFormat(), "Source 2");
        JoinOperator matcher = JoinOperator.builder((JoinFunction)new SimpleMatcher(), IntValue.class, (int)0, (int)0).input1((Operator)source1).input2((Operator)source2).name("Sort Join").build();
        GenericDataSink sink = new GenericDataSink((OutputFormat)new DiscardingOutputFormat(), (Operator)matcher, "Sink");
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(64);
        this.runAndCancelJob(p, 3000, 20000);
    }

    public static final class StuckInOpenMatcher
    extends JoinFunction {
        private static final long serialVersionUID = 1L;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(Configuration parameters) throws Exception {
            StuckInOpenMatcher stuckInOpenMatcher = this;
            synchronized (stuckInOpenMatcher) {
                ((Object)((Object)this)).wait();
            }
        }

        public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
            value1.setField(1, value2.getField(0, IntValue.class));
            out.collect((Object)value1);
        }
    }

    public static final class LongCancelTimeMatcher
    extends JoinFunction {
        private static final long serialVersionUID = 1L;
        private static final int WAIT_TIME_PER_RECORD = 5000;

        public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
            value1.setField(1, value2.getField(0, IntValue.class));
            long start = System.currentTimeMillis();
            long remaining = 5000L;
            do {
                try {
                    Thread.sleep(remaining);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((remaining = 5000L - System.currentTimeMillis() + start) > 0L);
            out.collect((Object)value1);
        }
    }

    public static final class DelayingMatcher
    extends JoinFunction {
        private static final long serialVersionUID = 1L;
        private static final int WAIT_TIME_PER_RECORD = 10000;

        public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
            Thread.sleep(10000L);
            value1.setField(1, value2.getField(0, IntValue.class));
            out.collect((Object)value1);
        }
    }

    public static final class SimpleMatcher
    extends JoinFunction {
        private static final long serialVersionUID = 1L;

        public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
            value1.setField(1, value2.getField(0, IntValue.class));
            out.collect((Object)value1);
        }
    }
}

