/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import junit.framework.TestCase;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={AbstractStreamOperator.class})
@PowerMockIgnore(value={"java.*", "javax.*", "org.slf4j.*", "org.apache.log4j.*"})
public class AbstractStreamOperatorTest {
    /**
     * Stores a different value under keys 0 and 1, then emits the state for each key and
     * expects every key to report only the value that was stored for it.
     */
    @Test
    public void testStateDoesNotInterfere() throws Exception {
        TestOperator operator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        harness.open();

        harness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        harness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);

        // Read back in the opposite key order to the writes.
        harness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0L);
        harness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0L);

        List<String> emitted = extractResult(harness);
        MatcherAssert.assertThat(
                emitted,
                org.hamcrest.Matchers.contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
    }

    /**
     * Registers event-time timers for two keys (key 1 at 20, key 0 at 10) and checks that
     * each watermark fires only the due timer, which then sees the state of its own key.
     */
    @Test
    public void testEventTimeTimersDontInterfere() throws Exception {
        TestOperator operator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        harness.open();

        harness.processWatermark(0L);

        harness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0L);
        harness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        harness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
        harness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0L);

        // Watermark 10: only the timer registered for key 0 is due.
        harness.processWatermark(10L);
        List<String> firedAtTen = extractResult(harness);
        MatcherAssert.assertThat(firedAtTen, org.hamcrest.Matchers.contains("ON_EVENT_TIME:HELLO"));

        // Watermark 20: now the timer registered for key 1 is due.
        harness.processWatermark(20L);
        List<String> firedAtTwenty = extractResult(harness);
        MatcherAssert.assertThat(firedAtTwenty, org.hamcrest.Matchers.contains("ON_EVENT_TIME:CIAO"));
    }

    /**
     * Registers processing-time timers for two keys (key 1 at 20, key 0 at 10) and checks
     * that advancing processing time fires only the due timer, which then sees the state of
     * its own key.
     */
    @Test
    public void testProcessingTimeTimersDontInterfere() throws Exception {
        TestOperator operator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        harness.open();

        harness.setProcessingTime(0L);

        harness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
        harness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        harness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
        harness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

        // Time 10: only the timer registered for key 0 is due.
        harness.setProcessingTime(10L);
        List<String> firedAtTen = extractResult(harness);
        MatcherAssert.assertThat(firedAtTen, org.hamcrest.Matchers.contains("ON_PROC_TIME:HELLO"));

        // Time 20: now the timer registered for key 1 is due.
        harness.setProcessingTime(20L);
        List<String> firedAtTwenty = extractResult(harness);
        MatcherAssert.assertThat(firedAtTwenty, org.hamcrest.Matchers.contains("ON_PROC_TIME:CIAO"));
    }

    /**
     * Snapshots an operator that has pending processing-time timers, restores the snapshot
     * into a fresh operator and checks that the restored timers still fire at their
     * registered times with the state of the matching key.
     */
    @Test
    public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
        TestOperator originalOperator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> originalHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        originalOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        originalHarness.open();

        originalHarness.setProcessingTime(0L);

        originalHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
        originalHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
        originalHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
        originalHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

        OperatorSubtaskState snapshot = originalHarness.snapshot(0L, 0L);

        // Restore into a brand-new operator/harness pair.
        TestOperator restoredOperator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        restoredOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);

        restoredHarness.setProcessingTime(0L);
        restoredHarness.setup();
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        restoredHarness.setProcessingTime(10L);
        List<String> firedAtTen = extractResult(restoredHarness);
        MatcherAssert.assertThat(firedAtTen, org.hamcrest.Matchers.contains("ON_PROC_TIME:HELLO"));

        restoredHarness.setProcessingTime(20L);
        List<String> firedAtTwenty = extractResult(restoredHarness);
        MatcherAssert.assertThat(firedAtTwenty, org.hamcrest.Matchers.contains("ON_PROC_TIME:CIAO"));
    }

    /**
     * Registers a processing-time timer (10) and an event-time timer (20) for the same key
     * and checks that a watermark fires only the event-time timer, while advancing the
     * processing time fires only the processing-time timer.
     */
    @Test
    public void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
        TestOperator operator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        harness.open();

        harness.setProcessingTime(0L);
        harness.processWatermark(0L);

        harness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);
        harness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0L);
        harness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);

        // The watermark passes both timestamps, yet only the event-time timer is expected.
        harness.processWatermark(20L);
        List<String> firedByWatermark = extractResult(harness);
        MatcherAssert.assertThat(firedByWatermark, org.hamcrest.Matchers.contains("ON_EVENT_TIME:HELLO"));

        harness.setProcessingTime(10L);
        List<String> firedByClock = extractResult(harness);
        MatcherAssert.assertThat(firedByClock, org.hamcrest.Matchers.contains("ON_PROC_TIME:HELLO"));
    }

    /**
     * Takes a snapshot from a single harness holding state and timers for two keys, restores
     * it into two harnesses (subtask indices 0 and 1) and checks that each restored harness
     * fires only the timers, and sees only the state, of the key in its key-group range.
     *
     * <p>NOTE(review): the three trailing int constructor arguments are assumed to be
     * (maxParallelism, parallelism, subtask index) - confirm against the harness constructor.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingUp() throws Exception {
        final int maxParallelism = 10;

        // Pick one key from each half of the key-group space so that the two keys land in
        // different ranges.
        KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1);
        KeyGroupRange subKeyGroupRange2 =
                new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);

        int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

        TestOperator testOperator = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator,
                        new TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        1, /* num subtasks */
                        0 /* subtask index */);
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.setProcessingTime(0L);

        testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0L);
        testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0L);

        testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0L);
        testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0L);

        testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0L);
        testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0L);

        // Nothing may have fired or been emitted before the snapshot is taken.
        Assert.assertTrue(extractResult(testHarness).isEmpty());

        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

        // First restored harness (subtask index 0): expected to own only key1.
        TestOperator testOperator1 = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator1,
                        new TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        2, /* num subtasks */
                        0 /* subtask index */);
        testHarness1.setup();
        testHarness1.initializeState(snapshot);
        testHarness1.open();

        testHarness1.processWatermark(10L);
        MatcherAssert.assertThat(
                extractResult(testHarness1), org.hamcrest.Matchers.contains("ON_EVENT_TIME:HELLO"));
        // extractResult clears the output, so a second extraction must be empty.
        Assert.assertTrue(extractResult(testHarness1).isEmpty());

        // key2's event-time timer (20) must not be present in this harness.
        testHarness1.processWatermark(20L);
        Assert.assertTrue(extractResult(testHarness1).isEmpty());

        testHarness1.setProcessingTime(10L);
        MatcherAssert.assertThat(
                extractResult(testHarness1), org.hamcrest.Matchers.contains("ON_PROC_TIME:HELLO"));
        Assert.assertTrue(extractResult(testHarness1).isEmpty());

        // key2's processing-time timer (20) must not be present in this harness.
        testHarness1.setProcessingTime(20L);
        Assert.assertTrue(extractResult(testHarness1).isEmpty());

        // Second restored harness (subtask index 1): expected to own only key2.
        TestOperator testOperator2 = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator2,
                        new TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        2, /* num subtasks */
                        1 /* subtask index */);
        testHarness2.setup();
        testHarness2.initializeState(snapshot);
        testHarness2.open();

        // key1's event-time timer (10) must not be present in this harness.
        testHarness2.processWatermark(10L);
        Assert.assertTrue(extractResult(testHarness2).isEmpty());

        testHarness2.processWatermark(20L);
        MatcherAssert.assertThat(
                extractResult(testHarness2), org.hamcrest.Matchers.contains("ON_EVENT_TIME:CIAO"));

        // key1's processing-time timer (10) must not be present in this harness.
        testHarness2.setProcessingTime(10L);
        Assert.assertTrue(extractResult(testHarness2).isEmpty());

        testHarness2.setProcessingTime(20L);
        MatcherAssert.assertThat(
                extractResult(testHarness2), org.hamcrest.Matchers.contains("ON_PROC_TIME:CIAO"));
        Assert.assertTrue(extractResult(testHarness2).isEmpty());
    }

    /**
     * Takes snapshots from two harnesses (subtask indices 0 and 1), each holding state and
     * timers for one key, repackages them into a single state object, restores that into one
     * harness and checks that the timers and state of both keys are available there.
     *
     * <p>NOTE(review): the three trailing int constructor arguments are assumed to be
     * (maxParallelism, parallelism, subtask index) - confirm against the harness constructor.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingDown() throws Exception {
        final int maxParallelism = 10;

        // Pick one key from each half of the key-group space so that the two keys land in
        // different ranges.
        KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1);
        KeyGroupRange subKeyGroupRange2 =
                new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);

        int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

        TestOperator testOperator1 = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator1,
                        new TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        2, /* num subtasks */
                        0 /* subtask index */);
        testHarness1.setup();
        testHarness1.open();

        testHarness1.processWatermark(0L);
        testHarness1.setProcessingTime(0L);

        TestOperator testOperator2 = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator2,
                        new TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        2, /* num subtasks */
                        1 /* subtask index */);
        testHarness2.setup();
        testHarness2.open();

        testHarness2.processWatermark(0L);
        testHarness2.setProcessingTime(0L);

        // key1 goes to the first harness, key2 to the second.
        testHarness1.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:30"), 0L);
        testHarness1.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:30"), 0L);
        testHarness1.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0L);

        testHarness2.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:40"), 0L);
        testHarness2.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:40"), 0L);
        testHarness2.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0L);

        // Merge both subtask snapshots into the state handed to the single restored harness.
        OperatorSubtaskState repackagedState =
                AbstractStreamOperatorTestHarness.repackageState(
                        testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L));

        TestOperator testOperator3 = new TestOperator();
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness3 =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator3,
                        new TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        1, /* num subtasks */
                        0 /* subtask index */);
        testHarness3.setup();
        testHarness3.initializeState(repackagedState);
        testHarness3.open();

        testHarness3.processWatermark(30L);
        MatcherAssert.assertThat(
                extractResult(testHarness3), org.hamcrest.Matchers.contains("ON_EVENT_TIME:HELLO"));
        // extractResult clears the output, so a second extraction must be empty.
        Assert.assertTrue(extractResult(testHarness3).isEmpty());

        testHarness3.processWatermark(40L);
        MatcherAssert.assertThat(
                extractResult(testHarness3), org.hamcrest.Matchers.contains("ON_EVENT_TIME:CIAO"));
        Assert.assertTrue(extractResult(testHarness3).isEmpty());

        testHarness3.setProcessingTime(30L);
        MatcherAssert.assertThat(
                extractResult(testHarness3), org.hamcrest.Matchers.contains("ON_PROC_TIME:HELLO"));
        Assert.assertTrue(extractResult(testHarness3).isEmpty());

        testHarness3.setProcessingTime(40L);
        MatcherAssert.assertThat(
                extractResult(testHarness3), org.hamcrest.Matchers.contains("ON_PROC_TIME:CIAO"));
        Assert.assertTrue(extractResult(testHarness3).isEmpty());
    }

    /**
     * Checks that the {@link StateSnapshotContextSynchronousImpl} created during a
     * successful {@code snapshotState} call on the operator is closed.
     */
    @Test
    public void testSnapshotMethod() throws Exception {
        final long checkpointId = 42L;
        final long timestamp = 1L;

        final CloseableRegistry closeableRegistry = new CloseableRegistry();

        // Substitute this spy for any StateSnapshotContextSynchronousImpl constructed via
        // "new", so that the close() call can be verified afterwards.
        StateSnapshotContextSynchronousImpl context =
                PowerMockito.spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
        PowerMockito.whenNew(StateSnapshotContextSynchronousImpl.class)
                .withAnyArguments()
                .thenReturn(context);

        StreamTask containingTask = PowerMockito.mock(StreamTask.class);
        PowerMockito.when(containingTask.getCancelables()).thenReturn(closeableRegistry);

        // The operator is a mock; only the four-argument snapshotState runs its real code.
        AbstractStreamOperator<?> operator = PowerMockito.mock(AbstractStreamOperator.class);
        PowerMockito.when(
                        operator.snapshotState(
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.any(CheckpointOptions.class),
                                ArgumentMatchers.any(CheckpointStreamFactory.class)))
                .thenCallRealMethod();
        PowerMockito.doReturn(containingTask).when(operator).getContainingTask();

        operator.snapshotState(
                checkpointId,
                timestamp,
                CheckpointOptions.forCheckpointWithDefaultLocation(),
                new MemCheckpointStreamFactory(Integer.MAX_VALUE));

        Mockito.verify(context).close();
    }

    /**
     * Checks that when the operator's {@code snapshotState(StateSnapshotContext)} throws,
     * the four-argument {@code snapshotState} fails with that exception as its cause and
     * the snapshot context is still closed.
     */
    @Test
    public void testFailingSnapshotMethod() throws Exception {
        final long checkpointId = 42L;
        final long timestamp = 1L;

        final Exception failingException = new Exception("Test exception");

        final CloseableRegistry closeableRegistry = new CloseableRegistry();

        // Substitute this mock for any StateSnapshotContextSynchronousImpl constructed via
        // "new", so that the close() call can be verified afterwards.
        StateSnapshotContextSynchronousImpl context =
                PowerMockito.mock(StateSnapshotContextSynchronousImpl.class);
        PowerMockito.whenNew(StateSnapshotContextSynchronousImpl.class)
                .withAnyArguments()
                .thenReturn(context);

        StreamTask containingTask = PowerMockito.mock(StreamTask.class);
        PowerMockito.when(containingTask.getCancelables()).thenReturn(closeableRegistry);

        // The operator is a mock; only the four-argument snapshotState runs its real code.
        AbstractStreamOperator<?> operator = PowerMockito.mock(AbstractStreamOperator.class);
        PowerMockito.when(
                        operator.snapshotState(
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.any(CheckpointOptions.class),
                                ArgumentMatchers.any(CheckpointStreamFactory.class)))
                .thenCallRealMethod();
        PowerMockito.doReturn(containingTask).when(operator).getContainingTask();

        // Make the user-level snapshot hook fail when it is handed the substituted context.
        Mockito.doThrow(failingException).when(operator).snapshotState(ArgumentMatchers.eq(context));

        try {
            operator.snapshotState(
                    checkpointId,
                    timestamp,
                    CheckpointOptions.forCheckpointWithDefaultLocation(),
                    new MemCheckpointStreamFactory(Integer.MAX_VALUE));
            Assert.fail("Exception expected.");
        } catch (Exception e) {
            Assert.assertEquals(failingException, e.getCause());
        }

        Mockito.verify(context).close();
    }

    /**
     * Checks the clean-up performed when the keyed state backend's {@code snapshot} call
     * throws: the operator's {@code snapshotState} must fail with that exception as its
     * cause, the snapshot context must be closed, the {@link OperatorSnapshotFutures} must
     * be cancelled and the raw keyed/operator stream futures must be cancelled. Afterwards,
     * closing and disposing the operator must close and dispose both state backends.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // mocked futures/backends are used as raw types
    public void testFailingBackendSnapshotMethod() throws Exception {
        final long checkpointId = 42L;
        final long timestamp = 1L;

        final Exception failingException = new Exception("Test exception");

        final CloseableRegistry closeableRegistry = new CloseableRegistry();

        RunnableFuture futureKeyedStateHandle = PowerMockito.mock(RunnableFuture.class);
        RunnableFuture futureOperatorStateHandle = PowerMockito.mock(RunnableFuture.class);

        StateSnapshotContextSynchronousImpl context =
                PowerMockito.spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
        PowerMockito.when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
        PowerMockito.when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);

        OperatorSnapshotFutures operatorSnapshotResult = PowerMockito.spy(new OperatorSnapshotFutures());

        // Substitute the spies for the instances constructed via "new".
        PowerMockito.whenNew(StateSnapshotContextSynchronousImpl.class)
                .withArguments(
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any(CheckpointStreamFactory.class),
                        ArgumentMatchers.nullable(KeyGroupRange.class),
                        ArgumentMatchers.any(CloseableRegistry.class))
                .thenReturn(context);
        PowerMockito.whenNew(OperatorSnapshotFutures.class)
                .withAnyArguments()
                .thenReturn(operatorSnapshotResult);

        StreamTask containingTask = PowerMockito.mock(StreamTask.class);
        PowerMockito.when(containingTask.getCancelables()).thenReturn(closeableRegistry);

        // The operator is a mock; snapshotState (four-argument form), close and dispose
        // run their real code.
        AbstractStreamOperator<?> operator = PowerMockito.mock(AbstractStreamOperator.class);
        PowerMockito.when(
                        operator.snapshotState(
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.any(CheckpointOptions.class),
                                ArgumentMatchers.any(CheckpointStreamFactory.class)))
                .thenCallRealMethod();
        Mockito.doCallRealMethod().when(operator).close();
        Mockito.doCallRealMethod().when(operator).dispose();
        PowerMockito.doReturn(containingTask).when(operator).getContainingTask();

        RunnableFuture futureManagedOperatorStateHandle = PowerMockito.mock(RunnableFuture.class);

        // The operator state backend snapshots fine ...
        OperatorStateBackend operatorStateBackend = PowerMockito.mock(OperatorStateBackend.class);
        PowerMockito.when(
                        operatorStateBackend.snapshot(
                                ArgumentMatchers.eq(checkpointId),
                                ArgumentMatchers.eq(timestamp),
                                ArgumentMatchers.any(CheckpointStreamFactory.class),
                                ArgumentMatchers.any(CheckpointOptions.class)))
                .thenReturn(futureManagedOperatorStateHandle);

        // ... while the keyed state backend fails.
        AbstractKeyedStateBackend keyedStateBackend = PowerMockito.mock(AbstractKeyedStateBackend.class);
        PowerMockito.when(
                        keyedStateBackend.snapshot(
                                ArgumentMatchers.eq(checkpointId),
                                ArgumentMatchers.eq(timestamp),
                                ArgumentMatchers.any(CheckpointStreamFactory.class),
                                ArgumentMatchers.eq(CheckpointOptions.forCheckpointWithDefaultLocation())))
                .thenThrow(failingException);

        closeableRegistry.registerCloseable(operatorStateBackend);
        closeableRegistry.registerCloseable(keyedStateBackend);

        // The operator is a mock, so its backend fields have to be injected reflectively.
        Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
        Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);

        try {
            operator.snapshotState(
                    checkpointId,
                    timestamp,
                    CheckpointOptions.forCheckpointWithDefaultLocation(),
                    new MemCheckpointStreamFactory(Integer.MAX_VALUE));
            Assert.fail("Exception expected.");
        } catch (Exception e) {
            Assert.assertEquals(failingException, e.getCause());
        }

        // The context must be closed, the snapshot result cancelled and the raw stream
        // futures cancelled.
        Mockito.verify(context).close();
        Mockito.verify(operatorSnapshotResult).cancel();

        Mockito.verify(futureKeyedStateHandle).cancel(ArgumentMatchers.anyBoolean());
        Mockito.verify(futureOperatorStateHandle).cancel(ArgumentMatchers.anyBoolean());
        // NOTE(review): futureManagedOperatorStateHandle is not verified here - confirm
        // whether its cancellation should be asserted as well.

        operator.close();
        operator.dispose();

        Mockito.verify(operatorStateBackend).close();
        Mockito.verify(keyedStateBackend).close();
        Mockito.verify(operatorStateBackend).dispose();
        Mockito.verify(keyedStateBackend).dispose();
    }

    /**
     * Collects the values of all stream records the harness has emitted so far and then
     * clears the harness output, so that a subsequent call only sees newer records.
     *
     * @param testHarness harness whose output is read and cleared
     * @param <T> output element type of the harness
     * @return the values of the extracted stream records, in extraction order
     */
    @SuppressWarnings("unchecked")
    private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) {
        List<T> result = new ArrayList<>();
        // Iterate as Object so that anything that is not a StreamRecord is skipped
        // instead of failing with a ClassCastException.
        for (Object element : testHarness.extractOutputStreamRecords()) {
            if (element instanceof StreamRecord) {
                // Unchecked: the harness is typed to emit T, which erasure cannot verify.
                result.add(((StreamRecord<T>) element).getValue());
            }
        }
        testHarness.getOutput().clear();
        return result;
    }

    /**
     * Draws random ints until one is assigned to a key group contained in {@code range}.
     *
     * @param range key-group range the returned key must map into
     * @param maxParallelism max parallelism passed to the key-group assignment
     * @return a random key whose key group lies within {@code range}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
        final Random random = new Random(System.currentTimeMillis());
        int candidate;
        do {
            candidate = random.nextInt();
        } while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism)));
        return candidate;
    }

    private static class TestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        private transient InternalTimerService<VoidNamespace> timerService;
        private final ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor("state", (TypeSerializer)StringSerializer.INSTANCE);

        private TestOperator() {
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            String[] command = ((String)((Tuple2)element.getValue()).f1).split(":");
            switch (command[0]) {
                case "SET_STATE": {
                    ((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).update((Object)command[1]);
                    break;
                }
                case "DELETE_STATE": {
                    ((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).clear();
                    break;
                }
                case "SET_EVENT_TIME_TIMER": {
                    this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                }
                case "SET_PROC_TIME_TIMER": {
                    this.timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                }
                case "EMIT_STATE": {
                    String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
                    this.output.collect((Object)new StreamRecord((Object)("ON_ELEMENT:" + ((Tuple2)element.getValue()).f0 + ":" + stateValue)));
                    break;
                }
                default: {
                    throw new IllegalArgumentException();
                }
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
            this.output.collect((Object)new StreamRecord((Object)("ON_EVENT_TIME:" + stateValue)));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
            this.output.collect((Object)new StreamRecord((Object)("ON_PROC_TIME:" + stateValue)));
        }
    }

    private static class TestKeySelector
    implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1L;

        private TestKeySelector() {
        }

        public Integer getKey(Tuple2<Integer, String> value) throws Exception {
            return (Integer)value.f0;
        }
    }
}

