package org.apache.flink.streaming.examples.windowing;

import java.util.ArrayList;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/* loaded from: input_file:org/apache/flink/streaming/examples/windowing/SessionWindowing.class */
public class SessionWindowing {
    private static boolean fileOutput = false;
    private static String outputPath;

    /* loaded from: input_file:org/apache/flink/streaming/examples/windowing/SessionWindowing$SessionTrigger.class */
    private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
        private static final long serialVersionUID = 1;
        private final Long sessionTimeout;

        public SessionTrigger(Long l) {
            this.sessionTimeout = l;
        }

        public Trigger.TriggerResult onElement(Tuple3<String, Long, Integer> tuple3, long j, GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            OperatorState keyValueState = triggerContext.getKeyValueState("last-seen", Long.valueOf(serialVersionUID));
            Long l = (Long) keyValueState.value();
            Long valueOf = Long.valueOf(j - l.longValue());
            keyValueState.update(Long.valueOf(j));
            triggerContext.registerEventTimeTimer(l.longValue() + this.sessionTimeout.longValue());
            return valueOf.longValue() > this.sessionTimeout.longValue() ? Trigger.TriggerResult.FIRE_AND_PURGE : Trigger.TriggerResult.CONTINUE;
        }

        public Trigger.TriggerResult onEventTime(long j, GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            return j - ((Long) triggerContext.getKeyValueState("last-seen", Long.valueOf(serialVersionUID)).value()).longValue() >= this.sessionTimeout.longValue() ? Trigger.TriggerResult.FIRE_AND_PURGE : Trigger.TriggerResult.CONTINUE;
        }

        public Trigger.TriggerResult onProcessingTime(long j, GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            return Trigger.TriggerResult.CONTINUE;
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (parseParameters(strArr)) {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            executionEnvironment.setParallelism(2);
            final ArrayList arrayList = new ArrayList();
            arrayList.add(new Tuple3("a", 1L, 1));
            arrayList.add(new Tuple3("b", 1L, 1));
            arrayList.add(new Tuple3("b", 3L, 1));
            arrayList.add(new Tuple3("b", 5L, 1));
            arrayList.add(new Tuple3("c", 6L, 1));
            arrayList.add(new Tuple3("a", 10L, 1));
            arrayList.add(new Tuple3("c", 11L, 1));
            SingleOutputStreamOperator sum = executionEnvironment.addSource(new EventTimeSourceFunction<Tuple3<String, Long, Integer>>() { // from class: org.apache.flink.streaming.examples.windowing.SessionWindowing.1
                private static final long serialVersionUID = 1;

                public void run(SourceFunction.SourceContext<Tuple3<String, Long, Integer>> sourceContext) throws Exception {
                    for (Tuple3 tuple3 : arrayList) {
                        sourceContext.collectWithTimestamp(tuple3, ((Long) tuple3.f1).longValue());
                        sourceContext.emitWatermark(new Watermark(((Long) tuple3.f1).longValue() - serialVersionUID));
                        if (!SessionWindowing.fileOutput) {
                            System.out.println("Collected: " + tuple3);
                        }
                    }
                    sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
                }

                public void cancel() {
                }
            }).keyBy(new int[]{0}).window(GlobalWindows.create()).trigger(new SessionTrigger(3L)).sum(2);
            if (fileOutput) {
                sum.writeAsText(outputPath);
            } else {
                sum.print();
            }
            executionEnvironment.execute();
        }
    }

    private static boolean parseParameters(String[] strArr) {
        if (strArr.length <= 0) {
            return true;
        }
        if (strArr.length != 1) {
            System.err.println("Usage: SessionWindowing <result path>");
            return false;
        }
        fileOutput = true;
        outputPath = strArr[0];
        return true;
    }
}
