/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class SavepointReaderKeyedStateITCase
extends AbstractTestBase {
    /** Operator uid assigned to the stateful operator in the streaming job and used again to read its state back. */
    private static final String uid = "stateful-operator";

    /**
     * Descriptor of the keyed {@code Integer} value state named {@code "value"}.
     *
     * <p>The same descriptor is used by the operator that writes the state and by the reader function that
     * loads it, so both sides agree on the state name and type.
     */
    private static final ValueStateDescriptor<Integer> valueState = new ValueStateDescriptor<>("value", Types.INT);

    /**
     * Runs the keyed-state write/savepoint/read round trip twice: once with a {@link MemoryStateBackend} and
     * once with a {@link RocksDBStateBackend} that wraps a {@link MemoryStateBackend}.
     *
     * @throws Exception if either run, or cycling the cluster resource between them, fails
     */
    @Test
    public void testKeyedInputFormat() throws Exception {
        StateBackend heapBackend = new MemoryStateBackend();
        this.runKeyedState(heapBackend);

        // Cycle the cluster resource (after() then before()) between the two runs.
        // NOTE(review): presumably required so the second run starts on a fresh cluster with the new backend.
        miniClusterResource.after();
        miniClusterResource.before();

        // Declared as StateBackend so constructor overload resolution matches the original explicit cast.
        StateBackend wrappedBackend = new MemoryStateBackend();
        StateBackend rocksDbBackend = new RocksDBStateBackend(wrappedBackend);
        this.runKeyedState(rocksDbBackend);
    }

    /**
     * Writes keyed state with a streaming job, takes a savepoint of it, and verifies that reading the
     * savepoint's keyed state returns exactly the elements emitted by {@link SavepointSource}.
     *
     * @param backend state backend set on the streaming environment and passed again when loading the savepoint
     * @throws Exception if building or running the job, taking the savepoint, or reading it back fails
     */
    private void runKeyedState(StateBackend backend) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStateBackend(backend);
        streamEnv.setParallelism(4);

        // source -> rebalance -> keyBy(key) -> stateful operator (tagged with uid) -> discarding sink
        streamEnv
                .addSource(new SavepointSource())
                .rebalance()
                .keyBy(id -> id.key)
                .process(new KeyedStatefulOperator())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph();
        String path = this.takeSavepoint(jobGraph);

        // Read the operator's keyed state back from the savepoint using the batch environment.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, path, backend);
        List<Pojo> results = savepoint.readKeyedState(uid, new Reader()).collect();

        // Compared as sets, so the order in which keys are read does not matter.
        Set<Pojo> expected = SavepointSource.getElements();
        Assert.assertEquals("Unexpected results from keyed state", expected, new HashSet<>(results));
    }

    /**
     * Submits the job in detached mode, waits until {@link SavepointSource} reports that it has emitted all
     * of its elements, and then triggers a savepoint into a fresh temporary directory.
     *
     * <p>A single five-minute deadline covers both the wait for the source and the wait for the savepoint
     * future. The job is cancelled in a {@code finally} block whether or not the savepoint succeeds.
     *
     * @param jobGraph the job to submit
     * @return the savepoint path produced by the cluster client
     * @throws Exception if submission, the savepoint, or cancellation fails, or if the calling thread is
     *     interrupted while waiting
     */
    private String takeSavepoint(JobGraph jobGraph) throws Exception {
        SavepointSource.initializeForTest();

        ClusterClient<?> client = miniClusterResource.getClusterClient();
        client.setDetached(true);

        JobID jobId = jobGraph.getJobID();
        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        String dirPath = this.getTempDirPath(new AbstractID().toHexString());
        final long pollIntervalMillis = 50L;

        try {
            JobSubmissionResult result = client.submitJob(jobGraph, this.getClass().getClassLoader());

            boolean finished = false;
            while (deadline.hasTimeLeft()) {
                if (SavepointSource.isFinished()) {
                    finished = true;
                    break;
                }
                // Sleep between polls instead of spinning a core for up to the full deadline.
                Thread.sleep(pollIntervalMillis);
            }
            if (!finished) {
                Assert.fail("Failed to initialize state within deadline");
            }

            CompletableFuture<String> path = client.triggerSavepoint(result.getJobID(), dirPath);
            // Only the time remaining on the shared deadline is granted to the savepoint itself.
            return path.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            client.cancel(jobId);
        }
    }

    /**
     * Test element carrying a key, the value to store in keyed state, and the event-time and
     * processing-time timers associated with that key.
     *
     * <p>Equality and hashing cover all four fields. {@link #toString()} prints them so that a failing
     * assertion on a collection of these objects shows their contents.
     */
    public static class Pojo {
        Integer key;
        Integer state;
        Set<Long> eventTimeTimer;
        Set<Long> processingTimeTimer;

        /**
         * Creates an element with the given key and state and one fixed timer of each kind.
         *
         * @param key the key of the element
         * @param state the value to store for the key
         * @return a new element whose event-time timer set is {@code {Long.MAX_VALUE - 1}} and whose
         *     processing-time timer set is {@code {Long.MAX_VALUE - 2}}
         */
        public static Pojo of(Integer key, Integer state) {
            Pojo wrapper = new Pojo();
            wrapper.key = key;
            wrapper.state = state;
            wrapper.eventTimeTimer = Collections.singleton(Long.MAX_VALUE - 1);
            wrapper.processingTimeTimer = Collections.singleton(Long.MAX_VALUE - 2);
            return wrapper;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Pojo pojo = (Pojo) o;
            return Objects.equals(this.key, pojo.key)
                    && Objects.equals(this.state, pojo.state)
                    && Objects.equals(this.eventTimeTimer, pojo.eventTimeTimer)
                    && Objects.equals(this.processingTimeTimer, pojo.processingTimeTimer);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.state, this.eventTimeTimer, this.processingTimeTimer);
        }

        @Override
        public String toString() {
            return "Pojo{key=" + this.key
                    + ", state=" + this.state
                    + ", eventTimeTimer=" + this.eventTimeTimer
                    + ", processingTimeTimer=" + this.processingTimeTimer
                    + "}";
        }
    }

    private static class Reader
    extends KeyedStateReaderFunction<Integer, Pojo> {
        private transient ValueState<Integer> state;

        private Reader() {
        }

        public void open(Configuration parameters) {
            this.state = this.getRuntimeContext().getState(valueState);
        }

        public void readKey(Integer key, KeyedStateReaderFunction.Context ctx, Collector<Pojo> out) throws Exception {
            Pojo pojo = new Pojo();
            pojo.key = key;
            pojo.state = (Integer)this.state.value();
            pojo.eventTimeTimer = ctx.registeredEventTimeTimers();
            pojo.processingTimeTimer = ctx.registeredProcessingTimeTimers();
            out.collect((Object)pojo);
        }
    }

    private static class KeyedStatefulOperator
    extends KeyedProcessFunction<Integer, Pojo, Void> {
        private transient ValueState<Integer> state;

        private KeyedStatefulOperator() {
        }

        public void open(Configuration parameters) {
            this.state = this.getRuntimeContext().getState(valueState);
        }

        public void processElement(Pojo value, KeyedProcessFunction.Context ctx, Collector<Void> out) throws Exception {
            this.state.update((Object)value.state);
            value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer.longValue()));
            value.processingTimeTimer.forEach(timer -> ctx.timerService().registerProcessingTimeTimer(timer.longValue()));
        }
    }

    private static class SavepointSource
    implements SourceFunction<Pojo> {
        private static volatile boolean finished;
        private volatile boolean running = true;
        private static final Pojo[] elements;

        private SavepointSource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Pojo> ctx) {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                for (Pojo element : elements) {
                    ctx.collect((Object)element);
                }
                finished = true;
            }
            while (this.running) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.running = false;
        }

        private static void initializeForTest() {
            finished = false;
        }

        private static boolean isFinished() {
            return finished;
        }

        private static Set<Pojo> getElements() {
            return new HashSet<Pojo>(Arrays.asList(elements));
        }

        static {
            elements = new Pojo[]{Pojo.of(1, 1), Pojo.of(2, 2), Pojo.of(3, 3)};
        }
    }
}

