package org.apache.flink.state.api.output;

import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.class */
/**
 * Tests that the state emitted by a {@link KeyedStateBootstrapOperator} can be used to
 * initialize other keyed operators.
 *
 * <p>Each test runs in two phases: first a bootstrap operator processes a few elements and its
 * resulting {@link OperatorSubtaskState} is extracted; then a second operator is initialized from
 * that state and its output is checked.
 */
public class KeyedStateBootstrapOperatorTest {
    /** Descriptor shared by the function that writes the value state and the one that reads it. */
    private static final ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("state", Types.LONG);

    /** Timestamp of the event-time timer registered for every bootstrapped key. */
    private static final Long EVENT_TIMER = Long.MAX_VALUE - 1;

    /** Timestamp of the processing-time timer registered for every bootstrapped key. */
    private static final Long PROC_TIMER = Long.MAX_VALUE - 2;

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    /**
     * Registers one event-time and one processing-time timer per key through the bootstrap
     * operator, then checks that a {@link KeyedProcessOperator} initialized from the resulting
     * state fires all of them.
     */
    @Test
    public void testTimerStateRestorable() throws Exception {
        Path path = new Path(folder.newFolder().toURI());
        KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
                new KeyedStateBootstrapOperator<>(0L, path, new TimerBootstrapFunction());

        // Phase 1: bootstrap the state. The harness is closed exactly once, before phase 2 starts.
        OperatorSubtaskState state;
        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, TaggedOperatorSubtaskState> harness =
                getHarness(bootstrapOperator)) {
            processElements(harness, 1L, 2L, 3L);
            state = getState(bootstrapOperator, harness);
        }

        // Phase 2: initialize a process operator from that state and advance both time domains.
        KeyedProcessOperator<Long, Long, Tuple3<Long, Long, TimeDomain>> processOperator =
                new KeyedProcessOperator<>(new SimpleProcessFunction());
        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Tuple3<Long, Long, TimeDomain>> harness =
                getHarness(processOperator, state)) {
            harness.processWatermark(EVENT_TIMER);
            harness.setProcessingTime(PROC_TIMER);

            assertHarnessOutput(
                    harness,
                    Tuple3.of(1L, EVENT_TIMER, TimeDomain.EVENT_TIME),
                    Tuple3.of(2L, EVENT_TIMER, TimeDomain.EVENT_TIME),
                    Tuple3.of(3L, EVENT_TIMER, TimeDomain.EVENT_TIME),
                    Tuple3.of(1L, PROC_TIMER, TimeDomain.PROCESSING_TIME),
                    Tuple3.of(2L, PROC_TIMER, TimeDomain.PROCESSING_TIME),
                    Tuple3.of(3L, PROC_TIMER, TimeDomain.PROCESSING_TIME));
        }
    }

    /**
     * Writes a value state per key through the bootstrap operator, then checks that a plain
     * {@link StreamMap} operator initialized from the resulting state reads the same values back
     * and can still take a snapshot.
     */
    @Test
    public void testNonTimerStatesRestorableByNonProcessesOperator() throws Exception {
        Path path = new Path(folder.newFolder().toURI());
        KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
                new KeyedStateBootstrapOperator<>(0L, path, new SimpleBootstrapFunction());

        // Phase 1: bootstrap the state. The harness is closed exactly once, before phase 2 starts.
        OperatorSubtaskState state;
        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, TaggedOperatorSubtaskState> harness =
                getHarness(bootstrapOperator)) {
            processElements(harness, 1L, 2L, 3L);
            state = getState(bootstrapOperator, harness);
        }

        // Phase 2: initialize a map operator from that state and read the values back.
        StreamMap<Long, Long> mapOperator = new StreamMap<>(new StreamingFunction());
        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> harness = getHarness(mapOperator, state)) {
            processElements(harness, 1L, 2L, 3L);
            assertHarnessOutput(harness, 1L, 2L, 3L);

            // The initialized operator must also be able to snapshot its state.
            harness.snapshot(0L, 0L);
        }
    }

    /**
     * Creates and opens a harness for {@code oneInputStreamOperator} without any initial state.
     *
     * @param oneInputStreamOperator the operator under test
     * @return an opened harness; the caller is responsible for closing it
     */
    private <T> KeyedOneInputStreamOperatorTestHarness<Long, Long, T> getHarness(OneInputStreamOperator<Long, T> oneInputStreamOperator) throws Exception {
        return getHarness(oneInputStreamOperator, null);
    }

    /**
     * Creates a harness for {@code oneInputStreamOperator} backed by a RocksDB state backend in a
     * fresh temporary folder, optionally initializes it from {@code operatorSubtaskState}, and
     * opens it.
     *
     * @param oneInputStreamOperator the operator under test
     * @param operatorSubtaskState state to initialize the operator from, or {@code null} for none
     * @return an opened harness; the caller is responsible for closing it
     */
    private <T> KeyedOneInputStreamOperatorTestHarness<Long, Long, T> getHarness(OneInputStreamOperator<Long, T> oneInputStreamOperator, OperatorSubtaskState operatorSubtaskState) throws Exception {
        // Every element is its own key.
        // NOTE(review): 128, 1, 0 are presumably max parallelism, parallelism and subtask index
        // -- confirm against the harness constructor.
        KeyedOneInputStreamOperatorTestHarness<Long, Long, T> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(oneInputStreamOperator, id -> id, Types.LONG, 128, 1, 0);

        harness.setStateBackend(new RocksDBStateBackend(folder.newFolder().toURI()));
        if (operatorSubtaskState != null) {
            harness.initializeState(operatorSubtaskState);
        }
        harness.open();
        return harness;
    }

    /**
     * Feeds every value in {@code lArr} to the harness, each with timestamp {@code 0}.
     *
     * @param keyedOneInputStreamOperatorTestHarness the harness to feed
     * @param lArr the elements to process, in order
     */
    private static <T> void processElements(KeyedOneInputStreamOperatorTestHarness<Long, Long, T> keyedOneInputStreamOperatorTestHarness, Long... lArr) throws Exception {
        for (Long element : lArr) {
            keyedOneInputStreamOperatorTestHarness.processElement(element, 0L);
        }
    }

    /**
     * Signals end of input to the bootstrap operator and returns the state carried by the first
     * value the harness emitted.
     *
     * @param keyedStateBootstrapOperator the bootstrap operator running inside the harness
     * @param keyedOneInputStreamOperatorTestHarness the harness whose output is inspected
     * @return the {@code state} field of the first emitted {@link TaggedOperatorSubtaskState}
     */
    private static OperatorSubtaskState getState(KeyedStateBootstrapOperator<Long, Long> keyedStateBootstrapOperator, KeyedOneInputStreamOperatorTestHarness<Long, Long, TaggedOperatorSubtaskState> keyedOneInputStreamOperatorTestHarness) throws Exception {
        keyedStateBootstrapOperator.endInput();
        return keyedOneInputStreamOperatorTestHarness.extractOutputValues().get(0).state;
    }

    /**
     * Asserts that the values emitted by the harness are exactly {@code tArr}, in any order.
     *
     * @param keyedOneInputStreamOperatorTestHarness the harness whose output is checked
     * @param tArr the expected output values
     */
    @SafeVarargs
    private static <T> void assertHarnessOutput(KeyedOneInputStreamOperatorTestHarness<Long, Long, T> keyedOneInputStreamOperatorTestHarness, T... tArr) {
        Assert.assertThat("The output from the operator does not match the expected values", keyedOneInputStreamOperatorTestHarness.extractOutputValues(), Matchers.containsInAnyOrder(tArr));
    }

    /** Bootstrap function that stores each incoming element in the value state of its key. */
    private static class SimpleBootstrapFunction extends KeyedStateBootstrapFunction<Long, Long> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration configuration) throws Exception {
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Long l, Context context) throws Exception {
            state.update(l);
        }
    }

    /** Process function that ignores elements and emits (key, timestamp, time domain) for every timer that fires. */
    private static class SimpleProcessFunction extends KeyedProcessFunction<Long, Long, Tuple3<Long, Long, TimeDomain>> {

        @Override
        public void processElement(Long l, Context context, Collector<Tuple3<Long, Long, TimeDomain>> collector) throws Exception {
            // Intentionally empty: only the timers are of interest.
        }

        @Override
        public void onTimer(long j, OnTimerContext onTimerContext, Collector<Tuple3<Long, Long, TimeDomain>> collector) throws Exception {
            collector.collect(Tuple3.of(onTimerContext.getCurrentKey(), j, onTimerContext.timeDomain()));
        }
    }

    /** Map function that replaces each element with the current value state of its key. */
    private static class StreamingFunction extends RichMapFunction<Long, Long> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration configuration) throws Exception {
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public Long map(Long l) throws Exception {
            return state.value();
        }
    }

    /** Bootstrap function that registers one event-time and one processing-time timer per element. */
    private static class TimerBootstrapFunction extends KeyedStateBootstrapFunction<Long, Long> {

        @Override
        public void processElement(Long l, Context context) throws Exception {
            context.timerService().registerEventTimeTimer(EVENT_TIMER);
            context.timerService().registerProcessingTimeTimer(PROC_TIMER);
        }
    }
}
