package org.apache.flink.streaming.api.invokable.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.class */
public class BatchGroupReduceTest {
    private static final String END_OF_BATCH = "end of batch";
    private static final int SLIDING_BATCH_SIZE = 3;
    private static final int SLIDE_SIZE = 2;
    private static final int BATCH_SIZE = 5;

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest$MyBatchReduce.class */
    public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
        private static final long serialVersionUID = 1;

        public void reduce(Iterable<Double> iterable, Collector<Double> collector) throws Exception {
            Double valueOf = Double.valueOf(0.0d);
            Double valueOf2 = Double.valueOf(0.0d);
            Iterator<Double> it = iterable.iterator();
            while (it.hasNext()) {
                valueOf = Double.valueOf(valueOf.doubleValue() + it.next().doubleValue());
                valueOf2 = Double.valueOf(valueOf2.doubleValue() + 1.0d);
            }
            if (valueOf2.doubleValue() > 0.0d) {
                collector.collect(new Double(valueOf.doubleValue() / valueOf2.doubleValue()));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest$MySlidingBatchReduce.class */
    public static final class MySlidingBatchReduce implements GroupReduceFunction<Integer, String> {
        private static final long serialVersionUID = 1;

        public void reduce(Iterable<Integer> iterable, Collector<String> collector) throws Exception {
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next().toString());
            }
            collector.collect(BatchGroupReduceTest.END_OF_BATCH);
        }
    }

    @Test
    public void slidingBatchReduceTest() {
        Assert.assertEquals(Arrays.asList("1", "2", "3", END_OF_BATCH, "3", "4", "5", END_OF_BATCH, "5", "6", "7", END_OF_BATCH), MockInvokable.createAndExecute(new BatchGroupReduceInvokable(new MySlidingBatchReduce(), 3L, 2L), Arrays.asList(1, Integer.valueOf(SLIDE_SIZE), Integer.valueOf(SLIDING_BATCH_SIZE), 4, Integer.valueOf(BATCH_SIZE), 6, 7)));
    }

    @Test
    public void nonSlidingBatchReduceTest() {
        ArrayList arrayList = new ArrayList();
        for (Double valueOf = Double.valueOf(1.0d); valueOf.doubleValue() <= 100.0d; valueOf = Double.valueOf(valueOf.doubleValue() + 1.0d)) {
            arrayList.add(valueOf);
        }
        List createAndExecute = MockInvokable.createAndExecute(new BatchGroupReduceInvokable(new MyBatchReduce(), 5L, 5L), arrayList);
        for (int i = 0; i < createAndExecute.size(); i++) {
            Assert.assertEquals(3.0d + (i * BATCH_SIZE), ((Double) createAndExecute.get(i)).doubleValue(), 0.0d);
        }
    }
}
