/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.utils.SavepointTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public abstract class SavepointReaderKeyedStateITCase<B extends StateBackend>
extends SavepointTestBase {
    /** Operator uid assigned to the stateful operator and used again to look its state up. */
    private static final String uid = "stateful-operator";
    /**
     * Descriptor of the integer value state. Both the operator that writes the state and the
     * reader function that reads it back obtain their state handle from this descriptor.
     */
    private static final ValueStateDescriptor<Integer> valueState = new ValueStateDescriptor<>("value", Types.INT);
    /** Records fed into the job; the test expects to read exactly these back from the savepoint. */
    private static final List<Pojo> elements = Arrays.asList(Pojo.of(1, 1), Pojo.of(2, 2), Pojo.of(3, 3));

    /**
     * Supplies the state backend under test. The returned backend is installed on the streaming
     * job that produces the savepoint and is also passed to {@code Savepoint.load} when the
     * savepoint is read back.
     *
     * @return the state backend used for both writing and reading the savepoint
     */
    protected abstract B getStateBackend();

    /**
     * Runs a streaming job that stores one value and two timers per key, takes a savepoint of
     * it, then reads the keyed state of the operator back through the savepoint reader and
     * checks that the records read equal the records originally fed in.
     *
     * @throws Exception if running the job, taking the savepoint or reading it back fails
     */
    @Test
    public void testUserKeyedStateReader() throws Exception {
        // The lambda parameter type is left to inference (presumably a SourceFunction of Pojo,
        // matching the element type of `elements`) instead of naming an undeclared type variable.
        String savepointPath = this.takeSavepoint(elements, source -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStateBackend(this.getStateBackend());
            env.setParallelism(4);
            env.addSource(source)
                    .rebalance()
                    .keyBy(id -> id.key)
                    .process(new KeyedStatefulOperator())
                    .uid(uid)
                    .addSink(new DiscardingSink<>());
            return env;
        });

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, this.getStateBackend());
        List<Pojo> results = savepoint.readKeyedState(uid, new Reader()).collect();

        // Compared as sets, so the order in which keys are read back is not asserted.
        Set<Pojo> expected = new HashSet<>(elements);
        Assert.assertEquals("Unexpected results from keyed state", expected, new HashSet<>(results));
    }

    /**
     * Test record that pairs a key with the value stored for it and the timers registered for
     * it. Equality and hashing cover all four fields, so two records are equal only when key,
     * state and both timer sets match.
     */
    public static class Pojo {
        Integer key;
        Integer state;
        Set<Long> eventTimeTimer;
        Set<Long> processingTimeTimer;

        /**
         * Creates a record with the given key and state and one fixed timer per time domain:
         * {@code Long.MAX_VALUE - 1} for event time and {@code Long.MAX_VALUE - 2} for
         * processing time.
         *
         * @param key the record's key
         * @param state the value associated with the key
         * @return a new record populated with the given values and the fixed timers
         */
        public static Pojo of(Integer key, Integer state) {
            Pojo wrapper = new Pojo();
            wrapper.key = key;
            wrapper.state = state;
            // Timestamps close to Long.MAX_VALUE; presumably chosen so the timers never fire
            // while the test job is running.
            wrapper.eventTimeTimer = Collections.singleton(Long.MAX_VALUE - 1);
            wrapper.processingTimeTimer = Collections.singleton(Long.MAX_VALUE - 2);
            return wrapper;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Pojo pojo = (Pojo) o;
            return Objects.equals(this.key, pojo.key)
                    && Objects.equals(this.state, pojo.state)
                    && Objects.equals(this.eventTimeTimer, pojo.eventTimeTimer)
                    && Objects.equals(this.processingTimeTimer, pojo.processingTimeTimer);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.state, this.eventTimeTimer, this.processingTimeTimer);
        }

        /**
         * Renders all fields so that a failed equality assertion shows the differing values
         * rather than identity hashes.
         */
        @Override
        public String toString() {
            return "Pojo{key=" + this.key
                    + ", state=" + this.state
                    + ", eventTimeTimer=" + this.eventTimeTimer
                    + ", processingTimeTimer=" + this.processingTimeTimer
                    + '}';
        }
    }

    private static class Reader
    extends KeyedStateReaderFunction<Integer, Pojo> {
        private transient ValueState<Integer> state;

        private Reader() {
        }

        public void open(Configuration parameters) {
            this.state = this.getRuntimeContext().getState(valueState);
        }

        public void readKey(Integer key, KeyedStateReaderFunction.Context ctx, Collector<Pojo> out) throws Exception {
            Pojo pojo = new Pojo();
            pojo.key = key;
            pojo.state = (Integer)this.state.value();
            pojo.eventTimeTimer = ctx.registeredEventTimeTimers();
            pojo.processingTimeTimer = ctx.registeredProcessingTimeTimers();
            out.collect((Object)pojo);
        }
    }

    private static class KeyedStatefulOperator
    extends KeyedProcessFunction<Integer, Pojo, Void> {
        private transient ValueState<Integer> state;

        private KeyedStatefulOperator() {
        }

        public void open(Configuration parameters) {
            this.state = this.getRuntimeContext().getState(valueState);
        }

        public void processElement(Pojo value, KeyedProcessFunction.Context ctx, Collector<Void> out) throws Exception {
            this.state.update((Object)value.state);
            value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer.longValue()));
            value.processingTimeTimer.forEach(timer -> ctx.timerService().registerProcessingTimeTimer(timer.longValue()));
        }
    }
}

