/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.cancelling;

import java.io.Serializable;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.cancelling.CancelingTestBase;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.junit.Ignore;
import org.junit.Test;

@Ignore(value="Takes too long.")
public class JoinCancelingITCase
extends CancelingTestBase {
    private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
        this.executeTask(joiner, slow, 4);
    }

    private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input1 = env.createInput((InputFormat)new InfiniteIntegerTupleInputFormat(slow));
        DataStreamSource input2 = env.createInput((InputFormat)new InfiniteIntegerTupleInputFormat(slow));
        input1.join((DataStream)input2).where((KeySelector & Serializable)value -> (Integer)value.f0).equalTo((KeySelector & Serializable)value -> (Integer)value.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply(joiner).sinkTo((Sink)new DiscardingSink());
        env.setParallelism(parallelism);
        this.runAndCancelJob(env.getStreamGraph().getJobGraph(), 5000, 10000);
    }

    @Test
    public void testCancelWhileReadingSlowInputs() throws Exception {
        this.executeTask(new SimpleMatcher<Integer>(), true);
    }

    @Test
    public void testCancelWhileReadingFastInputs() throws Exception {
        this.executeTask(new SimpleMatcher<Integer>(), false);
    }

    @Test
    public void testCancelPriorToFirstRecordReading() throws Exception {
        this.executeTask((JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>)new StuckInOpenMatcher(), false);
    }

    private void executeTaskWithGenerator(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input1 = env.createInput((InputFormat)new UniformIntTupleGeneratorInputFormat(keys, vals));
        DataStreamSource input2 = env.createInput((InputFormat)new UniformIntTupleGeneratorInputFormat(keys, vals));
        input1.join((DataStream)input2).where((KeySelector & Serializable)value -> (Integer)value.f0).equalTo((KeySelector & Serializable)value -> (Integer)value.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply(joiner).sinkTo((Sink)new DiscardingSink());
        this.runAndCancelJob(env.getStreamGraph().getJobGraph(), msecsTillCanceling, maxTimeTillCanceled);
    }

    @Test
    public void testCancelWhileJoining() throws Exception {
        this.executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10000, 20000);
    }

    @Test
    public void testCancelWithLongCancellingResponse() throws Exception {
        this.executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10000, 10000);
    }

    @Test
    public void testCancelWithHighparallelism() throws Exception {
        this.executeTask(new SimpleMatcher<Integer>(), false, 64);
    }

    private static final class StuckInOpenMatcher<IN>
    extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;

        private StuckInOpenMatcher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(OpenContext openContext) throws Exception {
            StuckInOpenMatcher stuckInOpenMatcher = this;
            synchronized (stuckInOpenMatcher) {
                ((Object)((Object)this)).wait();
            }
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            return new Tuple2(first.f0, second.f0);
        }
    }

    private static final class LongCancelTimeMatcher<IN>
    implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;
        private static final int WAIT_TIME_PER_RECORD = 5000;

        private LongCancelTimeMatcher() {
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            long start = System.currentTimeMillis();
            long remaining = 5000L;
            do {
                try {
                    Thread.sleep(remaining);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((remaining = 5000L - System.currentTimeMillis() + start) > 0L);
            return new Tuple2(first.f0, second.f0);
        }
    }

    private static final class DelayingMatcher<IN>
    implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;
        private static final int WAIT_TIME_PER_RECORD = 10000;

        private DelayingMatcher() {
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            Thread.sleep(10000L);
            return new Tuple2(first.f0, second.f0);
        }
    }

    private static final class SimpleMatcher<IN>
    implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;

        private SimpleMatcher() {
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            return new Tuple2(first.f0, second.f0);
        }
    }
}

