package org.apache.flink.streaming.api.operators;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingFSDataInputStream;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.class */
/**
 * Tests for {@link BackendRestorerProcedure}: the order in which restore options are tried,
 * the failure raised when every option fails, and cancellation through the
 * {@link CloseableRegistry}.
 */
public class BackendRestorerProcedureTest extends TestLogger {
    /** Descriptive label handed to every {@link BackendRestorerProcedure} created by these tests. */
    private static final String BACKEND_DESCRIPTION = "test op state backend";

    /**
     * Factory used by the procedure under test: builds a fresh operator state backend from the
     * given collection of state handles.
     */
    private final FunctionWithException<Collection<OperatorStateHandle>, OperatorStateBackend, Exception> backendSupplier =
            stateHandles ->
                    new DefaultOperatorStateBackendBuilder(
                                    getClass().getClassLoader(),
                                    new ExecutionConfig(),
                                    true,
                                    stateHandles,
                                    new CloseableRegistry())
                            .build();

    /**
     * Snapshots a backend holding the list state {@code [0, 1, 2, 3]} and then restores from three
     * options: a bare mock handle, a spy of the real snapshot handle, and another bare mock.
     *
     * <p>Verifies that the first two handles were opened, that the third was never touched, and
     * that the restored backend exposes exactly the original four elements in order.
     *
     * @throws Exception if snapshotting, restoring, or closing a backend fails unexpectedly
     */
    @Test
    public void testRestoreProcedureOrderAndFailure() throws Exception {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        MemCheckpointStreamFactory checkpointStreamFactory =
                new MemCheckpointStreamFactory(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        ListStateDescriptor<Integer> stateDescriptor =
                new ListStateDescriptor<>("test-state", Integer.class);

        OperatorStateBackend originalBackend = backendSupplier.apply(Collections.emptyList());
        SnapshotResult<OperatorStateHandle> snapshotResult;
        try {
            ListState<Integer> listState = originalBackend.getListState(stateDescriptor);
            listState.add(0);
            listState.add(1);
            listState.add(2);
            listState.add(3);

            RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
                    originalBackend.snapshot(
                            0L,
                            0L,
                            checkpointStreamFactory,
                            CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshot.run();
            snapshotResult = snapshot.get();
        } finally {
            // Release the original backend even if populating or snapshotting it failed.
            originalBackend.close();
            originalBackend.dispose();
        }

        // Bare mocks carry no real state; only the spy wraps the handle produced by the snapshot.
        OperatorStateHandle firstFailHandle = PowerMockito.mock(OperatorStateHandle.class);
        OperatorStateHandle secondSuccessHandle =
                PowerMockito.spy(snapshotResult.getJobManagerOwnedSnapshot());
        OperatorStateHandle thirdNotUsedHandle = PowerMockito.mock(OperatorStateHandle.class);

        List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions =
                Arrays.asList(
                        new StateObjectCollection<>(Collections.singletonList(firstFailHandle)),
                        new StateObjectCollection<>(Collections.singletonList(secondSuccessHandle)),
                        new StateObjectCollection<>(Collections.singletonList(thirdNotUsedHandle)));

        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
                new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, BACKEND_DESCRIPTION);

        OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(sortedRestoreOptions);
        Assert.assertNotNull(restoredBackend);

        try {
            Mockito.verify(firstFailHandle).openInputStream();
            Mockito.verify(secondSuccessHandle).openInputStream();
            PowerMockito.verifyZeroInteractions(thirdNotUsedHandle);

            Iterator<Integer> stateIterator =
                    restoredBackend.getListState(stateDescriptor).get().iterator();
            Assert.assertEquals(0, stateIterator.next().intValue());
            Assert.assertEquals(1, stateIterator.next().intValue());
            Assert.assertEquals(2, stateIterator.next().intValue());
            Assert.assertEquals(3, stateIterator.next().intValue());
            Assert.assertFalse(stateIterator.hasNext());
        } finally {
            // Release the restored backend even if one of the assertions above failed.
            restoredBackend.close();
            restoredBackend.dispose();
        }
    }

    /**
     * Restores from three bare mock handles and expects {@code createAndRestore} to throw, after
     * which every handle must have been opened exactly once.
     *
     * @throws Exception if verifying the mock interactions fails unexpectedly
     */
    @Test
    public void testExceptionThrownIfAllRestoresFailed() throws Exception {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        OperatorStateHandle firstFailHandle = PowerMockito.mock(OperatorStateHandle.class);
        OperatorStateHandle secondFailHandle = PowerMockito.mock(OperatorStateHandle.class);
        OperatorStateHandle thirdFailHandle = PowerMockito.mock(OperatorStateHandle.class);

        List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions =
                Arrays.asList(
                        new StateObjectCollection<>(Collections.singletonList(firstFailHandle)),
                        new StateObjectCollection<>(Collections.singletonList(secondFailHandle)),
                        new StateObjectCollection<>(Collections.singletonList(thirdFailHandle)));

        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
                new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, BACKEND_DESCRIPTION);

        try {
            restorerProcedure.createAndRestore(sortedRestoreOptions);
            // Assert.fail throws an AssertionError, which the catch clause below does not intercept.
            Assert.fail("Expected createAndRestore to throw because no restore option is usable.");
        } catch (Exception ignored) {
            // Expected: the procedure ran out of restore options.
        }

        Mockito.verify(firstFailHandle).openInputStream();
        Mockito.verify(secondFailHandle).openInputStream();
        Mockito.verify(thirdFailHandle).openInputStream();
    }

    /**
     * Runs a restore on a separate thread against a handle whose input stream is a
     * {@link BlockingFSDataInputStream}, closes the {@link CloseableRegistry} once the first latch
     * has fired, then triggers the second latch and expects the restore to have ended with a
     * {@link FlinkException}.
     *
     * @throws Exception if setting up the mock, waiting on the latch, or joining the thread fails
     */
    @Test
    public void testCanBeCanceledViaRegistry() throws Exception {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        OneShotLatch waitForBlock = new OneShotLatch();
        OneShotLatch unblock = new OneShotLatch();

        OperatorStateHandle blockingRestoreHandle = PowerMockito.mock(OperatorStateHandle.class);
        PowerMockito.when(blockingRestoreHandle.openInputStream())
                .thenReturn(new BlockingFSDataInputStream(waitForBlock, unblock));

        List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions =
                Collections.singletonList(
                        new StateObjectCollection<>(Collections.singletonList(blockingRestoreHandle)));

        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
                new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, BACKEND_DESCRIPTION);

        // Carries the restore failure from the worker thread back to the test thread.
        AtomicReference<Exception> exceptionReference = new AtomicReference<>(null);
        Thread restoreThread =
                new Thread(
                        () -> {
                            try {
                                restorerProcedure.createAndRestore(sortedRestoreOptions);
                            } catch (Exception e) {
                                exceptionReference.set(e);
                            }
                        });

        restoreThread.start();
        waitForBlock.await();
        closeableRegistry.close();
        unblock.trigger();
        restoreThread.join();

        Exception exception = exceptionReference.get();
        Assert.assertTrue(
                "Expected a FlinkException after closing the registry, but got: " + exception,
                exception instanceof FlinkException);
    }
}
