package org.apache.flink.ml.common.window;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.ml.common.datastream.DataStreamUtils;
import org.apache.flink.ml.util.TestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/** Tests the window strategies of this package by applying them through {@link DataStreamUtils}. */
public class WindowsTest extends AbstractTestBase {
    // Number of records in the shared test input.
    private static final int RECORD_NUM = 100;
    // Values fed into every stream; populated in beforeClass() with consecutive longs starting at 0.
    private static List<Long> inputData;
    // Plain stream created from inputData.
    private static DataStream<Long> inputStream;
    // inputStream passed through a parallelism-1 map that sleeps for one second on every 50th record.
    private static DataStream<Long> inputStreamWithProcessingTimeGap;
    // inputStream with monotonous watermarks; each record's own value is used as its timestamp.
    private static DataStream<Long> inputStreamWithEventTime;

    /* loaded from: input_file:org/apache/flink/ml/common/window/WindowsTest$CreateAllWindowBatchFunction.class */
    private static class CreateAllWindowBatchFunction<IN, W extends Window> extends ProcessAllWindowFunction<IN, List<IN>, W> {
        private CreateAllWindowBatchFunction() {
        }

        public void process(ProcessAllWindowFunction<IN, List<IN>, W>.Context context, Iterable<IN> iterable, Collector<List<IN>> collector) {
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            iterable.forEach(arrayList::add);
            collector.collect(arrayList);
        }
    }

    /**
     * Builds the input shared by all tests: the values {@code 0 .. RECORD_NUM - 1} and three
     * streams derived from them (plain, with processing-time pauses, with event timestamps).
     */
    @BeforeClass
    public static void beforeClass() {
        StreamExecutionEnvironment env = TestUtils.getExecutionEnvironment();

        inputData = new ArrayList<>(RECORD_NUM);
        for (long value = 0; value < RECORD_NUM; value++) {
            inputData.add(value);
        }

        inputStream = env.fromCollection(inputData);

        inputStreamWithProcessingTimeGap = inputStream.map(new MapFunction<Long, Long>() {
            private int count = 0;

            @Override
            public Long map(Long value) throws Exception {
                count++;
                // Pause for one second after every half of the input (every 50th record).
                if (count % (RECORD_NUM / 2) == 0) {
                    Thread.sleep(1000L);
                }
                return value;
            }
        }).setParallelism(1);

        // The explicit <Long> witness is required: without it the chained call infers Object for
        // the element type and the timestamp assigner cannot treat the element as a Long.
        inputStreamWithEventTime = inputStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Long>forMonotonousTimestamps()
                        // Each record's own value serves as its event timestamp.
                        .withTimestampAssigner((element, recordTimestamp) -> element));
    }

    /** A global window must deliver the complete input as exactly one batch. */
    @Test
    public void testGlobalWindows() throws Exception {
        DataStream<List<Long>> output = DataStreamUtils.windowAllAndProcess(
                inputStream,
                GlobalWindows.getInstance(),
                new CreateAllWindowBatchFunction<>(),
                Types.LIST(Types.LONG));
        // IteratorUtils.toList returns a raw List, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        List<List<Long>> batches = IteratorUtils.toList(output.executeAndCollect());

        Assert.assertEquals(1, batches.size());
        Assert.assertEquals(new HashSet<>(inputData), new HashSet<>(batches.get(0)));
    }

    /**
     * Count-based tumbling windows: the test expects one batch per completely filled window and
     * no batch for the leftover records that do not fill a window.
     */
    @Test
    public void testCountTumblingWindows() throws Exception {
        final long windowSize = 14L;
        DataStream<List<Long>> output = DataStreamUtils.windowAllAndProcess(
                inputStream,
                CountTumblingWindows.of(windowSize),
                new CreateAllWindowBatchFunction<>(),
                Types.LIST(Types.LONG));
        // IteratorUtils.toList returns a raw List, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        List<List<Long>> batches = IteratorUtils.toList(output.executeAndCollect());

        // 100 records / 14 per window = 7 full windows holding 98 records in total.
        final long fullWindows = RECORD_NUM / windowSize;
        Assert.assertEquals(fullWindows, batches.size());

        long collected = 0;
        for (List<Long> batch : batches) {
            collected += batch.size();
        }
        Assert.assertEquals(fullWindows * windowSize, collected);
    }

    /**
     * Processing-time tumbling windows over the stream with pauses: more than one batch must be
     * produced, and together the batches must contain all records except possibly the last one.
     */
    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        DataStream<List<Long>> output = DataStreamUtils.windowAllAndProcess(
                inputStreamWithProcessingTimeGap,
                ProcessingTimeTumblingWindows.of(Time.milliseconds(100L)),
                new CreateAllWindowBatchFunction<>(),
                Types.LIST(Types.LONG));
        // IteratorUtils.toList returns a raw List, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        List<List<Long>> batches = IteratorUtils.toList(output.executeAndCollect());

        Assert.assertTrue(batches.size() > 1);

        List<Long> merged = new ArrayList<>();
        for (List<Long> batch : batches) {
            merged.addAll(batch);
        }
        // The final record is deliberately not required.
        // NOTE(review): presumably the last window may not fire before the bounded job ends - confirm.
        Assert.assertTrue(merged.containsAll(inputData.subList(0, RECORD_NUM - 1)));
    }

    /**
     * Event-time tumbling windows: with timestamps 0..99 and 14 ms windows the test expects
     * eight batches that together contain every input record exactly once.
     */
    @Test
    public void testEventTimeTumblingWindows() throws Exception {
        final long windowSize = 14L;
        DataStream<List<Long>> output = DataStreamUtils.windowAllAndProcess(
                inputStreamWithEventTime,
                EventTimeTumblingWindows.of(Time.milliseconds(windowSize)),
                new CreateAllWindowBatchFunction<>(),
                Types.LIST(Types.LONG));
        // IteratorUtils.toList returns a raw List, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        List<List<Long>> batches = IteratorUtils.toList(output.executeAndCollect());

        // ceil(100 / 14) = 8: seven full windows plus one partially filled trailing window.
        final long expectedWindows = (RECORD_NUM + windowSize - 1) / windowSize;
        Assert.assertEquals(expectedWindows, batches.size());

        List<Long> merged = new ArrayList<>();
        for (List<Long> batch : batches) {
            merged.addAll(batch);
        }
        Assert.assertEquals(RECORD_NUM, merged.size());
        Assert.assertEquals(new HashSet<>(inputData), new HashSet<>(merged));
    }

    /**
     * Processing-time session windows over the stream with pauses: more than one session must be
     * produced, and together the sessions must contain all records except possibly the last one.
     */
    @Test
    public void testProcessingTimeSessionWindows() throws Exception {
        DataStream<List<Long>> output = DataStreamUtils.windowAllAndProcess(
                inputStreamWithProcessingTimeGap,
                ProcessingTimeSessionWindows.withGap(Time.milliseconds(100L)),
                new CreateAllWindowBatchFunction<>(),
                Types.LIST(Types.LONG));
        // IteratorUtils.toList returns a raw List, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        List<List<Long>> batches = IteratorUtils.toList(output.executeAndCollect());

        Assert.assertTrue(batches.size() > 1);

        List<Long> merged = new ArrayList<>();
        for (List<Long> batch : batches) {
            merged.addAll(batch);
        }
        // The final record is deliberately not required.
        // NOTE(review): presumably the last session may not fire before the bounded job ends - confirm.
        Assert.assertTrue(merged.containsAll(inputData.subList(0, RECORD_NUM - 1)));
    }

    /**
     * Event-time session windows: consecutive timestamps are 1 ms apart, which is below the
     * 14 ms gap, so the test expects the whole input to arrive as a single session.
     */
    @Test
    public void testEventTimeSessionWindows() throws Exception {
        DataStream<List<Long>> output = DataStreamUtils.windowAllAndProcess(
                inputStreamWithEventTime,
                EventTimeSessionWindows.withGap(Time.milliseconds(14L)),
                new CreateAllWindowBatchFunction<>(),
                Types.LIST(Types.LONG));
        // IteratorUtils.toList returns a raw List, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        List<List<Long>> batches = IteratorUtils.toList(output.executeAndCollect());

        Assert.assertEquals(1, batches.size());
        Assert.assertEquals(new HashSet<>(inputData), new HashSet<>(batches.get(0)));
    }

    // NOTE: $deserializeLambda$(SerializedLambda) is intentionally not declared here. The compiler
    // synthesizes it for the serializable timestamp-assigner lambda in beforeClass(); the
    // decompiled copy that used to sit here was not valid Java (an int assigned to a boolean, a
    // switch over a boolean, a lambda returned as Object) and a hand-written declaration
    // conflicts with the compiler-generated one.
}
