package org.apache.flink.streaming.examples.windowing;

import java.util.ArrayList;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/* loaded from: input_file:org/apache/flink/streaming/examples/windowing/SessionWindowing.class */
public class SessionWindowing {

    /* loaded from: input_file:org/apache/flink/streaming/examples/windowing/SessionWindowing$SessionTrigger.class */
    private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
        private static final long serialVersionUID = 1;
        private final Long sessionTimeout;
        private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", Long.class, -1L);

        public SessionTrigger(Long l) {
            this.sessionTimeout = l;
        }

        public TriggerResult onElement(Tuple3<String, Long, Integer> tuple3, long j, GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            ValueState partitionedState = triggerContext.getPartitionedState(this.stateDesc);
            Long l = (Long) partitionedState.value();
            Long valueOf = Long.valueOf(j - l.longValue());
            triggerContext.deleteEventTimeTimer(l.longValue() + this.sessionTimeout.longValue());
            partitionedState.update(Long.valueOf(j));
            triggerContext.registerEventTimeTimer(j + this.sessionTimeout.longValue());
            if (l.longValue() == -1 || valueOf.longValue() <= this.sessionTimeout.longValue()) {
                return TriggerResult.CONTINUE;
            }
            System.out.println("FIRING ON ELEMENT: " + tuple3 + " ts: " + j + " last " + l);
            return TriggerResult.FIRE_AND_PURGE;
        }

        public TriggerResult onEventTime(long j, GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            Long l = (Long) triggerContext.getPartitionedState(this.stateDesc).value();
            if (j - l.longValue() < this.sessionTimeout.longValue()) {
                return TriggerResult.CONTINUE;
            }
            System.out.println("CTX: " + triggerContext + " Firing Time " + j + " last seen " + l);
            return TriggerResult.FIRE_AND_PURGE;
        }

        public TriggerResult onProcessingTime(long j, GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        public void clear(GlobalWindow globalWindow, Trigger.TriggerContext triggerContext) throws Exception {
            ValueState partitionedState = triggerContext.getPartitionedState(this.stateDesc);
            if (((Long) partitionedState.value()).longValue() != -1) {
                triggerContext.deleteEventTimeTimer(((Long) partitionedState.value()).longValue() + this.sessionTimeout.longValue());
            }
            partitionedState.clear();
        }
    }

    public static void main(String[] strArr) throws Exception {
        ParameterTool fromArgs = ParameterTool.fromArgs(strArr);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("Usage: SessionWindowing --output <path>");
        executionEnvironment.getConfig().setGlobalJobParameters(fromArgs);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        final boolean has = fromArgs.has("output");
        final ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple3("a", 1L, 1));
        arrayList.add(new Tuple3("b", 1L, 1));
        arrayList.add(new Tuple3("b", 3L, 1));
        arrayList.add(new Tuple3("b", 5L, 1));
        arrayList.add(new Tuple3("c", 6L, 1));
        arrayList.add(new Tuple3("a", 10L, 1));
        arrayList.add(new Tuple3("c", 11L, 1));
        SingleOutputStreamOperator sum = executionEnvironment.addSource(new SourceFunction<Tuple3<String, Long, Integer>>() { // from class: org.apache.flink.streaming.examples.windowing.SessionWindowing.1
            private static final long serialVersionUID = 1;

            public void run(SourceFunction.SourceContext<Tuple3<String, Long, Integer>> sourceContext) throws Exception {
                for (Tuple3 tuple3 : arrayList) {
                    sourceContext.collectWithTimestamp(tuple3, ((Long) tuple3.f1).longValue());
                    sourceContext.emitWatermark(new Watermark(((Long) tuple3.f1).longValue() - serialVersionUID));
                    if (!has) {
                        System.out.println("Collected: " + tuple3);
                    }
                }
                sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
            }

            public void cancel() {
            }
        }).keyBy(new int[]{0}).window(GlobalWindows.create()).trigger(new SessionTrigger(3L)).sum(2);
        if (has) {
            sum.writeAsText(fromArgs.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            sum.print();
        }
        executionEnvironment.execute();
    }
}
