package org.apache.flink.streaming.api.windowing.windowbuffer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.class */
public class TumblingGroupedPreReducerTest {
    TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor.getForObject(new Tuple2(1, 1));
    TypeSerializer<Tuple2<Integer, Integer>> serializer = this.type.createSerializer((ExecutionConfig) null);
    KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys(new int[]{0}, this.type), this.type, (ExecutionConfig) null);
    Reducer reducer = new Reducer();

    /* loaded from: input_file:org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest$Reducer.class */
    private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
        public List<Tuple2<Integer, Integer>> allInputs;

        private Reducer() {
            this.allInputs = new ArrayList();
        }

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
            this.allInputs.add(tuple22);
            tuple2.f0 = Integer.valueOf(((Integer) tuple2.f0).intValue() + ((Integer) tuple22.f0).intValue());
            tuple2.f1 = Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue());
            return tuple2;
        }
    }

    @Test
    public void testEmitWindow() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple2(1, 1));
        arrayList.add(new Tuple2(0, 0));
        arrayList.add(new Tuple2(1, -1));
        arrayList.add(new Tuple2(1, -2));
        BasicWindowBufferTest.TestCollector testCollector = new BasicWindowBufferTest.TestCollector();
        List collected = testCollector.getCollected();
        TumblingGroupedPreReducer tumblingGroupedPreReducer = new TumblingGroupedPreReducer(this.reducer, this.key, this.serializer);
        tumblingGroupedPreReducer.store(this.serializer.copy(arrayList.get(0)));
        tumblingGroupedPreReducer.store(this.serializer.copy(arrayList.get(1)));
        tumblingGroupedPreReducer.emitWindow(testCollector);
        Assert.assertEquals(1L, collected.size());
        assertSetEquals(StreamWindow.fromElements(new Tuple2[]{new Tuple2(1, 1), new Tuple2(0, 0)}), (Collection) collected.get(0));
        tumblingGroupedPreReducer.store(this.serializer.copy(arrayList.get(0)));
        tumblingGroupedPreReducer.store(this.serializer.copy(arrayList.get(1)));
        tumblingGroupedPreReducer.store(this.serializer.copy(arrayList.get(2)));
        tumblingGroupedPreReducer.evict(3);
        tumblingGroupedPreReducer.store(this.serializer.copy(arrayList.get(3)));
        tumblingGroupedPreReducer.emitWindow(testCollector);
        Assert.assertEquals(2L, collected.size());
        assertSetEquals(StreamWindow.fromElements(new Tuple2[]{new Tuple2(3, -2), new Tuple2(0, 0)}), (Collection) collected.get(1));
        Assert.assertEquals(2L, this.reducer.allInputs.size());
        Assert.assertEquals(this.reducer.allInputs.get(0), arrayList.get(2));
        Assert.assertEquals(this.reducer.allInputs.get(1), arrayList.get(3));
    }

    @Test
    public void testEmitWindow2() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple2(1, 1));
        arrayList.add(new Tuple2(0, 0));
        arrayList.add(new Tuple2(1, -1));
        arrayList.add(new Tuple2(1, -2));
        BasicWindowBufferTest.TestCollector testCollector = new BasicWindowBufferTest.TestCollector();
        List collected = testCollector.getCollected();
        WindowBuffer sequentialID = new TumblingGroupedPreReducer(this.reducer, this.key, this.serializer).sequentialID();
        sequentialID.store(this.serializer.copy(arrayList.get(0)));
        sequentialID.store(this.serializer.copy(arrayList.get(1)));
        sequentialID.emitWindow(testCollector);
        assertSetEquals(StreamWindow.fromElements(new Tuple2[]{(Tuple2) arrayList.get(0), (Tuple2) arrayList.get(1)}), (Collection) collected.get(0));
        sequentialID.store(this.serializer.copy(arrayList.get(0)));
        sequentialID.store(this.serializer.copy(arrayList.get(1)));
        sequentialID.store(this.serializer.copy(arrayList.get(2)));
        sequentialID.emitWindow(testCollector);
        assertSetEquals(StreamWindow.fromElements(new Tuple2[]{new Tuple2(2, 0), (Tuple2) arrayList.get(1)}), (Collection) collected.get(1));
    }

    private static <T> void assertSetEquals(Collection<T> collection, Collection<T> collection2) {
        Assert.assertEquals(new HashSet(collection), new HashSet(collection2));
    }
}
