/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state.operator.restore;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class StreamOperatorSnapshotRestoreTest
extends TestLogger {
    /**
     * Recovery mode 0: no local snapshot directory provider is created and local recovery is
     * disabled, so the snapshot is expected to carry no task-local state.
     */
    private static final int ONLY_JM_RECOVERY = 0;
    /**
     * Recovery mode 1: local recovery is enabled and both the job-manager-owned and the
     * task-local snapshot are handed to the restore untouched.
     */
    private static final int TM_AND_JM_RECOVERY = 1;
    /**
     * Recovery mode 2: local recovery is enabled and the managed keyed state of the
     * job-manager-owned snapshot is discarded before the restore.
     */
    private static final int TM_REMOVE_JM_RECOVERY = 2;
    /**
     * Recovery mode 3: local recovery is enabled and the managed keyed state of the task-local
     * snapshot is discarded before the restore.
     */
    private static final int JM_REMOVE_TM_RECOVERY = 3;
    /** Max parallelism value (10) that the mock environment and the restore harness are set up with. */
    private static final int MAX_PARALLELISM = 10;
    /**
     * Class-wide temporary folder; created in {@code beforeClass()}, deleted in
     * {@code afterClass()}, and used as the parent of the local snapshot directories.
     */
    protected static TemporaryFolder temporaryFolder;
    /** State backend flavour for the current run; injected by the {@code Parameterized} runner. */
    @Parameterized.Parameter
    public StateBackendEnum stateBackendEnum;

    /**
     * Supplies the parameter values for the {@code Parameterized} runner.
     *
     * @return every {@link StateBackendEnum} constant, in declaration order
     */
    @Parameterized.Parameters(name="statebackend type ={0}")
    public static Collection<StateBackendEnum> parameter() {
        final StateBackendEnum[] allBackends = StateBackendEnum.values();
        return Arrays.asList(allBackends);
    }

    /**
     * Creates the class-wide temporary folder before any test runs.
     *
     * @throws IOException if the folder cannot be created on disk
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        // Assign the field first so it is non-null for afterClass() even if create() throws.
        temporaryFolder = new TemporaryFolder();
        // The folder is not registered as a JUnit rule here, so its lifecycle is driven manually.
        temporaryFolder.create();
    }

    /** Deletes the class-wide temporary folder (created in {@code beforeClass()}) after all tests ran. */
    @AfterClass
    public static void afterClass() {
        temporaryFolder.delete();
    }

    /**
     * Snapshot/restore round trip with local recovery disabled, i.e. only the job-manager-owned
     * state is available for the restore.
     *
     * @throws Exception if the test harness or the state backend fails
     */
    @Test
    public void testOperatorStatesSnapshotRestore() throws Exception {
        testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY);
    }

    /**
     * Snapshot/restore round trip with local recovery enabled; both the job-manager-owned and
     * the task-local state are handed to the restore unmodified.
     *
     * @throws Exception if the test harness or the state backend fails
     */
    @Test
    public void testOperatorStatesSnapshotRestoreWithLocalState() throws Exception {
        testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY);
    }

    /**
     * Snapshot/restore round trip with local recovery enabled where the managed keyed state of
     * the job-manager-owned snapshot is discarded before restoring.
     *
     * @throws Exception if the test harness or the state backend fails
     */
    @Test
    public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM() throws Exception {
        testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY);
    }

    /**
     * Snapshot/restore round trip with local recovery enabled where the managed keyed state of
     * the task-local snapshot is discarded before restoring.
     *
     * @throws Exception if the test harness or the state backend fails
     */
    @Test
    public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM() throws Exception {
        testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY);
    }

    /**
     * Runs one snapshot/restore round trip against the parameterized state backend.
     *
     * <p>Snapshot phase: a harness around a writing {@link TestOneInputStreamOperator} processes
     * the keys {@code 0..9} and a snapshot (including local state, if enabled) is taken.
     *
     * <p>Restore phase: a fresh harness around a verifying operator is initialized from that
     * snapshot. Depending on {@code mode}, the managed keyed state of one of the two snapshot
     * copies is discarded first. The same keys are then processed again so the operator can
     * compare the restored values.
     *
     * @param mode one of {@link #ONLY_JM_RECOVERY}, {@link #TM_AND_JM_RECOVERY}, {@link
     *     #TM_REMOVE_JM_RECOVERY} or {@link #JM_REMOVE_TM_RECOVERY}
     * @throws Exception if the test harness or the state backend fails
     */
    private void testOperatorStatesSnapshotRestoreInternal(int mode) throws Exception {
        // -------------------------------------------------------------------------- snapshot

        // Typed as the StateBackend interface: the heap backend and the RocksDB backend are
        // unrelated classes, so neither concrete type can hold both.
        final StateBackend stateBackend;
        switch (stateBackendEnum) {
            case FILE:
                stateBackend = createStateBackendInternal();
                break;
            case ROCKSDB_FULLY_ASYNC:
                stateBackend = new EmbeddedRocksDBStateBackend(TernaryBoolean.FALSE);
                break;
            case ROCKSDB_INCREMENTAL:
                stateBackend = new EmbeddedRocksDBStateBackend(TernaryBoolean.TRUE);
                break;
            default:
                throw new IllegalStateException(
                        String.format("Do not support statebackend type %s", stateBackendEnum));
        }

        final int numElements = 10;
        final int subtaskIdx = 0;
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();
        final KeySelector<Integer, Integer> identityKey = value -> value;

        // Local recovery needs a directory for the task-local snapshot copy; without a provider
        // it stays disabled.
        final LocalSnapshotDirectoryProvider directoryProvider =
                mode == ONLY_JM_RECOVERY
                        ? null
                        : new LocalSnapshotDirectoryProviderImpl(
                                temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx);
        final LocalRecoveryConfig localRecoveryConfig =
                directoryProvider == null
                        ? LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED
                        : LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider);

        final MockEnvironment mockEnvironment =
                new MockEnvironmentBuilder()
                        .setJobID(jobID)
                        .setJobVertexID(jobVertexID)
                        .setTaskName("test")
                        .setManagedMemorySize(1024L * 1024L)
                        .setInputSplitProvider(new MockInputSplitProvider())
                        .setBufferSize(1024 * 1024)
                        .setTaskStateManager(new TestTaskStateManager(localRecoveryConfig))
                        .setMaxParallelism(MAX_PARALLELISM)
                        .setSubtaskIndex(subtaskIdx)
                        .setUserCodeClassLoader(getClass().getClassLoader())
                        .build();

        TestOneInputStreamOperator op = new TestOneInputStreamOperator(false);
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        op, identityKey, TypeInformation.of(Integer.class), mockEnvironment);
        testHarness.setStateBackend(stateBackend);
        testHarness.setCheckpointStorage(new JobManagerCheckpointStorage());
        testHarness.open();

        for (int i = 0; i < numElements; ++i) {
            testHarness.processElement(new StreamRecord<>(i));
        }

        final OperatorSnapshotFinalizer snapshotWithLocalState =
                testHarness.snapshotWithLocalState(1L, 1L);
        testHarness.close();

        // -------------------------------------------------------------------------- restore

        op = new TestOneInputStreamOperator(true);
        testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        op,
                        identityKey,
                        TypeInformation.of(Integer.class),
                        MAX_PARALLELISM,
                        1 /* number of subtasks */,
                        0 /* subtask index */);

        // The restore harness gets a provider that creates no time service manager at all
        // (presumably because the test operator registers no timers).
        testHarness.setTimeServiceManagerProvider(
                new InternalTimeServiceManager.Provider() {

                    @Override
                    public <K> InternalTimeServiceManager<K> create(
                            TaskIOMetricGroup taskIOMetricGroup,
                            CheckpointableKeyedStateBackend<K> keyedStatedBackend,
                            ClassLoader userClassloader,
                            KeyContext keyContext,
                            ProcessingTimeService processingTimeService,
                            Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates,
                            StreamTaskCancellationContext cancellationContext)
                            throws IOException {
                        return null;
                    }
                });
        testHarness.setStateBackend(stateBackend);

        final OperatorSubtaskState jobManagerOwnedState =
                snapshotWithLocalState.getJobManagerOwnedState();
        final OperatorSubtaskState taskLocalState = snapshotWithLocalState.getTaskLocalState();

        // Task-local state must exist exactly when local recovery was enabled.
        final boolean hasLocalState = taskLocalState != null && taskLocalState.hasState();
        Assert.assertEquals(
                "Presence of task-local state must match the local recovery setting",
                mode > ONLY_JM_RECOVERY,
                hasLocalState);

        // Drop one copy of the managed keyed state to force recovery from the other one.
        if (mode == TM_REMOVE_JM_RECOVERY) {
            jobManagerOwnedState.getManagedKeyedState().discardState();
        } else if (mode == JM_REMOVE_TM_RECOVERY) {
            taskLocalState.getManagedKeyedState().discardState();
        }

        testHarness.initializeState(jobManagerOwnedState, taskLocalState);
        testHarness.open();

        for (int i = 0; i < numElements; ++i) {
            testHarness.processElement(new StreamRecord<>(i));
        }

        testHarness.close();
    }

    /**
     * Builds the heap-based backend that is used for the {@code FILE} parameterization.
     *
     * @return a new {@link HashMapStateBackend} with default settings
     * @throws IOException declared by the signature; nothing in this body throws it
     */
    private HashMapStateBackend createStateBackendInternal() throws IOException {
        final HashMapStateBackend heapBackend = new HashMapStateBackend();
        return heapBackend;
    }

    static class TestOneInputStreamOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        private static final long serialVersionUID = -8942866418598856475L;
        private final boolean verifyRestore;
        private ValueState<Integer> keyedState;
        private ListState<Integer> opState;

        TestOneInputStreamOperator(boolean verifyRestore) {
            this.verifyRestore = verifyRestore;
        }

        public void processElement(StreamRecord<Integer> element) throws Exception {
            if (this.verifyRestore) {
                long exp = (Integer)element.getValue() + 1;
                long act = ((Integer)this.keyedState.value()).intValue();
                Assert.assertEquals((long)exp, (long)act);
            } else {
                this.keyedState.update((Object)((Integer)element.getValue() + 1));
                this.opState.add((Object)((Integer)element.getValue()));
            }
        }

        public void processWatermark(Watermark mark) {
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            KeyedStateCheckpointOutputStream out = context.getRawKeyedOperatorStateOutput();
            DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)out);
            int count = 0;
            Iterator iterator = out.getKeyGroupList().iterator();
            while (iterator.hasNext()) {
                int kg = (Integer)iterator.next();
                out.startNewKeyGroup(kg);
                dov.writeInt(kg + 2);
                ++count;
            }
            Assert.assertEquals((long)10L, (long)count);
            OperatorStateCheckpointOutputStream outOp = context.getRawOperatorStateOutput();
            dov = new DataOutputViewStreamWrapper((OutputStream)outOp);
            for (int i = 0; i < 13; ++i) {
                outOp.startNewPartition();
                dov.writeInt(42 + i);
            }
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            Assert.assertEquals((Object)this.verifyRestore, (Object)context.isRestored());
            this.keyedState = context.getKeyedStateStore().getState(new ValueStateDescriptor("managed-keyed", Integer.class, (Object)0));
            this.opState = context.getOperatorStateStore().getListState(new ListStateDescriptor("managed-op-state", (TypeSerializer)IntSerializer.INSTANCE));
            if (context.isRestored()) {
                int count = 0;
                for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
                    InputStream in = streamProvider.getStream();
                    try {
                        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
                        Assert.assertEquals((long)(streamProvider.getKeyGroupId() + 2), (long)div.readInt());
                        ++count;
                    }
                    finally {
                        if (in == null) continue;
                        in.close();
                    }
                }
                Assert.assertEquals((long)10L, (long)count);
                BitSet check = new BitSet(10);
                Iterator iterator = ((Iterable)this.opState.get()).iterator();
                while (iterator.hasNext()) {
                    int v = (Integer)iterator.next();
                    check.set(v);
                }
                Assert.assertEquals((long)10L, (long)check.cardinality());
                check = new BitSet(13);
                for (StatePartitionStreamProvider streamProvider : context.getRawOperatorStateInputs()) {
                    InputStream in = streamProvider.getStream();
                    try {
                        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
                        check.set(div.readInt() - 42);
                    }
                    finally {
                        if (in == null) continue;
                        in.close();
                    }
                }
                Assert.assertEquals((long)13L, (long)check.cardinality());
            }
        }
    }

    /** State backend flavours this test is parameterized over. */
    enum StateBackendEnum {
        /** Run with the heap-based {@code HashMapStateBackend}. */
        FILE,
        /** Run with an {@code EmbeddedRocksDBStateBackend} constructed with {@code TernaryBoolean.FALSE}. */
        ROCKSDB_FULLY_ASYNC,
        /**
         * Run with an {@code EmbeddedRocksDBStateBackend} constructed with {@code
         * TernaryBoolean.TRUE} (presumably enabling incremental checkpoints; confirm against the
         * constructor documentation).
         */
        ROCKSDB_INCREMENTAL
    }
}

