/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.OptionalLong;
import java.util.Random;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImplTest;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.CloseableIterable;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class StreamTaskStateInitializerImplTest {
    @Test
    public void testNoRestore() throws Exception {
        MemoryStateBackend stateBackend = (MemoryStateBackend)Mockito.spy((Object)new MemoryStateBackend(1024));
        StreamTaskStateInitializer streamTaskStateManager = this.streamTaskStateManager((StateBackend)stateBackend, null, true);
        OperatorID operatorID = new OperatorID(47L, 11L);
        AbstractStreamOperator streamOperator = (AbstractStreamOperator)Mockito.mock(AbstractStreamOperator.class);
        Mockito.when((Object)streamOperator.getOperatorID()).thenReturn((Object)operatorID);
        IntSerializer typeSerializer = new IntSerializer();
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        StreamOperatorStateContext stateContext = streamTaskStateManager.streamOperatorStateContext(streamOperator.getOperatorID(), streamOperator.getClass().getSimpleName(), (ProcessingTimeService)new TestProcessingTimeService(), (KeyContext)streamOperator, (TypeSerializer)typeSerializer, closeableRegistry, (MetricGroup)new UnregisteredMetricsGroup(), 1.0, false);
        OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
        CheckpointableKeyedStateBackend keyedStateBackend = stateContext.keyedStateBackend();
        InternalTimeServiceManager timeServiceManager = stateContext.internalTimerServiceManager();
        CloseableIterable keyedStateInputs = stateContext.rawKeyedStateInputs();
        CloseableIterable operatorStateInputs = stateContext.rawOperatorStateInputs();
        Assert.assertFalse((String)"Expected the context to NOT be restored", (boolean)stateContext.isRestored());
        Assert.assertNotNull((Object)operatorStateBackend);
        Assert.assertNotNull((Object)keyedStateBackend);
        Assert.assertNotNull((Object)timeServiceManager);
        Assert.assertNotNull((Object)keyedStateInputs);
        Assert.assertNotNull((Object)operatorStateInputs);
        StreamTaskStateInitializerImplTest.checkCloseablesRegistered(closeableRegistry, new Closeable[]{operatorStateBackend, keyedStateBackend, keyedStateInputs, operatorStateInputs});
        Assert.assertFalse((boolean)keyedStateInputs.iterator().hasNext());
        Assert.assertFalse((boolean)operatorStateInputs.iterator().hasNext());
    }

    @Test
    public void testWithRestore() throws Exception {
        StateBackend mockingBackend = (StateBackend)Mockito.spy((Object)new StateBackend(){

            public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
                return (AbstractKeyedStateBackend)Mockito.mock(AbstractKeyedStateBackend.class);
            }

            public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
                return (OperatorStateBackend)Mockito.mock(OperatorStateBackend.class);
            }
        });
        OperatorID operatorID = new OperatorID(47L, 11L);
        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
        Random random = new Random(66L);
        OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState((OperatorStateHandle)new OperatorStreamStateHandle(Collections.singletonMap("a", new OperatorStateHandle.StateMetaInfo(new long[]{0L, 10L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)), CheckpointTestUtils.createDummyStreamStateHandle((Random)random, null))).setRawOperatorState((OperatorStateHandle)new OperatorStreamStateHandle(Collections.singletonMap("_default_", new OperatorStateHandle.StateMetaInfo(new long[]{0L, 20L, 30L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)), CheckpointTestUtils.createDummyStreamStateHandle((Random)random, null))).setManagedKeyedState((KeyedStateHandle)CheckpointTestUtils.createDummyKeyGroupStateHandle((Random)random, null)).setRawKeyedState((KeyedStateHandle)CheckpointTestUtils.createDummyKeyGroupStateHandle((Random)random, null)).setInputChannelState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewInputChannelStateHandle((int)10, (Random)random))).setResultSubpartitionState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewResultSubpartitionStateHandle((int)10, (Random)random))).build();
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
        JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(42L, taskStateSnapshot);
        StreamTaskStateInitializer streamTaskStateManager = this.streamTaskStateManager(mockingBackend, jobManagerTaskRestore, false);
        AbstractStreamOperator streamOperator = (AbstractStreamOperator)Mockito.mock(AbstractStreamOperator.class);
        Mockito.when((Object)streamOperator.getOperatorID()).thenReturn((Object)operatorID);
        IntSerializer typeSerializer = new IntSerializer();
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        StreamOperatorStateContext stateContext = streamTaskStateManager.streamOperatorStateContext(streamOperator.getOperatorID(), streamOperator.getClass().getSimpleName(), (ProcessingTimeService)new TestProcessingTimeService(), (KeyContext)streamOperator, (TypeSerializer)typeSerializer, closeableRegistry, (MetricGroup)new UnregisteredMetricsGroup(), 1.0, false);
        OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
        CheckpointableKeyedStateBackend keyedStateBackend = stateContext.keyedStateBackend();
        InternalTimeServiceManager timeServiceManager = stateContext.internalTimerServiceManager();
        CloseableIterable keyedStateInputs = stateContext.rawKeyedStateInputs();
        CloseableIterable operatorStateInputs = stateContext.rawOperatorStateInputs();
        Assert.assertTrue((String)"Expected the context to be restored", (boolean)stateContext.isRestored());
        Assert.assertEquals((Object)OptionalLong.of(42L), (Object)stateContext.getRestoredCheckpointId());
        Assert.assertNotNull((Object)operatorStateBackend);
        Assert.assertNotNull((Object)keyedStateBackend);
        Assert.assertNull((Object)timeServiceManager);
        Assert.assertNotNull((Object)keyedStateInputs);
        Assert.assertNotNull((Object)operatorStateInputs);
        int count = 0;
        for (KeyGroupStatePartitionStreamProvider keyedStateInput : keyedStateInputs) {
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
        count = 0;
        for (StatePartitionStreamProvider operatorStateInput : operatorStateInputs) {
            ++count;
        }
        Assert.assertEquals((long)3L, (long)count);
        StreamTaskStateInitializerImplTest.checkCloseablesRegistered(closeableRegistry, new Closeable[]{operatorStateBackend, keyedStateBackend, keyedStateInputs, operatorStateInputs});
    }

    private static void checkCloseablesRegistered(CloseableRegistry closeableRegistry, Closeable ... closeables) {
        for (Closeable closeable : closeables) {
            Assert.assertTrue((boolean)closeableRegistry.unregisterCloseable((AutoCloseable)closeable));
        }
    }

    private StreamTaskStateInitializer streamTaskStateManager(StateBackend stateBackend, JobManagerTaskRestore jobManagerTaskRestore, boolean createTimerServiceManager) {
        JobID jobID = new JobID(42L, 43L);
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder();
        TestTaskLocalStateStore taskLocalStateStore = new TestTaskLocalStateStore();
        InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage();
        TaskStateManager taskStateManager = TaskStateManagerImplTest.taskStateManager((JobID)jobID, (ExecutionAttemptID)executionAttemptID, (CheckpointResponder)checkpointResponderMock, (JobManagerTaskRestore)jobManagerTaskRestore, (TaskLocalStateStore)taskLocalStateStore, (StateChangelogStorage)changelogStorage);
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test-task", 1, 0);
        dummyEnvironment.setTaskStateManager(taskStateManager);
        if (createTimerServiceManager) {
            return new StreamTaskStateInitializerImpl((Environment)dummyEnvironment, stateBackend);
        }
        return new StreamTaskStateInitializerImpl((Environment)dummyEnvironment, stateBackend, TtlTimeProvider.DEFAULT, new InternalTimeServiceManager.Provider(){

            public <K> InternalTimeServiceManager<K> create(CheckpointableKeyedStateBackend<K> keyedStatedBackend, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception {
                return null;
            }
        });
    }
}

