/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.util.ArrayList;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;

public class SessionWindowing {
    private static boolean fileOutput = false;
    private static String outputPath;

    public static void main(String[] args) throws Exception {
        if (!SessionWindowing.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        final ArrayList<Tuple3> input = new ArrayList<Tuple3>();
        input.add(new Tuple3((Object)"a", (Object)1L, (Object)1));
        input.add(new Tuple3((Object)"b", (Object)1L, (Object)1));
        input.add(new Tuple3((Object)"b", (Object)3L, (Object)1));
        input.add(new Tuple3((Object)"b", (Object)5L, (Object)1));
        input.add(new Tuple3((Object)"c", (Object)6L, (Object)1));
        input.add(new Tuple3((Object)"a", (Object)10L, (Object)1));
        input.add(new Tuple3((Object)"c", (Object)11L, (Object)1));
        DataStreamSource source = env.addSource((SourceFunction)new SourceFunction<Tuple3<String, Long, Integer>>(){
            private static final long serialVersionUID = 1L;

            public void run(SourceFunction.SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
                for (Tuple3 value : input) {
                    ctx.collect((Object)value);
                    if (fileOutput) continue;
                    System.out.println("Collected: " + value);
                    Thread.sleep(3000L);
                }
            }

            public void cancel() {
            }
        });
        DataStream aggregated = source.groupBy(new int[]{0}).window((TriggerPolicy)new SessionTriggerPolicy(3L), (EvictionPolicy)new TumblingEvictionPolicy()).sum(2).flatten();
        if (fileOutput) {
            aggregated.writeAsText(outputPath);
        } else {
            aggregated.print();
        }
        env.execute();
    }

    private static boolean parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length == 1) {
                fileOutput = true;
                outputPath = args[0];
            } else {
                System.err.println("Usage: SessionWindowing <result path>");
                return false;
            }
        }
        return true;
    }

    private static class SessionTriggerPolicy
    implements CentralActiveTrigger<Tuple3<String, Long, Integer>> {
        private static final long serialVersionUID = 1L;
        private volatile Long lastSeenEvent = 1L;
        private Long sessionTimeout;

        public SessionTriggerPolicy(Long sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }

        public boolean notifyTrigger(Tuple3<String, Long, Integer> datapoint) {
            Long eventTimestamp = (Long)datapoint.f1;
            Long timeSinceLastEvent = eventTimestamp - this.lastSeenEvent;
            this.lastSeenEvent = eventTimestamp;
            return timeSinceLastEvent > this.sessionTimeout;
        }

        public Object[] notifyOnLastGlobalElement(Tuple3<String, Long, Integer> datapoint) {
            Long eventTimestamp = (Long)datapoint.f1;
            Long timeSinceLastEvent = eventTimestamp - this.lastSeenEvent;
            if (timeSinceLastEvent > this.sessionTimeout) {
                return new Object[]{datapoint};
            }
            return null;
        }

        public SessionTriggerPolicy clone() {
            return new SessionTriggerPolicy(this.sessionTimeout);
        }
    }
}

