package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.class */
public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
    private static List<String> testResults;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase$Tuple2TimestampExtractor.class */
    private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
        private Tuple2TimestampExtractor() {
        }

        public long extractTimestamp(Tuple2<String, Integer> tuple2, long j) {
            return ((Integer) tuple2.f1).intValue();
        }

        public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> tuple2, long j) {
            return new Watermark(((Integer) tuple2.f1).intValue() - 1);
        }
    }

    @Test
    public void testFoldWindow() throws Exception {
        testResults = new ArrayList();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.addSource(new SourceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowFoldITCase.1
            private static final long serialVersionUID = 1;

            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
                sourceContext.collect(Tuple2.of("a", 0));
                sourceContext.collect(Tuple2.of("a", 1));
                sourceContext.collect(Tuple2.of("a", 2));
                sourceContext.collect(Tuple2.of("b", 3));
                sourceContext.collect(Tuple2.of("b", 4));
                sourceContext.collect(Tuple2.of("b", 5));
                sourceContext.collect(Tuple2.of("a", 6));
                sourceContext.collect(Tuple2.of("a", 7));
                sourceContext.collect(Tuple2.of("a", 8));
            }

            public void cancel() {
            }
        }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()).keyBy(new int[]{0}).window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS))).fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowFoldITCase.3
            public Tuple2<String, Integer> fold(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                tuple2.f0 += ((String) tuple22.f0);
                tuple2.f1 = Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue());
                return tuple2;
            }
        }).addSink(new SinkFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowFoldITCase.2
            public void invoke(Tuple2<String, Integer> tuple2) throws Exception {
                WindowFoldITCase.testResults.add(tuple2.toString());
            }
        });
        executionEnvironment.execute("Fold Window Test");
        List asList = Arrays.asList("(R:aaa,3)", "(R:aaa,21)", "(R:bbb,12)");
        Collections.sort(asList);
        Collections.sort(testResults);
        Assert.assertEquals(asList, testResults);
    }

    @Test
    public void testFoldAllWindow() throws Exception {
        testResults = new ArrayList();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.addSource(new SourceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowFoldITCase.4
            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
                sourceContext.collect(Tuple2.of("a", 0));
                sourceContext.collect(Tuple2.of("a", 1));
                sourceContext.collect(Tuple2.of("a", 2));
                sourceContext.collect(Tuple2.of("b", 3));
                sourceContext.collect(Tuple2.of("a", 3));
                sourceContext.collect(Tuple2.of("b", 4));
                sourceContext.collect(Tuple2.of("a", 4));
                sourceContext.collect(Tuple2.of("b", 5));
                sourceContext.collect(Tuple2.of("a", 5));
            }

            public void cancel() {
            }
        }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()).windowAll(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS))).fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowFoldITCase.6
            public Tuple2<String, Integer> fold(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                tuple2.f0 += ((String) tuple22.f0);
                tuple2.f1 = Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue());
                return tuple2;
            }
        }).addSink(new SinkFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowFoldITCase.5
            public void invoke(Tuple2<String, Integer> tuple2) throws Exception {
                WindowFoldITCase.testResults.add(tuple2.toString());
            }
        });
        executionEnvironment.execute("Fold All-Window Test");
        List asList = Arrays.asList("(R:aaa,3)", "(R:bababa,24)");
        Collections.sort(asList);
        Collections.sort(testResults);
        Assert.assertEquals(asList, testResults);
    }
}
