package org.apache.flink.streaming.api;

import java.io.Serializable;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/WindowCrossJoinTest.class */
public class WindowCrossJoinTest implements Serializable {
    private static final long serialVersionUID = 1;
    private static final long MEMORYSIZE = 32;
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<>();
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<>();
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<>();
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<>();

    /* loaded from: input_file:org/apache/flink/streaming/api/WindowCrossJoinTest$CrossResultSink.class */
    private static class CrossResultSink implements SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
        private static final long serialVersionUID = 1;

        private CrossResultSink() {
        }

        public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> tuple2) {
            WindowCrossJoinTest.crossResults.add(new Tuple2(tuple2.f0, ((Tuple1) tuple2.f1).f0));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/WindowCrossJoinTest$JoinResultSink.class */
    private static class JoinResultSink implements SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
        private static final long serialVersionUID = 1;

        private JoinResultSink() {
        }

        public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> tuple2) {
            WindowCrossJoinTest.joinResults.add(new Tuple2(tuple2.f0, ((Tuple1) tuple2.f1).f0));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/WindowCrossJoinTest$MyTimestamp.class */
    private static class MyTimestamp<T> implements Timestamp<T> {
        private static final long serialVersionUID = 1;

        private MyTimestamp() {
        }

        public long getTimestamp(T t) {
            return 101L;
        }
    }

    @Test
    public void test() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        createLocalEnvironment.setBufferTimeout(serialVersionUID);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList.add(new Tuple2(10, "a"));
        arrayList.add(new Tuple2(20, "b"));
        arrayList.add(new Tuple2(20, "x"));
        arrayList.add(new Tuple2(0, "y"));
        arrayList2.add(new Tuple1(0));
        arrayList2.add(new Tuple1(5));
        arrayList2.add(new Tuple1(20));
        joinExpectedResults.add(new Tuple2<>(new Tuple2(20, "b"), 20));
        joinExpectedResults.add(new Tuple2<>(new Tuple2(20, "x"), 20));
        joinExpectedResults.add(new Tuple2<>(new Tuple2(0, "y"), 0));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(10, "a"), 0));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(10, "a"), 5));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(10, "a"), 20));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(20, "b"), 0));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(20, "b"), 5));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(20, "b"), 20));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(20, "x"), 0));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(20, "x"), 5));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(20, "x"), 20));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(0, "y"), 0));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(0, "y"), 5));
        crossExpectedResults.add(new Tuple2<>(new Tuple2(0, "y"), 20));
        DataStreamSource fromCollection = createLocalEnvironment.fromCollection(arrayList);
        DataStreamSource fromCollection2 = createLocalEnvironment.fromCollection(arrayList2);
        fromCollection.join(fromCollection2).onWindow(1000L, new MyTimestamp(), new MyTimestamp(), 100L).where(new int[]{0}).equalTo(new int[]{0}).addSink(new JoinResultSink());
        fromCollection.cross(fromCollection2).onWindow(1000L, new MyTimestamp(), new MyTimestamp(), 100L).with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() { // from class: org.apache.flink.streaming.api.WindowCrossJoinTest.1
            private static final long serialVersionUID = 1;

            public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(Tuple2<Integer, String> tuple2, Tuple1<Integer> tuple1) throws Exception {
                return new Tuple2<>(tuple2, tuple1);
            }
        }).addSink(new CrossResultSink());
        createLocalEnvironment.executeTest(MEMORYSIZE);
        Assert.assertEquals(joinExpectedResults, joinResults);
        Assert.assertEquals(crossExpectedResults, crossResults);
    }
}
