package org.apache.flink.streaming.api.invokable.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.class */
public class WindowGroupReduceInvokableTest {
    private static final String EOW = "|";
    private static List<WindowGroupReduceInvokable<Integer, String>> invokables = new ArrayList();
    private static List<List<String>> expectedResults = new ArrayList();

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest$MySlidingWindowReduce.class */
    public static final class MySlidingWindowReduce implements GroupReduceFunction<Integer, String> {
        private static final long serialVersionUID = 1;

        public void reduce(Iterable<Integer> iterable, Collector<String> collector) throws Exception {
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next().toString());
            }
            collector.collect(WindowGroupReduceInvokableTest.EOW);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest$MyTimestamp.class */
    public static final class MyTimestamp implements TimeStamp<Integer> {
        private static final long serialVersionUID = 1;
        private Iterator<Long> timestamps;
        private long start;

        public MyTimestamp(List<Long> list) {
            this.timestamps = list.iterator();
            this.start = list.get(0).longValue();
        }

        public long getTimestamp(Integer num) {
            return this.timestamps.next().longValue();
        }

        public long getStartTime() {
            return this.start;
        }
    }

    @Before
    public void before() {
        List asList = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L, 110L);
        expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7", EOW, "7", "8", "9", EOW, "9", "10", EOW));
        invokables.add(new WindowGroupReduceInvokable<>(new MySlidingWindowReduce(), 3L, 2L, new MyTimestamp(asList)));
        List asList2 = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
        expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", "6", EOW, "3", "4", "5", "6", EOW, "7", EOW, "7", "8", "9", EOW, "8", "9", "10", EOW));
        invokables.add(new WindowGroupReduceInvokable<>(new MySlidingWindowReduce(), 10L, 5L, new MyTimestamp(asList2)));
        List asList3 = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
        expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", "6", EOW, "3", "4", "5", "6", EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8", "9", "10", EOW));
        invokables.add(new WindowGroupReduceInvokable<>(new MySlidingWindowReduce(), 10L, 4L, new MyTimestamp(asList3)));
    }

    @Test
    public void slidingBatchReduceTest() {
        ArrayList arrayList = new ArrayList();
        Iterator<WindowGroupReduceInvokable<Integer, String>> it = invokables.iterator();
        while (it.hasNext()) {
            arrayList.add(MockInvokable.createAndExecute(it.next(), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
        }
        Iterator it2 = arrayList.iterator();
        Iterator<List<String>> it3 = expectedResults.iterator();
        while (it3.hasNext()) {
            Assert.assertEquals(it3.next(), it2.next());
        }
    }
}
