package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import junit.framework.TestCase;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({AbstractStreamOperator.class})
@RunWith(PowerMockRunner.class)
/* Tests for AbstractStreamOperator: per-key state and timers, state redistribution on rescaling, and snapshot cleanup. */
public class AbstractStreamOperatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest$TestKeySelector.class */
    private static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1;

        private TestKeySelector() {
        }

        public Integer getKey(Tuple2<Integer, String> tuple2) throws Exception {
            return (Integer) tuple2.f0;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest$TestOperator.class */
    private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1;
        private transient InternalTimerService<VoidNamespace> timerService;
        private final ValueStateDescriptor<String> stateDescriptor;

        private TestOperator() {
            this.stateDescriptor = new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
        }

        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            String[] split = ((String) ((Tuple2) streamRecord.getValue()).f1).split(":");
            String str = split[0];
            boolean z = -1;
            switch (str.hashCode()) {
                case -1478918947:
                    if (str.equals("DELETE_STATE")) {
                        z = true;
                        break;
                    }
                    break;
                case -1172772417:
                    if (str.equals("SET_PROC_TIME_TIMER")) {
                        z = 3;
                        break;
                    }
                    break;
                case -562919307:
                    if (str.equals("SET_EVENT_TIME_TIMER")) {
                        z = 2;
                        break;
                    }
                    break;
                case 301419781:
                    if (str.equals("EMIT_STATE")) {
                        z = 4;
                        break;
                    }
                    break;
                case 339850804:
                    if (str.equals("SET_STATE")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    getPartitionedState(this.stateDescriptor).update(split[1]);
                    return;
                case true:
                    getPartitionedState(this.stateDescriptor).clear();
                    return;
                case true:
                    this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(split[1]));
                    return;
                case true:
                    this.timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(split[1]));
                    return;
                case true:
                    this.output.collect(new StreamRecord("ON_ELEMENT:" + ((Tuple2) streamRecord.getValue()).f0 + ":" + ((String) getPartitionedState(this.stateDescriptor).value())));
                    return;
                default:
                    throw new IllegalArgumentException();
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            this.output.collect(new StreamRecord("ON_EVENT_TIME:" + ((String) getPartitionedState(this.stateDescriptor).value())));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            this.output.collect(new StreamRecord("ON_PROC_TIME:" + ((String) getPartitionedState(this.stateDescriptor).value())));
        }
    }

    /**
     * Sets a different state value under keys 0 and 1, then emits the state for each key and
     * checks that every key sees only its own value.
     */
    @Test
    public void testStateDoesNotInterfere() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();

        testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);

        testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0L);
        testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0L);

        MatcherAssert.assertThat(
                extractResult(testHarness),
                Matchers.contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
    }

    /**
     * Registers event-time timers at different timestamps for keys 0 and 1 and checks that each
     * watermark fires only the timer due at that time, with the state of the matching key.
     */
    @Test
    public void testEventTimeTimersDontInterfere() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.processWatermark(0L);

        testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0L);

        testHarness.processWatermark(10L);
        MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_EVENT_TIME:HELLO"));

        testHarness.processWatermark(20L);
        MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_EVENT_TIME:CIAO"));
    }

    /**
     * Registers processing-time timers at different timestamps for keys 0 and 1 and checks that
     * advancing processing time fires only the timer due at that time, with the state of the
     * matching key.
     */
    @Test
    public void testProcessingTimeTimersDontInterfere() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

        testHarness.setProcessingTime(10L);
        MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_PROC_TIME:HELLO"));

        testHarness.setProcessingTime(20L);
        MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_PROC_TIME:CIAO"));
    }

    /**
     * Registers processing-time timers, snapshots the operator, restores the snapshot into a
     * fresh harness, and checks that the restored timers still fire when processing time advances.
     */
    @Test
    public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

        OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);

        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        // Processing time is set before the operator is set up and restored, as in the original test.
        restoredHarness.setProcessingTime(0L);
        restoredHarness.setup();
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        restoredHarness.setProcessingTime(10L);
        MatcherAssert.assertThat(extractResult(restoredHarness), Matchers.contains("ON_PROC_TIME:HELLO"));

        restoredHarness.setProcessingTime(20L);
        MatcherAssert.assertThat(extractResult(restoredHarness), Matchers.contains("ON_PROC_TIME:CIAO"));
    }

    /**
     * Registers one processing-time and one event-time timer for the same key and checks that a
     * watermark fires only the event-time timer and a processing-time advance fires only the
     * processing-time timer.
     */
    @Test
    public void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(0L);
        testHarness.processWatermark(0L);

        testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0L);
        testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);

        testHarness.processWatermark(20L);
        MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_EVENT_TIME:HELLO"));

        testHarness.setProcessingTime(10L);
        MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_PROC_TIME:HELLO"));
    }

    /**
     * Takes a snapshot from a single subtask holding state and timers for two keys (one per
     * key-group range), restores it into two subtasks, and checks that each restored subtask
     * fires only the timers, with the state, of the key expected on that subtask.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingUp() throws Exception {
        final int maxParallelism = 10;

        // Two key-group ranges covering key groups 0-4 and 5-9, and one key that maps into each.
        KeyGroupRange subKeyGroupOne = new KeyGroupRange(0, 4);
        KeyGroupRange subKeyGroupTwo = new KeyGroupRange(subKeyGroupOne.getEndKeyGroup() + 1, maxParallelism - 1);
        int key1 = getKeyInKeyGroupRange(subKeyGroupOne, maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupTwo, maxParallelism);

        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism, 1, 0);
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.setProcessingTime(0L);

        testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0L);
        testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0L);
        testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0L);
        testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0L);
        testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0L);
        testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0L);

        TestCase.assertTrue(extractResult(testHarness).isEmpty());

        OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);

        // Restored subtask 0 of 2: only the timers of key1 are expected to fire here.
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism, 2, 0);
        testHarness1.setup();
        testHarness1.initializeState(snapshot);
        testHarness1.open();

        testHarness1.processWatermark(10L);
        MatcherAssert.assertThat(extractResult(testHarness1), Matchers.contains("ON_EVENT_TIME:HELLO"));
        TestCase.assertTrue(extractResult(testHarness1).isEmpty());

        // The timer registered for key2 must not fire on this subtask.
        testHarness1.processWatermark(20L);
        TestCase.assertTrue(extractResult(testHarness1).isEmpty());

        testHarness1.setProcessingTime(10L);
        MatcherAssert.assertThat(extractResult(testHarness1), Matchers.contains("ON_PROC_TIME:HELLO"));
        TestCase.assertTrue(extractResult(testHarness1).isEmpty());

        testHarness1.setProcessingTime(20L);
        TestCase.assertTrue(extractResult(testHarness1).isEmpty());

        // Restored subtask 1 of 2: only the timers of key2 are expected to fire here.
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism, 2, 1);
        testHarness2.setup();
        testHarness2.initializeState(snapshot);
        testHarness2.open();

        // The timer registered for key1 must not fire on this subtask.
        testHarness2.processWatermark(10L);
        TestCase.assertTrue(extractResult(testHarness2).isEmpty());

        testHarness2.processWatermark(20L);
        MatcherAssert.assertThat(extractResult(testHarness2), Matchers.contains("ON_EVENT_TIME:CIAO"));

        testHarness2.setProcessingTime(10L);
        TestCase.assertTrue(extractResult(testHarness2).isEmpty());

        testHarness2.setProcessingTime(20L);
        MatcherAssert.assertThat(extractResult(testHarness2), Matchers.contains("ON_PROC_TIME:CIAO"));
        TestCase.assertTrue(extractResult(testHarness2).isEmpty());
    }

    /**
     * Takes snapshots from two subtasks, each holding state and timers for one key, repackages
     * them into a single state handle, restores that into one subtask, and checks that the
     * timers and state of both keys are present after the restore.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingDown() throws Exception {
        final int maxParallelism = 10;

        // Two key-group ranges covering key groups 0-4 and 5-9, and one key that maps into each.
        KeyGroupRange subKeyGroupOne = new KeyGroupRange(0, 4);
        KeyGroupRange subKeyGroupTwo = new KeyGroupRange(subKeyGroupOne.getEndKeyGroup() + 1, maxParallelism - 1);
        int key1 = getKeyInKeyGroupRange(subKeyGroupOne, maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupTwo, maxParallelism);

        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism, 2, 0);
        testHarness1.setup();
        testHarness1.open();
        testHarness1.processWatermark(0L);
        testHarness1.setProcessingTime(0L);

        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism, 2, 1);
        testHarness2.setup();
        testHarness2.open();
        testHarness2.processWatermark(0L);
        testHarness2.setProcessingTime(0L);

        testHarness1.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:30"), 0L);
        testHarness1.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:30"), 0L);
        testHarness1.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0L);

        testHarness2.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:40"), 0L);
        testHarness2.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:40"), 0L);
        testHarness2.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0L);

        // Combine the snapshots of both subtasks into the state handed to the single restored subtask.
        OperatorStateHandles repackagedState = AbstractStreamOperatorTestHarness.repackageState(
                testHarness1.snapshot(0L, 0L),
                testHarness2.snapshot(0L, 0L));

        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness3 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new TestOperator(), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism, 1, 0);
        testHarness3.setup();
        testHarness3.initializeState(repackagedState);
        testHarness3.open();

        testHarness3.processWatermark(30L);
        MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_EVENT_TIME:HELLO"));
        TestCase.assertTrue(extractResult(testHarness3).isEmpty());

        testHarness3.processWatermark(40L);
        MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_EVENT_TIME:CIAO"));
        TestCase.assertTrue(extractResult(testHarness3).isEmpty());

        testHarness3.setProcessingTime(30L);
        MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_PROC_TIME:HELLO"));
        TestCase.assertTrue(extractResult(testHarness3).isEmpty());

        testHarness3.setProcessingTime(40L);
        MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_PROC_TIME:CIAO"));
        TestCase.assertTrue(extractResult(testHarness3).isEmpty());
    }

    /**
     * Calls the real {@code snapshotState(long, long, CheckpointOptions)} on a mocked operator and
     * verifies that the (mocked) snapshot context created during the call is closed.
     */
    @Test
    public void testSnapshotMethod() throws Exception {
        final long checkpointId = 42L;
        final long timestamp = 1L;

        final CloseableRegistry cancelables = new CloseableRegistry();

        // Every StateSnapshotContextSynchronousImpl constructed in the prepared class is replaced by this mock.
        final StateSnapshotContextSynchronousImpl context =
                PowerMockito.mock(StateSnapshotContextSynchronousImpl.class);
        PowerMockito.whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);

        final StreamTask<?, ?> containingTask = PowerMockito.mock(StreamTask.class);
        PowerMockito.when(containingTask.getCancelables()).thenReturn(cancelables);

        final AbstractStreamOperator<?> operator = PowerMockito.mock(AbstractStreamOperator.class);
        PowerMockito.when(
                operator.snapshotState(
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.any(CheckpointOptions.class)))
                .thenCallRealMethod();
        PowerMockito.doReturn(containingTask).when(operator).getContainingTask();

        operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());

        Mockito.verify(context).close();
    }

    /**
     * Makes the operator's {@code snapshotState(StateSnapshotContext)} throw and verifies that the
     * exception surfaces as the cause of the failure and that the snapshot context is still closed.
     */
    @Test
    public void testFailingSnapshotMethod() throws Exception {
        final long checkpointId = 42L;
        final long timestamp = 1L;

        final Exception failingException = new Exception("Test exception");

        final CloseableRegistry cancelables = new CloseableRegistry();

        // Every StateSnapshotContextSynchronousImpl constructed in the prepared class is replaced by this mock.
        final StateSnapshotContextSynchronousImpl context =
                PowerMockito.mock(StateSnapshotContextSynchronousImpl.class);
        PowerMockito.whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);

        final StreamTask<?, ?> containingTask = PowerMockito.mock(StreamTask.class);
        PowerMockito.when(containingTask.getCancelables()).thenReturn(cancelables);

        final AbstractStreamOperator<?> operator = PowerMockito.mock(AbstractStreamOperator.class);
        PowerMockito.when(
                operator.snapshotState(
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.any(CheckpointOptions.class)))
                .thenCallRealMethod();
        PowerMockito.doReturn(containingTask).when(operator).getContainingTask();

        // Fail the user-facing snapshot hook when it is invoked with the mocked context.
        Mockito.doThrow(failingException)
                .when(operator)
                .snapshotState((StateSnapshotContext) org.mockito.Matchers.eq(context));

        try {
            operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
            Assert.fail("Exception expected.");
        } catch (Exception e) {
            Assert.assertEquals(failingException, e.getCause());
        }

        Mockito.verify(context).close();
    }

    /**
     * Makes the keyed state backend's {@code snapshot} call throw and verifies that the exception
     * surfaces as the cause of the failure, that the snapshot context is closed, that the
     * in-progress {@link OperatorSnapshotResult} is cancelled, and that the stream futures exposed
     * by the snapshot context are cancelled.
     */
    @Test
    public void testFailingBackendSnapshotMethod() throws Exception {
        final long checkpointId = 42L;
        final long timestamp = 1L;

        final Exception failingException = new Exception("Test exception");

        final CloseableRegistry cancelables = new CloseableRegistry();

        // Raw RunnableFuture is kept on purpose: the element types of the futures returned by the
        // context and the backends are not needed by this test.
        final RunnableFuture futureKeyedStateHandle = PowerMockito.mock(RunnableFuture.class);
        final RunnableFuture futureOperatorStateHandle = PowerMockito.mock(RunnableFuture.class);

        final StateSnapshotContextSynchronousImpl context =
                PowerMockito.mock(StateSnapshotContextSynchronousImpl.class);
        PowerMockito.when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
        PowerMockito.when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);

        final OperatorSnapshotResult operatorSnapshotResult = PowerMockito.spy(new OperatorSnapshotResult());

        // Objects of these types constructed in the prepared class are replaced by the mock and the spy.
        PowerMockito.whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
        PowerMockito.whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult);

        final CheckpointStreamFactory streamFactory = PowerMockito.mock(CheckpointStreamFactory.class);
        final StreamTask<?, ?> containingTask = PowerMockito.mock(StreamTask.class);
        PowerMockito.when(containingTask.getCancelables()).thenReturn(cancelables);

        final AbstractStreamOperator<?> operator = PowerMockito.mock(AbstractStreamOperator.class);
        PowerMockito.when(
                operator.snapshotState(
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.any(CheckpointOptions.class)))
                .thenCallRealMethod();
        PowerMockito.when(operator.getCheckpointStreamFactory(org.mockito.Matchers.any(CheckpointOptions.class)))
                .thenReturn(streamFactory);
        PowerMockito.doReturn(containingTask).when(operator).getContainingTask();

        final RunnableFuture futureManagedOperatorStateHandle = PowerMockito.mock(RunnableFuture.class);

        final OperatorStateBackend operatorStateBackend = PowerMockito.mock(OperatorStateBackend.class);
        PowerMockito.when(
                operatorStateBackend.snapshot(
                        org.mockito.Matchers.eq(checkpointId),
                        org.mockito.Matchers.eq(timestamp),
                        org.mockito.Matchers.eq(streamFactory),
                        org.mockito.Matchers.any(CheckpointOptions.class)))
                .thenReturn(futureManagedOperatorStateHandle);

        // The keyed backend is the failing component in this scenario.
        final AbstractKeyedStateBackend keyedStateBackend = PowerMockito.mock(AbstractKeyedStateBackend.class);
        PowerMockito.when(
                keyedStateBackend.snapshot(
                        org.mockito.Matchers.eq(checkpointId),
                        org.mockito.Matchers.eq(timestamp),
                        org.mockito.Matchers.eq(streamFactory),
                        org.mockito.Matchers.eq(CheckpointOptions.forFullCheckpoint())))
                .thenThrow(failingException);

        // The operator is a mock, so its backend fields have to be injected reflectively.
        Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
        Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
        Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);

        try {
            operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
            Assert.fail("Exception expected.");
        } catch (Exception e) {
            Assert.assertEquals(failingException, e.getCause());
        }

        Mockito.verify(context).close();
        Mockito.verify(operatorSnapshotResult).cancel();

        Mockito.verify(futureKeyedStateHandle).cancel(org.mockito.Matchers.anyBoolean());
        Mockito.verify(futureOperatorStateHandle).cancel(org.mockito.Matchers.anyBoolean());
        // NOTE(review): the original repeated the futureKeyedStateHandle verification here, which
        // added no coverage. Presumably futureManagedOperatorStateHandle was meant to be verified
        // instead - confirm that the operator state backend is snapshotted before the keyed
        // backend fails in AbstractStreamOperator#snapshotState before adding that check.
    }

    /**
     * Collects the values of all stream records currently in the harness output and then clears
     * the output, so that a subsequent call only sees records emitted after this one.
     *
     * @param oneInputStreamOperatorTestHarness harness whose output is drained
     * @param <T> type of the emitted values
     * @return the values of the emitted stream records, in output order
     */
    private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> oneInputStreamOperatorTestHarness) {
        List<StreamRecord<? extends T>> outputRecords =
                oneInputStreamOperatorTestHarness.extractOutputStreamRecords();
        List<T> values = new ArrayList<>(outputRecords.size());
        for (StreamRecord<? extends T> outputRecord : outputRecords) {
            // Equivalent to the former always-true "instanceof StreamRecord" check: skips nulls only.
            if (outputRecord != null) {
                values.add(outputRecord.getValue());
            }
        }
        oneInputStreamOperatorTestHarness.getOutput().clear();
        return values;
    }

    /**
     * Draws random integers until one is found whose key group, as computed by
     * {@link KeyGroupRangeAssignment#assignToKeyGroup}, lies inside the given range.
     *
     * @param keyGroupRange range the key's key group must fall into
     * @param i second argument passed to {@code assignToKeyGroup}; callers in this file pass their max parallelism
     * @return a key whose key group is contained in {@code keyGroupRange}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange keyGroupRange, int i) {
        Random rand = new Random(System.currentTimeMillis());
        int candidate;
        do {
            candidate = rand.nextInt();
        } while (!keyGroupRange.contains(KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(candidate), i)));
        return candidate;
    }
}
