package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImplTest;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.class */
/**
 * Tests for {@link StreamTaskStateInitializerImpl}.
 *
 * <p>Both tests build a {@link StreamOperatorStateContext} for a mocked operator and check which
 * parts of the context are present, whether they were registered with the supplied
 * {@link CloseableRegistry}, and how many raw state inputs are exposed.
 */
public class StreamTaskStateInitializerImplTest {

    /**
     * Without any restore information the context must report "not restored", still provide all
     * backends, the timer service manager and the raw inputs, and the raw inputs must be empty.
     */
    @Test
    public void testNoRestore() throws Exception {
        MemoryStateBackend stateBackend =
                Mockito.spy(new MemoryStateBackend(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE));

        // null restore => nothing to restore from; true => use the real timer service manager.
        StreamTaskStateInitializer initializer = streamTaskStateManager(stateBackend, null, true);

        OperatorID operatorID = new OperatorID(47L, 11L);
        AbstractStreamOperator<?> streamOperator = Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(streamOperator.getOperatorID()).thenReturn(operatorID);

        IntSerializer keySerializer = new IntSerializer();
        CloseableRegistry closeableRegistry = new CloseableRegistry();

        StreamOperatorStateContext stateContext = initializer.streamOperatorStateContext(
                streamOperator.getOperatorID(),
                streamOperator.getClass().getSimpleName(),
                streamOperator,
                keySerializer,
                closeableRegistry,
                new UnregisteredMetricsGroup());

        Closeable operatorStateBackend = stateContext.operatorStateBackend();
        Closeable keyedStateBackend = stateContext.keyedStateBackend();
        InternalTimeServiceManager<?> timeServiceManager = stateContext.internalTimerServiceManager();
        Closeable keyedStateInputs = stateContext.rawKeyedStateInputs();
        Closeable operatorStateInputs = stateContext.rawOperatorStateInputs();

        Assert.assertFalse(stateContext.isRestored());
        Assert.assertNotNull(operatorStateBackend);
        Assert.assertNotNull(keyedStateBackend);
        Assert.assertNotNull(timeServiceManager);
        Assert.assertNotNull(keyedStateInputs);
        Assert.assertNotNull(operatorStateInputs);

        checkCloseablesRegistered(
                closeableRegistry,
                operatorStateBackend,
                keyedStateBackend,
                keyedStateInputs,
                operatorStateInputs);

        Assert.assertFalse(asIterable(keyedStateInputs).iterator().hasNext());
        Assert.assertFalse(asIterable(operatorStateInputs).iterator().hasNext());
    }

    /**
     * With a restored {@link TaskStateSnapshot} the context must report "restored", provide mocked
     * backends from the spied {@link StateBackend}, expose one raw keyed input and three raw
     * operator inputs, and have registered all closeables.
     */
    @Test
    public void testWithRestore() throws Exception {
        // Only the backend factory methods are expected to be used; checkpoint storage access
        // fails loudly if the initializer ever touches it.
        StateBackend stateBackend = Mockito.spy(new StateBackend() {
            @Override
            public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
                throw new UnsupportedOperationException();
            }

            @Override
            public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
                throw new UnsupportedOperationException();
            }

            @Override
            @SuppressWarnings("unchecked") // a raw mock stands in for the generic backend
            public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                    Environment env,
                    JobID jobID,
                    String operatorIdentifier,
                    TypeSerializer<K> keySerializer,
                    int numberOfKeyGroups,
                    KeyGroupRange keyGroupRange,
                    TaskKvStateRegistry kvStateRegistry,
                    TtlTimeProvider ttlTimeProvider,
                    MetricGroup metricGroup,
                    @Nonnull Collection<KeyedStateHandle> stateHandles,
                    CloseableRegistry cancelStreamRegistry) throws Exception {
                return Mockito.mock(AbstractKeyedStateBackend.class);
            }

            @Override
            public OperatorStateBackend createOperatorStateBackend(
                    Environment env,
                    String operatorIdentifier,
                    @Nonnull Collection<OperatorStateHandle> stateHandles,
                    CloseableRegistry cancelStreamRegistry) throws Exception {
                return Mockito.mock(OperatorStateBackend.class);
            }
        });

        OperatorID operatorID = new OperatorID(47L, 11L);
        TaskStateSnapshot taskStateSnapshot = createRestoreSnapshot(operatorID);
        JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(0L, taskStateSnapshot);

        // false => the timer service manager factory is stubbed out and yields null.
        StreamTaskStateInitializer initializer =
                streamTaskStateManager(stateBackend, jobManagerTaskRestore, false);

        AbstractStreamOperator<?> streamOperator = Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(streamOperator.getOperatorID()).thenReturn(operatorID);

        IntSerializer keySerializer = new IntSerializer();
        CloseableRegistry closeableRegistry = new CloseableRegistry();

        StreamOperatorStateContext stateContext = initializer.streamOperatorStateContext(
                streamOperator.getOperatorID(),
                streamOperator.getClass().getSimpleName(),
                streamOperator,
                keySerializer,
                closeableRegistry,
                new UnregisteredMetricsGroup());

        Closeable operatorStateBackend = stateContext.operatorStateBackend();
        Closeable keyedStateBackend = stateContext.keyedStateBackend();
        InternalTimeServiceManager<?> timeServiceManager = stateContext.internalTimerServiceManager();
        Closeable keyedStateInputs = stateContext.rawKeyedStateInputs();
        Closeable operatorStateInputs = stateContext.rawOperatorStateInputs();

        Assert.assertTrue(stateContext.isRestored());
        Assert.assertNotNull(operatorStateBackend);
        Assert.assertNotNull(keyedStateBackend);
        // Null because streamTaskStateManager(..., false) overrides the factory to return null.
        Assert.assertNull(timeServiceManager);
        Assert.assertNotNull(keyedStateInputs);
        Assert.assertNotNull(operatorStateInputs);

        Assert.assertEquals(1, countElements(asIterable(keyedStateInputs)));
        Assert.assertEquals(3, countElements(asIterable(operatorStateInputs)));

        checkCloseablesRegistered(
                closeableRegistry,
                operatorStateBackend,
                keyedStateBackend,
                keyedStateInputs,
                operatorStateInputs);
    }

    /**
     * Builds the snapshot that {@link #testWithRestore()} restores from: two operator stream
     * handles (state names {@code "a"} and {@code "_default_"}) and two dummy key-group handles,
     * all stored under the given operator id.
     *
     * <p>The dummy handles are drawn from one seeded {@link Random} in a fixed order so the
     * generated data is reproducible.
     */
    private static TaskStateSnapshot createRestoreSnapshot(OperatorID operatorID) {
        Random random = new Random(66L);

        OperatorStateHandle.StateMetaInfo firstMetaInfo = new OperatorStateHandle.StateMetaInfo(
                new long[]{0, 10}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        OperatorStreamStateHandle firstOperatorHandle = new OperatorStreamStateHandle(
                Collections.singletonMap("a", firstMetaInfo),
                CheckpointTestUtils.createDummyStreamStateHandle(random));

        OperatorStateHandle.StateMetaInfo secondMetaInfo = new OperatorStateHandle.StateMetaInfo(
                new long[]{0, 20, 30}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        OperatorStreamStateHandle secondOperatorHandle = new OperatorStreamStateHandle(
                Collections.singletonMap("_default_", secondMetaInfo),
                CheckpointTestUtils.createDummyStreamStateHandle(random));

        // NOTE(review): argument order presumably is managed operator, raw operator, managed
        // keyed, raw keyed state - confirm against OperatorSubtaskState.
        OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
                firstOperatorHandle,
                secondOperatorHandle,
                CheckpointTestUtils.createDummyKeyGroupStateHandle(random),
                CheckpointTestUtils.createDummyKeyGroupStateHandle(random));

        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
        return taskStateSnapshot;
    }

    /**
     * Returns the given raw state inputs viewed as an {@link Iterable}.
     *
     * <p>The inputs are held as {@link Closeable} for the registry check, but that type offers no
     * iteration; this helper fails the test with a clear message if they are not iterable.
     */
    private static Iterable<?> asIterable(Closeable rawStateInputs) {
        Assert.assertTrue("Raw state inputs must be iterable", rawStateInputs instanceof Iterable);
        return (Iterable<?>) rawStateInputs;
    }

    /** Counts the elements of the given iterable by walking it once. */
    private static int countElements(Iterable<?> elements) {
        int count = 0;
        for (Object ignored : elements) {
            count++;
        }
        return count;
    }

    /**
     * Asserts that every given closeable is currently registered with the registry.
     *
     * <p>Note that the check works by unregistering, so each closeable is removed from the registry
     * as a side effect.
     */
    private static void checkCloseablesRegistered(CloseableRegistry closeableRegistry, Closeable... closeables) {
        for (Closeable closeable : closeables) {
            Assert.assertTrue(closeableRegistry.unregisterCloseable(closeable));
        }
    }

    /**
     * Creates the {@link StreamTaskStateInitializer} under test, wired to a dummy environment whose
     * task state manager carries the given restore information.
     *
     * @param stateBackend the state backend handed to the initializer
     * @param jobManagerTaskRestore the restore information, or {@code null} for a fresh start
     * @param createTimerServiceManager if {@code true} the unmodified initializer is returned;
     *     otherwise a subclass whose timer service manager factory always returns {@code null}
     */
    private StreamTaskStateInitializer streamTaskStateManager(
            StateBackend stateBackend,
            JobManagerTaskRestore jobManagerTaskRestore,
            boolean createTimerServiceManager) {

        JobID jobID = new JobID(42L, 43L);
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L);

        TaskStateManager taskStateManager = TaskStateManagerImplTest.taskStateManager(
                jobID,
                executionAttemptID,
                new TestCheckpointResponder(),
                jobManagerTaskRestore,
                new TestTaskLocalStateStore());

        DummyEnvironment dummyEnvironment = new DummyEnvironment("test-task", 1, 0);
        dummyEnvironment.setTaskStateManager(taskStateManager);

        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();

        if (createTimerServiceManager) {
            return new StreamTaskStateInitializerImpl(dummyEnvironment, stateBackend, processingTimeService);
        }

        return new StreamTaskStateInitializerImpl(dummyEnvironment, stateBackend, processingTimeService) {
            @Override
            protected <K> InternalTimeServiceManager<K> internalTimeServiceManager(
                    AbstractKeyedStateBackend<K> keyedStatedBackend,
                    KeyContext keyContext,
                    Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception {
                // Deliberately disabled so the test does not depend on timer restoration.
                return null;
            }
        };
    }
}
