/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.io.Serializable;
import java.util.ArrayList;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * Example streaming job that sums a counter per key over session-like windows.
 *
 * <p>Each input record is a {@code Tuple3<String, Long, Integer>}: field 0 is used as the key,
 * field 1 is used as the event timestamp, and field 2 is the value that gets summed. Elements
 * are collected in a {@link GlobalWindow} per key, and {@link SessionTrigger} decides when the
 * accumulated sum is emitted and the window contents are discarded.
 *
 * <p>Usage: {@code SessionWindowing [<result path>]}. Without an argument the result is printed;
 * with one argument the result is written as text to that path.
 */
public class SessionWindowing {
    /** Gap between two events of a key, in timestamp units, after which the session is closed. */
    private static final long SESSION_TIMEOUT = 3L;

    /** Whether the result is written to {@link #outputPath} instead of being printed. */
    private static boolean fileOutput = false;

    /** Destination for the text output; only set when {@link #fileOutput} is {@code true}. */
    private static String outputPath;

    /**
     * Builds and executes the job.
     *
     * @param args either empty, or a single element holding the result path
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        if (!parseParameters(args)) {
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(2);

        // Sample data: (key, event timestamp, count).
        final ArrayList<Tuple3<String, Long, Integer>> input = new ArrayList<>();
        input.add(new Tuple3<>("a", 1L, 1));
        input.add(new Tuple3<>("b", 1L, 1));
        input.add(new Tuple3<>("b", 3L, 1));
        input.add(new Tuple3<>("b", 5L, 1));
        input.add(new Tuple3<>("c", 6L, 1));
        input.add(new Tuple3<>("a", 10L, 1));
        input.add(new Tuple3<>("c", 11L, 1));

        DataStreamSource<Tuple3<String, Long, Integer>> source = env.addSource(
                new EventTimeSourceFunction<Tuple3<String, Long, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    /**
                     * Emits every sample record with its own timestamp, each followed by a
                     * watermark one unit behind it, and finishes with a final watermark of
                     * {@code Long.MAX_VALUE}.
                     */
                    public void run(SourceFunction.SourceContext<Tuple3<String, Long, Integer>> ctx)
                            throws Exception {
                        for (Tuple3<String, Long, Integer> value : input) {
                            ctx.collectWithTimestamp(value, value.f1);
                            ctx.emitWatermark(new Watermark(value.f1 - 1L));
                            // Only echo the input when the result goes to stdout as well.
                            if (!fileOutput) {
                                System.out.println("Collected: " + value);
                            }
                        }
                        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
                    }

                    /** Nothing to cancel: {@code run} only iterates over a finite in-memory list. */
                    public void cancel() {
                    }
                });

        // Sessions are built per key (field 0) and the count (field 2) is summed.
        // The variable is left as a raw type because the generic signature of
        // SingleOutputStreamOperator is not visible from this file.
        SingleOutputStreamOperator aggregated = source
                .keyBy(0)
                .window(GlobalWindows.create())
                .trigger(new SessionTrigger(SESSION_TIMEOUT))
                .sum(2);

        if (fileOutput) {
            aggregated.writeAsText(outputPath);
        } else {
            aggregated.print();
        }
        env.execute();
    }

    /**
     * Interprets the command line.
     *
     * <p>No argument keeps the default (print the result). Exactly one argument switches to file
     * output and is taken as the result path. Anything else prints a usage message to stderr.
     *
     * @param args the raw command-line arguments
     * @return {@code true} if the job should run, {@code false} if the arguments were invalid
     */
    private static boolean parseParameters(String[] args) {
        if (args.length == 0) {
            return true;
        }
        if (args.length == 1) {
            fileOutput = true;
            outputPath = args[0];
            return true;
        }
        System.err.println("Usage: SessionWindowing <result path>");
        return false;
    }

    /**
     * Trigger that closes a session once no element has been seen for {@code sessionTimeout}
     * timestamp units. The timestamp of the most recent element is kept in key/value state under
     * the name {@code "last-seen"}.
     */
    private static class SessionTrigger
    implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        /** Name of the key/value state holding the timestamp of the latest element. */
        private static final String LAST_SEEN_STATE = "last-seen";

        /** Value reported by the state before any element has updated it. */
        private static final Long DEFAULT_LAST_SEEN = 1L;

        /** Maximum allowed gap between two elements of the same session. */
        private final Long sessionTimeout;

        /**
         * @param sessionTimeout maximum gap, in timestamp units, between elements of one session
         */
        public SessionTrigger(Long sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }

        /**
         * Records the element's timestamp as the new last-seen value and arms an event-time timer
         * one session timeout after it.
         *
         * @return {@code FIRE_AND_PURGE} if the gap to the previously seen timestamp exceeds the
         *         session timeout, otherwise {@code CONTINUE}
         */
        public Trigger.TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            OperatorState<Long> lastSeenState = ctx.getKeyValueState(LAST_SEEN_STATE, DEFAULT_LAST_SEEN);
            long previousLastSeen = lastSeenState.value();
            long timeSinceLastEvent = timestamp - previousLastSeen;

            lastSeenState.update(timestamp);

            // The timer must be relative to THIS element. onEventTime() compares the timer time
            // against the updated last-seen value, so a timer armed at
            // previousLastSeen + timeout would fire too early to ever satisfy that check and the
            // final session of a key would never be emitted.
            ctx.registerEventTimeTimer(timestamp + this.sessionTimeout);

            if (timeSinceLastEvent > this.sessionTimeout) {
                return Trigger.TriggerResult.FIRE_AND_PURGE;
            }
            return Trigger.TriggerResult.CONTINUE;
        }

        /**
         * Handles an event-time timer.
         *
         * @return {@code FIRE_AND_PURGE} if at least one session timeout has elapsed between the
         *         last-seen timestamp and {@code time}, otherwise {@code CONTINUE} (a newer
         *         element has moved the last-seen timestamp forward in the meantime)
         */
        public Trigger.TriggerResult onEventTime(long time, GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            OperatorState<Long> lastSeenState = ctx.getKeyValueState(LAST_SEEN_STATE, DEFAULT_LAST_SEEN);
            long lastSeen = lastSeenState.value();
            if (time - lastSeen >= this.sessionTimeout) {
                return Trigger.TriggerResult.FIRE_AND_PURGE;
            }
            return Trigger.TriggerResult.CONTINUE;
        }

        /** Processing time is not used by this trigger; always returns {@code CONTINUE}. */
        public Trigger.TriggerResult onProcessingTime(long time, GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            return Trigger.TriggerResult.CONTINUE;
        }
    }
}

