/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.util.Collector;
import org.junit.Rule;
import org.junit.rules.Timeout;

public class SelfJoinDeadlockITCase
extends JavaProgramTestBaseJUnit4 {
    protected String resultPath;
    @Rule
    public Timeout globalTimeout = new Timeout(120000);

    protected void preSubmit() throws Exception {
        this.resultPath = this.getTempDirPath("result");
    }

    protected void testProgram() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource ds = env.createInput((InputFormat)new LargeJoinDataGeneratorInputFormat(1000000));
        ds.join((DataStream)ds).where((KeySelector & Serializable)x -> (Integer)x.f0).equalTo((KeySelector & Serializable)x -> (Integer)x.f1).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply((FlatJoinFunction)new Joiner()).sinkTo((Sink)FileSink.forRowFormat((Path)new Path(this.resultPath), (Encoder)new SimpleStringEncoder()).build());
        env.execute("Local Selfjoin Test Job");
    }

    private static class LargeJoinDataGeneratorInputFormat
    extends GenericInputFormat<Tuple3<Integer, Integer, String>>
    implements NonParallelInput {
        private static final long serialVersionUID = 1L;
        private final Random rand = new Random(42L);
        private final int toProduce;
        private int produced;

        public LargeJoinDataGeneratorInputFormat(int toProduce) {
            this.toProduce = toProduce;
        }

        public boolean reachedEnd() throws IOException {
            return this.produced >= this.toProduce;
        }

        public Tuple3<Integer, Integer, String> nextRecord(Tuple3<Integer, Integer, String> reuse) throws IOException {
            ++this.produced;
            return new Tuple3((Object)this.rand.nextInt(this.toProduce), (Object)this.rand.nextInt(this.toProduce), (Object)"aaa");
        }
    }

    private static class Joiner
    implements FlatJoinFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Integer, String>, Tuple5<Integer, Integer, Integer, String, String>> {
        private Joiner() {
        }

        public void join(Tuple3<Integer, Integer, String> in1, Tuple3<Integer, Integer, String> in2, Collector<Tuple5<Integer, Integer, Integer, String, String>> out) throws Exception {
            out.collect((Object)new Tuple5((Object)((Integer)in1.f0), (Object)((Integer)in1.f1), (Object)((Integer)in2.f1), (Object)((String)in1.f2), (Object)((String)in2.f2)));
        }
    }
}

