package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.class */
public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
    // Strings emitted by the anonymous test sinks, which append to it as
    // CoGroupJoinITCase.testResults. testCoGroup, testJoin and testSelfJoin each assign a
    // fresh list before building their pipeline and compare it (sorted) to the expected output.
    private static List<String> testResults;

    /** Key selector that keys a {@code Tuple2<String, Integer>} by its first field, {@code f0}. */
    private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String, Integer>, String> {

        /** Returns the {@code f0} field of the given tuple. */
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }

    /**
     * Timestamp/watermark assigner for {@code Tuple2<String, Integer>} elements: the integer
     * field {@code f1} is used as the timestamp, and for every element a watermark one below
     * the extracted timestamp is returned.
     */
    private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {

        /** Returns {@code f1} widened to {@code long}; the second argument is not used. */
        public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
            return element.f1;
        }

        /** Returns a watermark at {@code extractedTimestamp - 1}; the element itself is not inspected. */
        public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> lastElement, long extractedTimestamp) {
            final long watermarkTime = extractedTimestamp - 1;
            return new Watermark(watermarkTime);
        }
    }

    /** Key selector that keys a {@code Tuple3<String, String, Integer>} by its first field, {@code f0}. */
    private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {

        /** Returns the {@code f0} field of the given tuple. */
        public String getKey(Tuple3<String, String, Integer> value) throws Exception {
            return value.f0;
        }
    }

    /**
     * Timestamp/watermark assigner for {@code Tuple3<String, String, Integer>} elements: the
     * integer field {@code f2} is used as the timestamp, and for every element a watermark one
     * below that element's {@code f2} value is returned.
     */
    private static class Tuple3TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Integer>> {

        /** Returns {@code f2} widened to {@code long}; the second argument is not used. */
        public long extractTimestamp(Tuple3<String, String, Integer> element, long previousTimestamp) {
            return element.f2;
        }

        /** Returns a watermark at {@code f2 - 1}, computed from the element rather than the passed timestamp. */
        public Watermark checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement, long extractedTimestamp) {
            // The subtraction is done in int arithmetic and then widened to long.
            return new Watermark(lastElement.f2 - 1);
        }
    }

    /**
     * Runs a co-group of two finite sources over 3 ms tumbling event-time windows, keyed by
     * {@code f0}, and checks the per-key, per-window groupings collected by the sink.
     *
     * <p>Each record carries its timestamp in {@code f1}. Every co-group invocation is rendered
     * as {@code "F:<first-side records> S:<second-side records>"}. Both the expected and the
     * actual results are sorted before comparison, so the order in which windows fire is
     * irrelevant.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testCoGroup() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // First input: key "a" in the first and third windows, key "b" in the second.
        SourceFunction<Tuple2<String, Integer>> firstSource = new SourceFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                ctx.collect(Tuple2.of("a", 0));
                ctx.collect(Tuple2.of("a", 1));
                ctx.collect(Tuple2.of("a", 2));

                ctx.collect(Tuple2.of("b", 3));
                ctx.collect(Tuple2.of("b", 4));
                ctx.collect(Tuple2.of("b", 5));

                ctx.collect(Tuple2.of("a", 6));
                ctx.collect(Tuple2.of("a", 7));
                ctx.collect(Tuple2.of("a", 8));
            }

            public void cancel() {
                // Nothing to cancel: run() returns after emitting a fixed set of records.
            }
        };

        // Second input: key "c" has no counterpart in the first input, and the first input's
        // third-window "a" records have no counterpart here.
        SourceFunction<Tuple2<String, Integer>> secondSource = new SourceFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                ctx.collect(Tuple2.of("a", 0));
                ctx.collect(Tuple2.of("a", 1));

                ctx.collect(Tuple2.of("b", 3));

                ctx.collect(Tuple2.of("c", 6));
                ctx.collect(Tuple2.of("c", 7));
                ctx.collect(Tuple2.of("c", 8));
            }

            public void cancel() {
                // Nothing to cancel: run() returns after emitting a fixed set of records.
            }
        };

        env.addSource(firstSource)
                .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor())
                .coGroup(env.addSource(secondSource).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()))
                .where(new Tuple2KeyExtractor())
                .equalTo(new Tuple2KeyExtractor())
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                    private static final long serialVersionUID = 1L;

                    public void coGroup(
                            Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<String> out) throws Exception {
                        StringBuilder result = new StringBuilder("F:");
                        for (Tuple2<String, Integer> record : first) {
                            result.append(record);
                        }
                        result.append(" S:");
                        for (Tuple2<String, Integer> record : second) {
                            result.append(record);
                        }
                        out.collect(result.toString());
                    }
                })
                .addSink(new SinkFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    public void invoke(String value) throws Exception {
                        testResults.add(value);
                    }
                });

        env.execute("CoGroup Test");

        List<String> expected = Arrays.asList(
                "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
                "F:(b,3)(b,4)(b,5) S:(b,3)",
                "F:(a,6)(a,7)(a,8) S:",
                "F: S:(c,6)(c,7)(c,8)");

        Collections.sort(expected);
        Collections.sort(testResults);
        Assert.assertEquals(expected, testResults);
    }

    /**
     * Runs a join of two finite sources over 3 ms tumbling event-time windows, keyed by
     * {@code f0}, and checks that every pairing of same-key records within a window is emitted.
     *
     * <p>Each record carries its timestamp in {@code f2}. Every joined pair is rendered as
     * {@code "<first>:<second>"}. Both the expected and the actual results are sorted before
     * comparison, so emission order is irrelevant.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testJoin() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SourceFunction<Tuple3<String, String, Integer>> firstSource =
                new SourceFunction<Tuple3<String, String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    public void run(SourceFunction.SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
                        ctx.collect(Tuple3.of("a", "x", 0));
                        ctx.collect(Tuple3.of("a", "y", 1));
                        ctx.collect(Tuple3.of("a", "z", 2));

                        ctx.collect(Tuple3.of("b", "u", 3));
                        ctx.collect(Tuple3.of("b", "w", 5));

                        ctx.collect(Tuple3.of("a", "i", 6));
                        ctx.collect(Tuple3.of("a", "j", 7));
                        ctx.collect(Tuple3.of("a", "k", 8));
                    }

                    public void cancel() {
                        // Nothing to cancel: run() returns after emitting a fixed set of records.
                    }
                };

        SourceFunction<Tuple3<String, String, Integer>> secondSource =
                new SourceFunction<Tuple3<String, String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    public void run(SourceFunction.SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
                        ctx.collect(Tuple3.of("a", "u", 0));
                        ctx.collect(Tuple3.of("a", "w", 1));

                        ctx.collect(Tuple3.of("b", "i", 3));
                        ctx.collect(Tuple3.of("b", "k", 5));

                        ctx.collect(Tuple3.of("a", "x", 6));
                        ctx.collect(Tuple3.of("a", "z", 8));
                    }

                    public void cancel() {
                        // Nothing to cancel: run() returns after emitting a fixed set of records.
                    }
                };

        env.addSource(firstSource)
                .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor())
                .join(env.addSource(secondSource).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()))
                .where(new Tuple3KeyExtractor())
                .equalTo(new Tuple3KeyExtractor())
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
                    private static final long serialVersionUID = 1L;

                    public String join(
                            Tuple3<String, String, Integer> first,
                            Tuple3<String, String, Integer> second) throws Exception {
                        return first + ":" + second;
                    }
                })
                .addSink(new SinkFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    public void invoke(String value) throws Exception {
                        testResults.add(value);
                    }
                });

        env.execute("Join Test");

        List<String> expected = Arrays.asList(
                "(a,x,0):(a,u,0)",
                "(a,x,0):(a,w,1)",
                "(a,y,1):(a,u,0)",
                "(a,y,1):(a,w,1)",
                "(a,z,2):(a,u,0)",
                "(a,z,2):(a,w,1)",
                "(b,u,3):(b,i,3)",
                "(b,u,3):(b,k,5)",
                "(b,w,5):(b,i,3)",
                "(b,w,5):(b,k,5)",
                "(a,i,6):(a,x,6)",
                "(a,i,6):(a,z,8)",
                "(a,j,7):(a,x,6)",
                "(a,j,7):(a,z,8)",
                "(a,k,8):(a,x,6)",
                "(a,k,8):(a,z,8)");

        Collections.sort(expected);
        Collections.sort(testResults);
        Assert.assertEquals(expected, testResults);
    }

    /**
     * Joins a single timestamped stream with itself over 3 ms tumbling event-time windows, keyed
     * by {@code f0}, and checks that every ordered pair of same-key records within a window is
     * emitted, including each record paired with itself.
     *
     * <p>Each record carries its timestamp in {@code f2}. Every joined pair is rendered as
     * {@code "<first>:<second>"}. Both the expected and the actual results are sorted before
     * comparison, so emission order is irrelevant.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testSelfJoin() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Declared with its element type so the join chain below stays fully type-checked.
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> source = env
                .addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    public void run(SourceFunction.SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
                        ctx.collect(Tuple3.of("a", "x", 0));
                        ctx.collect(Tuple3.of("a", "y", 1));
                        ctx.collect(Tuple3.of("a", "z", 2));

                        ctx.collect(Tuple3.of("b", "u", 3));
                        ctx.collect(Tuple3.of("b", "w", 5));

                        ctx.collect(Tuple3.of("a", "i", 6));
                        ctx.collect(Tuple3.of("a", "j", 7));
                        ctx.collect(Tuple3.of("a", "k", 8));
                    }

                    public void cancel() {
                        // Nothing to cancel: run() returns after emitting a fixed set of records.
                    }
                })
                .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

        source.join(source)
                .where(new Tuple3KeyExtractor())
                .equalTo(new Tuple3KeyExtractor())
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
                    private static final long serialVersionUID = 1L;

                    public String join(
                            Tuple3<String, String, Integer> first,
                            Tuple3<String, String, Integer> second) throws Exception {
                        return first + ":" + second;
                    }
                })
                .addSink(new SinkFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    public void invoke(String value) throws Exception {
                        testResults.add(value);
                    }
                });

        env.execute("Self-Join Test");

        List<String> expected = Arrays.asList(
                "(a,x,0):(a,x,0)",
                "(a,x,0):(a,y,1)",
                "(a,x,0):(a,z,2)",
                "(a,y,1):(a,x,0)",
                "(a,y,1):(a,y,1)",
                "(a,y,1):(a,z,2)",
                "(a,z,2):(a,x,0)",
                "(a,z,2):(a,y,1)",
                "(a,z,2):(a,z,2)",
                "(b,u,3):(b,u,3)",
                "(b,u,3):(b,w,5)",
                "(b,w,5):(b,u,3)",
                "(b,w,5):(b,w,5)",
                "(a,i,6):(a,i,6)",
                "(a,i,6):(a,j,7)",
                "(a,i,6):(a,k,8)",
                "(a,j,7):(a,i,6)",
                "(a,j,7):(a,j,7)",
                "(a,j,7):(a,k,8)",
                "(a,k,8):(a,i,6)",
                "(a,k,8):(a,j,7)",
                "(a,k,8):(a,k,8)");

        Collections.sort(expected);
        Collections.sort(testResults);
        Assert.assertEquals(expected, testResults);
    }

    /**
     * Builds the operator behind a windowed co-group, wraps it in a keyed one-input test harness
     * and takes a snapshot of it.
     *
     * <p>No output is asserted: the test passes when opening the harness and snapshotting
     * complete without throwing. The job itself is never executed; the environment is only used
     * to construct the operator.
     *
     * @throws Exception if the harness cannot be opened, snapshotted or closed
     */
    @Test
    public void testCoGroupOperatorWithCheckpoint() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // The elements are passed as varargs rather than as a raw Tuple2[] so that the streams
        // are typed Tuple2<String, Integer> and match the key selectors below.
        //
        // NOTE(review): getOperator() is invoked directly on the result of getTransformation().
        // If that method is only declared on the one-input transformation subclass in the Flink
        // version in use, a downcast (and its import) is required here - confirm.
        KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3))
                                .coGroup(env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6)))
                                .where(new Tuple2KeyExtractor())
                                .equalTo(new Tuple2KeyExtractor())
                                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                                .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                                    private static final long serialVersionUID = 1L;

                                    public void coGroup(
                                            Iterable<Tuple2<String, Integer>> first,
                                            Iterable<Tuple2<String, Integer>> second,
                                            Collector<String> out) throws Exception {
                                        out.collect(first + ":" + second);
                                    }
                                })
                                .getTransformation()
                                .getOperator(),
                        new Tuple2KeyExtractor(),
                        BasicTypeInfo.STRING_TYPE_INFO);

        // open() stays outside the try block so that a failure while opening is reported as-is
        // instead of being masked by a secondary failure from close().
        harness.open();
        try {
            harness.snapshot(0L, 0L);
        } finally {
            // Release the operator and harness resources even if the snapshot fails.
            harness.close();
        }
    }
}
