/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.windowing.sessionwindows;

import java.time.Duration;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.test.windowing.sessionwindows.EventGeneratorFactory;
import org.apache.flink.test.windowing.sessionwindows.GeneratorConfiguration;
import org.apache.flink.test.windowing.sessionwindows.GeneratorEventFactory;
import org.apache.flink.test.windowing.sessionwindows.LongRandomGenerator;
import org.apache.flink.test.windowing.sessionwindows.ParallelSessionsEventGenerator;
import org.apache.flink.test.windowing.sessionwindows.SessionEvent;
import org.apache.flink.test.windowing.sessionwindows.SessionEventGeneratorImpl;
import org.apache.flink.test.windowing.sessionwindows.TestEventPayload;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class SessionWindowITCase
extends AbstractTestBaseJUnit4 {
    private static final long RANDOM_SEED = 1234567L;
    private static final boolean OUTPUT_RESULTS_AS_STRING = false;
    private static final boolean PURGE_WINDOW_ON_FIRE = false;
    private static final long NUMBER_OF_SESSIONS = 20000L;
    private static final long MAX_SESSION_EVENT_GAP_MS = 1000L;
    private static final long ALLOWED_LATENESS_MS = 500L;
    private static final long MAX_ADDITIONAL_SESSION_GAP_MS = 5000L;
    private static final int EVENTS_PER_SESSION = 10;
    private static final int LATE_EVENTS_PER_SESSION = 5;
    private static final int MAX_DROPPED_EVENTS_PER_SESSION = 5;
    private static final int NUMBER_OF_DIFFERENT_KEYS = 20;
    private static final int PARALLEL_SESSIONS = 10;
    private static final String SESSION_COUNTER_ON_TIME_KEY = "ALL_SESSIONS_ON_TIME_COUNT";
    private static final String SESSION_COUNTER_LATE_KEY = "ALL_SESSIONS_LATE_COUNT";

    @Test
    public void testSessionWindowing() throws Exception {
        SessionEventGeneratorDataSource dataSource = new SessionEventGeneratorDataSource();
        this.runTest(dataSource, (WindowFunction<SessionEvent<Integer, TestEventPayload>, String, Integer, TimeWindow>)new ValidatingWindowFunction());
    }

    private void runTest(SourceFunction<SessionEvent<Integer, TestEventPayload>> dataSource, WindowFunction<SessionEvent<Integer, TestEventPayload>, String, Integer, TimeWindow> windowFunction) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WindowedStream windowedStream = env.addSource(dataSource).keyBy(SessionEvent::getSessionKey, Types.INT).window((WindowAssigner)EventTimeSessionWindows.withGap((Duration)Duration.ofMillis(1000L)));
        windowedStream = windowedStream.allowedLateness(Duration.ofMillis(500L));
        windowedStream.apply(windowFunction).print();
        JobExecutionResult result = env.execute();
        Assert.assertEquals((long)1200000L, (long)((Long)result.getAccumulatorResult(SESSION_COUNTER_ON_TIME_KEY)));
        Assert.assertEquals((long)300000L, (long)((Long)result.getAccumulatorResult(SESSION_COUNTER_LATE_KEY)));
    }

    private static final class SessionEventGeneratorDataSource
    implements SourceFunction<SessionEvent<Integer, TestEventPayload>> {
        static final long serialVersionUID = 11341498979L;
        private volatile boolean isRunning = false;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<SessionEvent<Integer, TestEventPayload>> ctx) {
            ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>> generator = this.createGenerator();
            this.isRunning = true;
            while (this.isRunning) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    SessionEvent<Integer, TestEventPayload> evt = generator.nextEvent();
                    if (evt == null) {
                        break;
                    }
                    ctx.collectWithTimestamp(evt, evt.getEventTimestamp());
                    ctx.emitWatermark(new Watermark(generator.getWatermark()));
                }
            }
        }

        private ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>> createGenerator() {
            LongRandomGenerator randomGenerator = new LongRandomGenerator(1234567L);
            HashSet<Integer> keys = new HashSet<Integer>();
            for (int i = 0; i < 20; ++i) {
                keys.add(i);
            }
            GeneratorConfiguration generatorConfiguration = GeneratorConfiguration.of(500L, 5, 5, 5000L);
            GeneratorEventFactory<Integer, SessionEvent<Integer, TestEventPayload>> generatorEventFactory = new GeneratorEventFactory<Integer, SessionEvent<Integer, TestEventPayload>>(){

                @Override
                public SessionEvent<Integer, TestEventPayload> createEvent(Integer key, int sessionId, int eventId, long eventTimestamp, long globalWatermark, SessionEventGeneratorImpl.Timing timing) {
                    return SessionEvent.of(key, TestEventPayload.of(globalWatermark, sessionId, eventId, timing), eventTimestamp);
                }
            };
            EventGeneratorFactory<Integer, SessionEvent<Integer, TestEventPayload>> eventGeneratorFactory = new EventGeneratorFactory<Integer, SessionEvent<Integer, TestEventPayload>>(generatorConfiguration, generatorEventFactory, 1000L, 10, randomGenerator);
            return new ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>>(keys, eventGeneratorFactory, 10, 20000L, randomGenerator);
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    private static final class ValidatingWindowFunction
    extends RichWindowFunction<SessionEvent<Integer, TestEventPayload>, String, Integer, TimeWindow> {
        static final long serialVersionUID = 865723993979L;

        private ValidatingWindowFunction() {
        }

        public void apply(Integer i, TimeWindow timeWindow, Iterable<SessionEvent<Integer, TestEventPayload>> input, Collector<String> output) throws Exception {
            ArrayList<SessionEvent<Integer, TestEventPayload>> sessionEvents = new ArrayList<SessionEvent<Integer, TestEventPayload>>();
            for (SessionEvent<Integer, TestEventPayload> evt : input) {
                sessionEvents.add(evt);
            }
            BitSet onTimeBits = new BitSet(10);
            BitSet lateWithingBits = new BitSet(5);
            int onTimeCount = 0;
            int lateCount = 0;
            for (SessionEvent sessionEvent : sessionEvents) {
                if (SessionEventGeneratorImpl.Timing.TIMELY.equals((Object)((TestEventPayload)sessionEvent.getEventValue()).getTiming())) {
                    ++onTimeCount;
                    onTimeBits.set(((TestEventPayload)sessionEvent.getEventValue()).getEventId());
                    continue;
                }
                if (SessionEventGeneratorImpl.Timing.IN_LATENESS.equals((Object)((TestEventPayload)sessionEvent.getEventValue()).getTiming())) {
                    ++lateCount;
                    lateWithingBits.set(((TestEventPayload)sessionEvent.getEventValue()).getEventId() - 10);
                    continue;
                }
                Assert.fail((String)("Illegal event type in window " + timeWindow + ": " + sessionEvent));
            }
            this.getRuntimeContext().getLongCounter(SessionWindowITCase.SESSION_COUNTER_ON_TIME_KEY).add((long)onTimeCount);
            this.getRuntimeContext().getLongCounter(SessionWindowITCase.SESSION_COUNTER_LATE_KEY).add((long)lateCount);
            if (sessionEvents.size() >= 10) {
                Assert.assertEquals((long)onTimeCount, (long)10L);
                Assert.assertEquals((long)onTimeBits.cardinality(), (long)onTimeCount);
                Assert.assertEquals((long)lateWithingBits.cardinality(), (long)lateCount);
            } else {
                Assert.fail((String)("Event count for session window " + timeWindow + " is too low: " + sessionEvents));
            }
        }
    }
}

