/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.util.ArrayList;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class SessionWindowing {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("Usage: SessionWindowing --output <path>");
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)params);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        final boolean fileOutput = params.has("output");
        final ArrayList<Tuple3> input = new ArrayList<Tuple3>();
        input.add(new Tuple3((Object)"a", (Object)1L, (Object)1));
        input.add(new Tuple3((Object)"b", (Object)1L, (Object)1));
        input.add(new Tuple3((Object)"b", (Object)3L, (Object)1));
        input.add(new Tuple3((Object)"b", (Object)5L, (Object)1));
        input.add(new Tuple3((Object)"c", (Object)6L, (Object)1));
        input.add(new Tuple3((Object)"a", (Object)10L, (Object)1));
        input.add(new Tuple3((Object)"c", (Object)11L, (Object)1));
        DataStreamSource source = env.addSource((SourceFunction)new SourceFunction<Tuple3<String, Long, Integer>>(){
            private static final long serialVersionUID = 1L;

            public void run(SourceFunction.SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
                for (Tuple3 value : input) {
                    ctx.collectWithTimestamp((Object)value, ((Long)value.f1).longValue());
                    ctx.emitWatermark(new Watermark((Long)value.f1 - 1L));
                    if (fileOutput) continue;
                    System.out.println("Collected: " + value);
                }
                ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
            }

            public void cancel() {
            }
        });
        SingleOutputStreamOperator aggregated = source.keyBy(new int[]{0}).window((WindowAssigner)GlobalWindows.create()).trigger((Trigger)new SessionTrigger(3L)).sum(2);
        if (fileOutput) {
            aggregated.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            aggregated.print();
        }
        env.execute();
    }

    private static class SessionTrigger
    extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
        private static final long serialVersionUID = 1L;
        private final Long sessionTimeout;
        private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor("last-seen", Long.class, (Object)-1L);

        public SessionTrigger(Long sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }

        public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            ValueState lastSeenState = (ValueState)ctx.getPartitionedState(this.stateDesc);
            Long lastSeen = (Long)lastSeenState.value();
            Long timeSinceLastEvent = timestamp - lastSeen;
            ctx.deleteEventTimeTimer(lastSeen + this.sessionTimeout);
            lastSeenState.update((Object)timestamp);
            ctx.registerEventTimeTimer(timestamp + this.sessionTimeout);
            if (lastSeen != -1L && timeSinceLastEvent > this.sessionTimeout) {
                System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        public TriggerResult onEventTime(long time, GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            ValueState lastSeenState = (ValueState)ctx.getPartitionedState(this.stateDesc);
            Long lastSeen = (Long)lastSeenState.value();
            if (time - lastSeen >= this.sessionTimeout) {
                System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        public TriggerResult onProcessingTime(long time, GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        public void clear(GlobalWindow window, Trigger.TriggerContext ctx) throws Exception {
            ValueState lastSeenState = (ValueState)ctx.getPartitionedState(this.stateDesc);
            if ((Long)lastSeenState.value() != -1L) {
                ctx.deleteEventTimeTimer((Long)lastSeenState.value() + this.sessionTimeout);
            }
            lastSeenState.clear();
        }
    }
}

