package org.apache.flink.test.windowing.sessionwindows;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.windowing.sessionwindows.SessionEventGeneratorImpl;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.class */
public class SessionWindowITCase extends AbstractTestBase {
    // Seed handed to the LongRandomGenerator that drives event generation.
    private static final long RANDOM_SEED = 1234567;
    // NOTE(review): no reader of this switch is visible in this class — confirm whether it is still needed.
    private static final boolean OUTPUT_RESULTS_AS_STRING = false;
    // NOTE(review): no reader of this switch is visible in this class; presumably it once selected a purging trigger — confirm.
    private static final boolean PURGE_WINDOW_ON_FIRE = false;
    // Passed to the ParallelSessionsEventGenerator; presumably the total number of sessions to generate — confirm against the generator.
    private static final long NUMBER_OF_SESSIONS = 20000;
    // Gap of the event-time session window assigner, in milliseconds; also passed to the EventGeneratorFactory.
    private static final long MAX_SESSION_EVENT_GAP_MS = 1000;
    // Allowed lateness of the windowed stream, in milliseconds; also passed to GeneratorConfiguration.of.
    private static final long ALLOWED_LATENESS_MS = 500;
    // Passed to GeneratorConfiguration.of; presumably an upper bound on the extra gap inserted between sessions — confirm.
    private static final long MAX_ADDITIONAL_SESSION_GAP_MS = 5000;
    // Presumably the number of timely events per session (the validating window function requires exactly 10 per window) — confirm.
    private static final int EVENTS_PER_SESSION = 10;
    // Presumably the number of late-but-within-lateness events per session — confirm against the generator.
    private static final int LATE_EVENTS_PER_SESSION = 5;
    // Presumably the maximum number of too-late (dropped) events per session — confirm against the generator.
    private static final int MAX_DROPPED_EVENTS_PER_SESSION = 5;
    // Number of distinct integer session keys (0 .. NUMBER_OF_DIFFERENT_KEYS - 1) offered to the generator.
    private static final int NUMBER_OF_DIFFERENT_KEYS = 20;
    // Presumably the number of sessions the generator keeps open concurrently — confirm against the generator.
    private static final int PARALLEL_SESSIONS = 10;
    // Accumulator name under which the window function adds up timely events over all window firings.
    private static final String SESSION_COUNTER_ON_TIME_KEY = "ALL_SESSIONS_ON_TIME_COUNT";
    // Accumulator name under which the window function adds up late (within lateness) events over all window firings.
    private static final String SESSION_COUNTER_LATE_KEY = "ALL_SESSIONS_LATE_COUNT";

    /* loaded from: input_file:org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase$SessionEventGeneratorDataSource.class */
    private static final class SessionEventGeneratorDataSource implements SourceFunction<SessionEvent<Integer, TestEventPayload>> {
        static final long serialVersionUID = 11341498979L;
        private volatile boolean isRunning = false;

        public void run(SourceFunction.SourceContext<SessionEvent<Integer, TestEventPayload>> sourceContext) {
            ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>> createGenerator = createGenerator();
            this.isRunning = true;
            while (this.isRunning) {
                synchronized (sourceContext.getCheckpointLock()) {
                    SessionEvent<Integer, TestEventPayload> nextEvent = createGenerator.nextEvent();
                    if (nextEvent == null) {
                        return;
                    }
                    sourceContext.collectWithTimestamp(nextEvent, nextEvent.getEventTimestamp());
                    sourceContext.emitWatermark(new Watermark(createGenerator.getWatermark()));
                }
            }
        }

        private ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>> createGenerator() {
            LongRandomGenerator longRandomGenerator = new LongRandomGenerator(SessionWindowITCase.RANDOM_SEED);
            HashSet hashSet = new HashSet();
            for (int i = 0; i < SessionWindowITCase.NUMBER_OF_DIFFERENT_KEYS; i++) {
                hashSet.add(Integer.valueOf(i));
            }
            return new ParallelSessionsEventGenerator<>(hashSet, new EventGeneratorFactory(GeneratorConfiguration.of(SessionWindowITCase.ALLOWED_LATENESS_MS, 5, 5, SessionWindowITCase.MAX_ADDITIONAL_SESSION_GAP_MS), new GeneratorEventFactory<Integer, SessionEvent<Integer, TestEventPayload>>() { // from class: org.apache.flink.test.windowing.sessionwindows.SessionWindowITCase.SessionEventGeneratorDataSource.1
                @Override // org.apache.flink.test.windowing.sessionwindows.GeneratorEventFactory
                public SessionEvent<Integer, TestEventPayload> createEvent(Integer num, int i2, int i3, long j, long j2, SessionEventGeneratorImpl.Timing timing) {
                    return SessionEvent.of(num, TestEventPayload.of(j2, i2, i3, timing), j);
                }
            }, SessionWindowITCase.MAX_SESSION_EVENT_GAP_MS, 10, longRandomGenerator), 10, SessionWindowITCase.NUMBER_OF_SESSIONS, longRandomGenerator);
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase$ValidatingWindowFunction.class */
    private static final class ValidatingWindowFunction extends RichWindowFunction<SessionEvent<Integer, TestEventPayload>, String, Tuple, TimeWindow> {
        static final long serialVersionUID = 865723993979L;

        private ValidatingWindowFunction() {
        }

        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<SessionEvent<Integer, TestEventPayload>> iterable, Collector<String> collector) throws Exception {
            ArrayList<SessionEvent> arrayList = new ArrayList();
            Iterator<SessionEvent<Integer, TestEventPayload>> it = iterable.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            BitSet bitSet = new BitSet(10);
            BitSet bitSet2 = new BitSet(5);
            int i = 0;
            int i2 = 0;
            for (SessionEvent sessionEvent : arrayList) {
                if (SessionEventGeneratorImpl.Timing.TIMELY.equals(((TestEventPayload) sessionEvent.getEventValue()).getTiming())) {
                    i++;
                    bitSet.set(((TestEventPayload) sessionEvent.getEventValue()).getEventId());
                } else if (SessionEventGeneratorImpl.Timing.IN_LATENESS.equals(((TestEventPayload) sessionEvent.getEventValue()).getTiming())) {
                    i2++;
                    bitSet2.set(((TestEventPayload) sessionEvent.getEventValue()).getEventId() - 10);
                } else {
                    Assert.fail("Illegal event type in window " + timeWindow + ": " + sessionEvent);
                }
            }
            getRuntimeContext().getLongCounter(SessionWindowITCase.SESSION_COUNTER_ON_TIME_KEY).add(i);
            getRuntimeContext().getLongCounter(SessionWindowITCase.SESSION_COUNTER_LATE_KEY).add(i2);
            if (arrayList.size() < 10) {
                Assert.fail("Event count for session window " + timeWindow + " is too low: " + arrayList);
                return;
            }
            Assert.assertEquals(i, 10L);
            Assert.assertEquals(bitSet.cardinality(), i);
            Assert.assertEquals(bitSet2.cardinality(), i2);
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((Tuple) obj, (TimeWindow) window, (Iterable<SessionEvent<Integer, TestEventPayload>>) iterable, (Collector<String>) collector);
        }
    }

    /**
     * Runs the session window job with the generated event source and the validating window
     * function.
     */
    @Test
    public void testSessionWindowing() throws Exception {
        SessionEventGeneratorDataSource dataSource = new SessionEventGeneratorDataSource();
        ValidatingWindowFunction validator = new ValidatingWindowFunction();
        runTest(dataSource, validator);
    }

    /**
     * Builds and executes the session window job, then checks the accumulated event counts.
     *
     * <p>The stream is keyed by {@code "sessionKey"} and windowed with event-time session windows
     * using a gap of {@code MAX_SESSION_EVENT_GAP_MS} and an allowed lateness of
     * {@code ALLOWED_LATENESS_MS}. The window function's output is printed.
     *
     * @param sourceFunction source producing the session events
     * @param windowFunction function applied to every fired window
     * @throws Exception if job execution fails
     */
    private void runTest(SourceFunction<SessionEvent<Integer, TestEventPayload>> sourceFunction, WindowFunction<SessionEvent<Integer, TestEventPayload>, String, Tuple, TimeWindow> windowFunction) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction)
                .keyBy("sessionKey")
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(MAX_SESSION_EVENT_GAP_MS)))
                .allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS))
                .apply(windowFunction)
                .print();
        JobExecutionResult result = env.execute();

        // Expected totals are derived from the configuration so they cannot drift from it; with
        // the current constants they evaluate to 1200000 and 300000.
        // NOTE(review): the formulas presumably reflect that each late event re-fires the
        // (non-purged) window, so timely events are counted once per firing and the k-th late
        // firing contains k late events — confirm against the generator.
        long expectedOnTimeCount = (LATE_EVENTS_PER_SESSION + 1) * NUMBER_OF_SESSIONS * EVENTS_PER_SESSION;
        long expectedLateCount = NUMBER_OF_SESSIONS * (LATE_EVENTS_PER_SESSION * (LATE_EVENTS_PER_SESSION + 1) / 2);

        Long onTimeCount = (Long) result.getAccumulatorResult(SESSION_COUNTER_ON_TIME_KEY);
        Long lateCount = (Long) result.getAccumulatorResult(SESSION_COUNTER_LATE_KEY);

        // Fail with a readable message instead of a NullPointerException on unboxing.
        Assert.assertNotNull("Missing accumulator " + SESSION_COUNTER_ON_TIME_KEY, onTimeCount);
        Assert.assertEquals(expectedOnTimeCount, onTimeCount.longValue());
        Assert.assertNotNull("Missing accumulator " + SESSION_COUNTER_LATE_KEY, lateCount);
        Assert.assertEquals(expectedLateCount, lateCount.longValue());
    }
}
