package org.apache.flink.state.api;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/state/api/SavepointReaderKeyedStateITCase.class */
public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
    /** Operator uid: set on the stateful operator when the job is built and passed again to readKeyedState when reading the savepoint. */
    private static final String uid = "stateful-operator";
    /** Descriptor ("value", INT) of the keyed value state; both nested function classes obtain their state handle from it. */
    private static ValueStateDescriptor<Integer> valueState = new ValueStateDescriptor<>("value", Types.INT);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/SavepointReaderKeyedStateITCase$KeyedStatefulOperator.class */
    /**
     * Keyed process function that populates the state and timers which end up in the savepoint.
     *
     * <p>For each incoming {@link Pojo} it writes {@code pojo.state} into the keyed value state and
     * registers every event-time and processing-time timer carried by the element. Nothing is
     * emitted downstream.
     *
     * <p>The explicit {@code processElement(Object, ...)} bridge overload that the decompiler had
     * emitted is intentionally not declared here: the compiler generates that bridge on its own.
     */
    public static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Pojo, Void> {
        /** Handle to the keyed value state; assigned in {@link #open(Configuration)}. */
        private transient ValueState<Integer> state;

        private KeyedStatefulOperator() {
        }

        /**
         * Obtains the value state handle from the runtime context using the descriptor shared
         * with the enclosing test class.
         *
         * @param configuration operator configuration (not used)
         */
        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(SavepointReaderKeyedStateITCase.valueState);
        }

        /**
         * Stores the element's state value and registers all of its timers.
         *
         * @param pojo the element whose state value and timer timestamps are recorded
         * @param context gives access to the timer service
         * @param collector output collector (unused; this function emits nothing)
         * @throws Exception if updating the state fails
         */
        public void processElement(Pojo pojo, Context context, Collector<Void> collector) throws Exception {
            this.state.update(pojo.state);
            // Boxed Long timestamps are auto-unboxed when passed to the timer service.
            pojo.eventTimeTimer.forEach(
                    timestamp -> context.timerService().registerEventTimeTimer(timestamp));
            pojo.processingTimeTimer.forEach(
                    timestamp -> context.timerService().registerProcessingTimeTimer(timestamp));
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointReaderKeyedStateITCase$Pojo.class */
    /**
     * Plain data holder used as the element type of the test job and as the reader's output type.
     *
     * <p>Equality and hashing cover all four fields, so instances can be compared through sets.
     */
    public static class Pojo {
        Integer key;
        Integer state;
        Set<Long> eventTimeTimer;
        Set<Long> processingTimeTimer;

        /**
         * Creates an element with the given key and state value, carrying exactly one event-time
         * timer at {@code Long.MAX_VALUE - 1} and one processing-time timer at
         * {@code Long.MAX_VALUE - 2}.
         *
         * @param num the key
         * @param num2 the state value
         * @return the populated instance
         */
        public static Pojo of(Integer num, Integer num2) {
            Pojo created = new Pojo();
            created.key = num;
            created.state = num2;
            created.eventTimeTimer = Collections.singleton(Long.MAX_VALUE - 1);
            created.processingTimeTimer = Collections.singleton(Long.MAX_VALUE - 2);
            return created;
        }

        /** Two instances are equal when they have the same exact class and all four fields are equal. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            Pojo other = (Pojo) obj;
            if (!Objects.equals(this.key, other.key)) {
                return false;
            }
            if (!Objects.equals(this.state, other.state)) {
                return false;
            }
            if (!Objects.equals(this.eventTimeTimer, other.eventTimeTimer)) {
                return false;
            }
            return Objects.equals(this.processingTimeTimer, other.processingTimeTimer);
        }

        /** Combines the four fields with the usual 31-multiplier fold (same result as {@code Objects.hash}). */
        @Override
        public int hashCode() {
            int result = 1;
            result = 31 * result + Objects.hashCode(this.key);
            result = 31 * result + Objects.hashCode(this.state);
            result = 31 * result + Objects.hashCode(this.eventTimeTimer);
            result = 31 * result + Objects.hashCode(this.processingTimeTimer);
            return result;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/SavepointReaderKeyedStateITCase$Reader.class */
    /**
     * Reader function that rebuilds one {@link Pojo} per key from the restored keyed state.
     *
     * <p>The explicit {@code readKey(Object, ...)} bridge overload that the decompiler had emitted
     * is intentionally not declared here: the compiler generates that bridge on its own.
     */
    public static class Reader extends KeyedStateReaderFunction<Integer, Pojo> {
        /** Handle to the keyed value state; assigned in {@link #open(Configuration)}. */
        private transient ValueState<Integer> state;

        private Reader() {
        }

        /**
         * Obtains the value state handle from the runtime context using the descriptor shared
         * with the enclosing test class.
         *
         * @param configuration reader configuration (not used)
         */
        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(SavepointReaderKeyedStateITCase.valueState);
        }

        /**
         * Emits a {@link Pojo} holding the key, its current state value and the timers the context
         * reports as registered for it.
         *
         * @param num the key being read
         * @param context source of the registered event-time and processing-time timers
         * @param collector receives the reconstructed element
         * @throws Exception if reading the state or the timers fails
         */
        public void readKey(Integer num, KeyedStateReaderFunction.Context context, Collector<Pojo> collector) throws Exception {
            Pojo restored = new Pojo();
            restored.key = num;
            restored.state = this.state.value();
            restored.eventTimeTimer = context.registeredEventTimeTimers();
            restored.processingTimeTimer = context.registeredProcessingTimeTimers();
            collector.collect(restored);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/SavepointReaderKeyedStateITCase$SavepointSource.class */
    /**
     * Source that emits a fixed set of elements once and then idles until it is cancelled.
     *
     * <p>After the last element has been collected the static {@code finished} flag is raised; it
     * is exposed through {@link #access$500()} so code outside the source can poll it.
     */
    public static class SavepointSource implements SourceFunction<Pojo> {
        /** Raised once all elements were emitted; static, so it is shared by every instance in this JVM. */
        private static volatile boolean finished;
        /** Cleared by {@link #cancel()} to end the idle loop in {@code run}. */
        private volatile boolean running;
        /** The elements emitted by every run of the source. */
        private static final Pojo[] elements = {Pojo.of(1, 1), Pojo.of(2, 2), Pojo.of(3, 3)};

        private SavepointSource() {
            this.running = true;
        }

        /**
         * Emits all elements while holding the checkpoint lock, marks the source as finished, and
         * then sleeps in 100 ms steps until {@link #cancel()} is called.
         *
         * @param sourceContext context used to collect elements and to obtain the checkpoint lock
         */
        public void run(SourceFunction.SourceContext<Pojo> sourceContext) {
            synchronized (sourceContext.getCheckpointLock()) {
                for (Pojo element : elements) {
                    sourceContext.collect(element);
                }
                finished = true;
            }
            while (this.running) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop exits only via cancel(), so an interrupt
                    // just re-evaluates the running flag.
                }
            }
        }

        /** Requests termination of the idle loop in {@code run}. */
        public void cancel() {
            this.running = false;
        }

        /** Resets the shared finished flag; to be called before a job using this source is submitted. */
        public static void initializeForTest() {
            finished = false;
        }

        private static boolean isFinished() {
            return finished;
        }

        /** Returns a fresh set containing the elements this source emits. */
        private static Set<Pojo> getElements() {
            return new HashSet<>(Arrays.asList(elements));
        }

        /** Accessor for {@link #getElements()}; kept because the enclosing class calls it by this name. */
        static /* synthetic */ Set access$300() {
            return getElements();
        }

        /** Accessor for {@link #isFinished()}; kept because the enclosing class calls it by this name. */
        static /* synthetic */ boolean access$500() {
            return isFinished();
        }
    }

    /**
     * Runs the keyed-state round trip twice: once with a {@link MemoryStateBackend} and once with
     * a {@link RocksDBStateBackend} built around a fresh {@link MemoryStateBackend}. Between the
     * two runs {@code after()} and then {@code before()} are invoked on the mini-cluster resource.
     */
    @Test
    public void testKeyedInputFormat() throws Exception {
        // First pass: memory state backend.
        MemoryStateBackend memoryBackend = new MemoryStateBackend();
        runKeyedState(memoryBackend);

        // Cycle the cluster resource before the second pass.
        miniClusterResource.after();
        miniClusterResource.before();

        // Second pass: RocksDB state backend constructed from a new memory backend.
        MemoryStateBackend innerBackend = new MemoryStateBackend();
        RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(innerBackend);
        runKeyedState(rocksDbBackend);
    }

    /**
     * Builds the streaming job (source, rebalance, keyBy on {@code pojo.key}, stateful operator,
     * discarding sink) with the given state backend at parallelism 4, takes a savepoint of it,
     * reads the operator's keyed state back from that savepoint and asserts that the set of
     * elements read equals the set of elements the source emits.
     *
     * @param stateBackend backend used both for running the job and for loading the savepoint
     * @throws Exception if building, running or reading the job fails
     */
    private void runKeyedState(StateBackend stateBackend) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStateBackend(stateBackend);
        streamEnv.setParallelism(4);

        streamEnv
                .addSource(new SavepointSource())
                .rebalance()
                .keyBy(pojo -> pojo.key)
                .process(new KeyedStatefulOperator())
                .uid(uid)
                .addSink(new DiscardingSink());

        // Same evaluation order as a single nested expression: expected set, batch environment,
        // job graph, savepoint, then the read.
        Set<Pojo> expected = SavepointSource.access$300();
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph();
        String savepointPath = takeSavepoint(jobGraph);

        Set<Pojo> actual = new HashSet<>(
                Savepoint.load(batchEnv, savepointPath, stateBackend)
                        .readKeyedState(uid, new Reader())
                        .collect());

        Assert.assertEquals("Unexpected results from keyed state", expected, actual);
    }

    /**
     * Submits the job in detached mode, waits until the source reports that all elements were
     * emitted, triggers a savepoint and returns the value the savepoint future completes with.
     * The job is cancelled afterwards whether or not the savepoint succeeded.
     *
     * <p>A single five-minute deadline covers both the wait for the source and the wait for the
     * savepoint future.
     *
     * @param jobGraph the job to run and snapshot
     * @return the result of the savepoint future (presumably the savepoint path; it is passed on
     *     to {@code Savepoint.load} by the caller)
     * @throws Exception if submission, the savepoint or cancellation fails, or if the thread is
     *     interrupted while waiting
     */
    private String takeSavepoint(JobGraph jobGraph) throws Exception {
        SavepointSource.initializeForTest();

        ClusterClient<?> client = miniClusterResource.getClusterClient();
        client.setDetached(true);

        JobID jobID = jobGraph.getJobID();
        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        String savepointDir = getTempDirPath(new AbstractID().toHexString());

        try {
            JobSubmissionResult submission = client.submitJob(jobGraph, getClass().getClassLoader());

            // Poll with a short pause instead of spinning a core until the source is done.
            while (!SavepointSource.access$500() && deadline.hasTimeLeft()) {
                Thread.sleep(10L);
            }
            if (!SavepointSource.access$500()) {
                Assert.fail("Failed to initialize state within deadline");
            }

            return client
                    .triggerSavepoint(submission.getJobID(), savepointDir)
                    .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            // Runs on success and on every failure path, including assertion errors.
            client.cancel(jobID);
        }
    }

    /*
     * Lambda deserialization hook.
     *
     * Given a SerializedLambda description it recognises exactly one implementation method,
     * "lambda$runKeyedState$c1b5e134$1": a KeySelector#getKey implementation declared in this class
     * that takes a Pojo and returns an Integer. For that description it returns a lambda yielding
     * pojo.key; every other description is rejected with
     * IllegalArgumentException("Invalid lambda deserialization").
     *
     * NOTE(review): the method is marked synthetic and looks like the decompiled form of the hook
     * javac generates for serializable lambdas, not hand-written code - confirm before editing.
     * NOTE(review): `boolean z = -1`, `switch (z)` on a boolean and returning a lambda where Object
     * is expected are not accepted by javac as written - TODO confirm whether this method should
     * be kept in source at all.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Index of the matched lambda; stays at the "no match" value unless the name is recognised.
        boolean z = -1;
        // Stage 1: identify the lambda by implementation method name (hash first, then equals).
        switch (implMethodName.hashCode()) {
            case -1941559586:
                if (implMethodName.equals("lambda$runKeyedState$c1b5e134$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Stage 2: verify the remaining metadata of the matched lambda before recreating it.
        switch (z) {
            case false:
                // Kind 6 is presumably MethodHandleInfo.REF_invokeStatic - verify.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointReaderKeyedStateITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/state/api/SavepointReaderKeyedStateITCase$Pojo;)Ljava/lang/Integer;")) {
                    return pojo -> {
                        return pojo.key;
                    };
                }
                break;
        }
        // Unknown name or mismatching metadata.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
