package org.apache.flink.state.api.input;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest.class */
public class KeyedStateInputFormatTest {
    /** Descriptor of the integer value state shared by the writer and reader functions in this test. */
    private static final ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);

    /* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest$DoubleReaderFunction.class */
    static class DoubleReaderFunction extends KeyedStateReaderFunction<Integer, Integer> {
        ValueState<Integer> state;

        DoubleReaderFunction() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor);
        }

        public void readKey(Integer num, KeyedStateReaderFunction.Context context, Collector<Integer> collector) throws Exception {
            collector.collect(this.state.value());
            collector.collect(this.state.value());
        }

        public /* bridge */ /* synthetic */ void readKey(Object obj, KeyedStateReaderFunction.Context context, Collector collector) throws Exception {
            readKey((Integer) obj, context, (Collector<Integer>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest$InvalidReaderFunction.class */
    static class InvalidReaderFunction extends KeyedStateReaderFunction<Integer, Integer> {
        InvalidReaderFunction() {
        }

        public void open(Configuration configuration) {
            getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor);
        }

        public void readKey(Integer num, KeyedStateReaderFunction.Context context, Collector<Integer> collector) throws Exception {
            collector.collect(getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor).value());
        }

        public /* bridge */ /* synthetic */ void readKey(Object obj, KeyedStateReaderFunction.Context context, Collector collector) throws Exception {
            readKey((Integer) obj, context, (Collector<Integer>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest$ReaderFunction.class */
    static class ReaderFunction extends KeyedStateReaderFunction<Integer, Integer> {
        ValueState<Integer> state;

        ReaderFunction() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor);
        }

        public void readKey(Integer num, KeyedStateReaderFunction.Context context, Collector<Integer> collector) throws Exception {
            collector.collect(this.state.value());
        }

        public /* bridge */ /* synthetic */ void readKey(Object obj, KeyedStateReaderFunction.Context context, Collector collector) throws Exception {
            readKey((Integer) obj, context, (Collector<Integer>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest$StatefulFunction.class */
    static class StatefulFunction extends RichFlatMapFunction<Integer, Void> {
        ValueState<Integer> state;

        StatefulFunction() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor);
        }

        public void flatMap(Integer num, Collector<Void> collector) throws Exception {
            this.state.update(num);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Integer) obj, (Collector<Void>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest$StatefulFunctionWithTime.class */
    static class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> {
        ValueState<Integer> state;

        StatefulFunctionWithTime() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor);
        }

        public void processElement(Integer num, KeyedProcessFunction<Integer, Integer, Void>.Context context, Collector<Void> collector) throws Exception {
            this.state.update(num);
            context.timerService().registerEventTimeTimer(num.intValue());
            context.timerService().registerProcessingTimeTimer(num.intValue());
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (KeyedProcessFunction<Integer, Integer, Void>.Context) context, (Collector<Void>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/KeyedStateInputFormatTest$TimeReaderFunction.class */
    static class TimeReaderFunction extends KeyedStateReaderFunction<Integer, Integer> {
        ValueState<Integer> state;

        TimeReaderFunction() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(KeyedStateInputFormatTest.stateDescriptor);
        }

        public void readKey(Integer num, KeyedStateReaderFunction.Context context, Collector<Integer> collector) throws Exception {
            Set registeredEventTimeTimers = context.registeredEventTimeTimers();
            Assert.assertEquals("Each key should have exactly one event timer for key " + num, 1L, registeredEventTimeTimers.size());
            collector.collect(Integer.valueOf(((Long) registeredEventTimeTimers.iterator().next()).intValue()));
            Set registeredProcessingTimeTimers = context.registeredProcessingTimeTimers();
            Assert.assertEquals("Each key should have exactly one processing timer for key " + num, 1L, registeredProcessingTimeTimers.size());
            collector.collect(Integer.valueOf(((Long) registeredProcessingTimeTimers.iterator().next()).intValue()));
        }

        public /* bridge */ /* synthetic */ void readKey(Object obj, KeyedStateReaderFunction.Context context, Collector collector) throws Exception {
            readKey((Integer) obj, context, (Collector<Integer>) collector);
        }
    }

    /** Requesting four splits from a one-subtask operator state must yield exactly four splits. */
    @Test
    public void testCreatePartitionedInputSplits() throws Exception {
        OperatorID operatorId = OperatorIDGenerator.fromUid("uid");
        OperatorSubtaskState subtaskState = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));

        // Keep the operator state in a local so the input format below is built from the
        // instance that actually received the snapshot.
        OperatorState operatorState = new OperatorState(operatorId, 1, 128);
        operatorState.putState(0, subtaskState);

        KeyGroupRangeInputSplit[] splits =
                new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction())
                        .createInputSplits(4);

        Assert.assertEquals("Failed to properly partition operator state into input splits", 4L, splits.length);
    }

    /**
     * Requesting 129 splits from an operator state created with the value 128 must yield only 128
     * splits.
     */
    @Test
    public void testMaxParallelismRespected() throws Exception {
        OperatorID operatorId = OperatorIDGenerator.fromUid("uid");
        OperatorSubtaskState subtaskState = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));

        // Keep the operator state in a local so the input format below is built from the
        // instance that actually received the snapshot.
        OperatorState operatorState = new OperatorState(operatorId, 1, 128);
        operatorState.putState(0, subtaskState);

        KeyGroupRangeInputSplit[] splits =
                new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction())
                        .createInputSplits(129);

        Assert.assertEquals("Failed to properly partition operator state into input splits", 128L, splits.length);
    }

    /** Reading the single split with {@link ReaderFunction} must return the stored values 1, 2, 3. */
    @Test
    public void testReadState() throws Exception {
        OperatorID operatorId = OperatorIDGenerator.fromUid("uid");
        OperatorSubtaskState subtaskState = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
        OperatorState operatorState = new OperatorState(operatorId, 1, 128);
        operatorState.putState(0, subtaskState);

        KeyGroupRangeInputSplit split =
                new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction())
                        .createInputSplits(1)[0];
        List<Integer> actual = readInputSplit(split, new ReaderFunction());

        Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 2, 3), actual);
    }

    /**
     * Reading the single split with {@link DoubleReaderFunction} must return every stored value
     * twice.
     */
    @Test
    public void testReadMultipleOutputPerKey() throws Exception {
        OperatorID operatorId = OperatorIDGenerator.fromUid("uid");
        OperatorSubtaskState subtaskState = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
        OperatorState operatorState = new OperatorState(operatorId, 1, 128);
        operatorState.putState(0, subtaskState);

        KeyGroupRangeInputSplit split =
                new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction())
                        .createInputSplits(1)[0];
        List<Integer> actual = readInputSplit(split, new DoubleReaderFunction());

        Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 1, 2, 2, 3, 3), actual);
    }

    /**
     * Reading with {@link InvalidReaderFunction} is expected to raise an {@link IOException}; if
     * the read returns normally the test fails explicitly.
     */
    @Test(expected = IOException.class)
    public void testInvalidProcessReaderFunctionFails() throws Exception {
        OperatorID operatorId = OperatorIDGenerator.fromUid("uid");
        OperatorSubtaskState subtaskState = createOperatorSubtaskState(new StreamFlatMap<>(new StatefulFunction()));
        OperatorState operatorState = new OperatorState(operatorId, 1, 128);
        operatorState.putState(0, subtaskState);

        KeyGroupRangeInputSplit split =
                new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new ReaderFunction())
                        .createInputSplits(1)[0];
        readInputSplit(split, new InvalidReaderFunction());

        Assert.fail("KeyedStateReaderFunction did not fail on invalid RuntimeContext use");
    }

    /**
     * Reading a snapshot written by {@link StatefulFunctionWithTime} with
     * {@link TimeReaderFunction} must return each timer timestamp twice (one event-time and one
     * processing-time timer per key).
     */
    @Test
    public void testReadTime() throws Exception {
        OperatorID operatorId = OperatorIDGenerator.fromUid("uid");
        OperatorSubtaskState subtaskState = createOperatorSubtaskState(new KeyedProcessOperator<>(new StatefulFunctionWithTime()));
        OperatorState operatorState = new OperatorState(operatorId, 1, 128);
        operatorState.putState(0, subtaskState);

        KeyGroupRangeInputSplit split =
                new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new TimeReaderFunction())
                        .createInputSplits(1)[0];
        List<Integer> actual = readInputSplit(split, new TimeReaderFunction());

        Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 1, 2, 2, 3, 3), actual);
    }

    /**
     * Reads every record of the given split through a fresh {@code KeyedStateInputFormat} driven by
     * the supplied reader function.
     *
     * @param keyGroupRangeInputSplit the split to open and read to the end
     * @param keyedStateReaderFunction the user function that turns each key into output records
     * @return all records produced for the split, sorted in ascending order
     * @throws IOException if opening, reading or closing the format fails
     */
    @Nonnull
    private List<Integer> readInputSplit(KeyGroupRangeInputSplit keyGroupRangeInputSplit, KeyedStateReaderFunction<Integer, Integer> keyedStateReaderFunction) throws IOException {
        KeyedStateInputFormat<Integer, Integer> format = new KeyedStateInputFormat<>(
                new OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4),
                new MemoryStateBackend(),
                Types.INT,
                keyedStateReaderFunction);

        List<Integer> records = new ArrayList<>();

        format.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
        format.openInputFormat();
        format.open(keyGroupRangeInputSplit);

        while (!format.reachedEnd()) {
            records.add(format.nextRecord(0));
        }

        format.close();
        format.closeInputFormat();

        // Sort so that callers can compare against a fixed expected list.
        records.sort(Comparator.naturalOrder());
        return records;
    }

    /**
     * Runs the given operator in a keyed test harness, feeds it the elements 1, 2 and 3 (each keyed
     * by its own value, timestamp 0) and returns the snapshot taken afterwards.
     *
     * @param oneInputStreamOperator the operator whose keyed state should be captured
     * @return the subtask state produced by snapshotting the harness
     * @throws Exception if setting up, running, snapshotting or closing the harness fails
     */
    private OperatorSubtaskState createOperatorSubtaskState(OneInputStreamOperator<Integer, Void> oneInputStreamOperator) throws Exception {
        // try-with-resources closes the harness on every path and attaches a close() failure
        // as a suppressed exception instead of letting it hide the original error.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(oneInputStreamOperator, key -> key, Types.INT, 128, 1, 0)) {
            harness.setup(VoidSerializer.INSTANCE);
            harness.open();

            harness.processElement(1, 0L);
            harness.processElement(2, 0L);
            harness.processElement(3, 0L);

            return harness.snapshot(0L, 0L);
        }
    }

    /**
     * Source-level rendition of the lambda deserialization hook for the identity key selector used
     * in {@code createOperatorSubtaskState}.
     *
     * <p>NOTE(review): the {@code synthetic} marker and the method name suggest this was emitted
     * by the compiler rather than written by hand. javac is expected to generate an equivalent
     * method itself for serializable lambdas; if the build reports a conflict with a
     * compiler-synthesized symbol, delete this method — confirm against the build.
     *
     * @param serializedLambda the serialized form describing the lambda to recreate
     * @return the identity key selector when the description matches it
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        boolean isIdentityKeySelector =
                "lambda$createOperatorSubtaskState$d4b2035$1".equals(serializedLambda.getImplMethodName())
                        // 6 is the method-handle kind REF_invokeStatic.
                        && serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/flink/state/api/input/KeyedStateInputFormatTest")
                        && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;");

        if (isIdentityKeySelector) {
            // A lambda needs a functional-interface target type; a bare Object return has none.
            return (org.apache.flink.api.java.functions.KeySelector<Integer, Integer>) num -> num;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
