/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api;

import java.io.Serializable;
import java.util.HashSet;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.junit.Assert;
import org.junit.Test;

/**
 * Runs a windowed join and a windowed cross over two small in-memory streams
 * and compares what the sinks collected against hand-written expectations.
 *
 * <p>The class is {@link Serializable} because the anonymous
 * {@link CrossFunction} created inside {@link #test()} holds a reference to
 * the enclosing test instance.
 */
public class WindowCrossJoinTest
implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Argument handed to {@code executeTest}; presumably a memory size - TODO confirm the unit. */
    private static final long MEMORYSIZE = 32L;

    // The sinks write into these static sets; the test thread reads them once execution has returned.
    private static final HashSet<Tuple2<Tuple2<Integer, String>, Integer>> joinResults =
            new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
    private static final HashSet<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults =
            new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
    private static final HashSet<Tuple2<Tuple2<Integer, String>, Integer>> crossResults =
            new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
    private static final HashSet<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults =
            new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();

    /**
     * Builds both pipelines on a local environment, executes them, and asserts
     * that the collected join and cross results equal the expected sets.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void test() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.setBufferTimeout(1L);

        HashSet<Tuple2<Integer, String>> in1 = new HashSet<Tuple2<Integer, String>>();
        in1.add(new Tuple2<Integer, String>(10, "a"));
        in1.add(new Tuple2<Integer, String>(20, "b"));
        in1.add(new Tuple2<Integer, String>(20, "x"));
        in1.add(new Tuple2<Integer, String>(0, "y"));

        HashSet<Tuple1<Integer>> in2 = new HashSet<Tuple1<Integer>>();
        in2.add(new Tuple1<Integer>(0));
        in2.add(new Tuple1<Integer>(5));
        in2.add(new Tuple1<Integer>(20));

        // Join on field 0 of both sides: only the pairs whose keys are equal.
        joinExpectedResults.add(expected(20, "b", 20));
        joinExpectedResults.add(expected(20, "x", 20));
        joinExpectedResults.add(expected(0, "y", 0));

        // Cross: every element of in1 paired with every element of in2 (4 x 3 pairs).
        crossExpectedResults.add(expected(10, "a", 0));
        crossExpectedResults.add(expected(10, "a", 5));
        crossExpectedResults.add(expected(10, "a", 20));
        crossExpectedResults.add(expected(20, "b", 0));
        crossExpectedResults.add(expected(20, "b", 5));
        crossExpectedResults.add(expected(20, "b", 20));
        crossExpectedResults.add(expected(20, "x", 0));
        crossExpectedResults.add(expected(20, "x", 5));
        crossExpectedResults.add(expected(20, "x", 20));
        crossExpectedResults.add(expected(0, "y", 0));
        crossExpectedResults.add(expected(0, "y", 5));
        crossExpectedResults.add(expected(0, "y", 20));

        DataStreamSource<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
        DataStreamSource<Tuple1<Integer>> inStream2 = env.fromCollection(in2);

        // Key positions are passed as explicit arrays so the calls compile whether or not
        // where/equalTo are declared as varargs.
        inStream1.join(inStream2)
                .onWindow(1000L, new MyTimestamp<Tuple2<Integer, String>>(), new MyTimestamp<Tuple1<Integer>>(), 100L)
                .where(new int[]{0})
                .equalTo(new int[]{0})
                .addSink(new JoinResultSink());

        inStream1.cross(inStream2)
                .onWindow(1000L, new MyTimestamp<Tuple2<Integer, String>>(), new MyTimestamp<Tuple1<Integer>>(), 100L)
                .with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
                        return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
                    }
                })
                .addSink(new CrossResultSink());

        // Use the named constant rather than repeating the literal 32.
        env.executeTest(MEMORYSIZE);

        Assert.assertEquals(joinExpectedResults, joinResults);
        Assert.assertEquals(crossExpectedResults, crossResults);
    }

    /**
     * Builds one expected result element in the flattened shape the sinks store.
     *
     * @param key   field 0 of the left-hand tuple
     * @param label field 1 of the left-hand tuple
     * @param right the single field of the right-hand tuple
     * @return {@code ((key, label), right)}
     */
    private static Tuple2<Tuple2<Integer, String>, Integer> expected(int key, String label, int right) {
        return new Tuple2<Tuple2<Integer, String>, Integer>(new Tuple2<Integer, String>(key, label), right);
    }

    /** Sink for the cross pipeline; unwraps the right-hand {@code Tuple1} and stores the pair in {@code crossResults}. */
    private static class CrossResultSink
    implements SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
            crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
        }
    }

    /** Sink for the join pipeline; unwraps the right-hand {@code Tuple1} and stores the pair in {@code joinResults}. */
    private static class JoinResultSink
    implements SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
            joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
        }
    }

    /**
     * Timestamp extractor that ignores its input and reports the same instant
     * for every element.
     *
     * @param <T> element type of the stream being timestamped
     */
    private static class MyTimestamp<T>
    implements Timestamp<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public long getTimestamp(T value) {
            // Constant for every element; presumably chosen so all elements share one window - TODO confirm.
            return 101L;
        }
    }
}

