/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemListState;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link StreamingRuntimeContext}.
 *
 * <p>The instantiation tests register {@link Path} as a Kryo type on an {@link ExecutionConfig},
 * request a state through the runtime context, and then inspect the state descriptor that the
 * context handed to the (mocked) operator. They pass when that descriptor carries a
 * {@link KryoSerializer} whose Kryo instance knows the registered {@link Path} class, i.e. when
 * the execution config was applied to the descriptor's serializer.
 */
public class StreamingRuntimeContextTest {

    /** Checks the descriptor forwarded for a value state requested via {@code getState}. */
    @Test
    public void testValueStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredPath();
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture, config);

        ValueStateDescriptor<TaskInfo> descr =
                new ValueStateDescriptor<TaskInfo>("name", TaskInfo.class, null);
        context.getState(descr);

        assertKryoSerializerWithRegisteredPath(descriptorCapture);
    }

    /** Checks the descriptor forwarded for a reducing state requested via {@code getReducingState}. */
    @Test
    public void testReduceingStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredPath();
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture, config);

        // The reduce function is never invoked by this test; a mock is enough to build the descriptor.
        @SuppressWarnings("unchecked")
        ReduceFunction<TaskInfo> reducer = Mockito.mock(ReduceFunction.class);
        ReducingStateDescriptor<TaskInfo> descr =
                new ReducingStateDescriptor<TaskInfo>("name", reducer, TaskInfo.class);
        context.getReducingState(descr);

        assertKryoSerializerWithRegisteredPath(descriptorCapture);
    }

    /** Checks the descriptor forwarded for a list state requested via {@code getListState}. */
    @Test
    public void testListStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredPath();
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture, config);

        ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<TaskInfo>("name", TaskInfo.class);
        context.getListState(descr);

        assertKryoSerializerWithRegisteredPath(descriptorCapture);
    }

    /** A list state that was never written to must yield a non-null, empty {@link Iterable}. */
    @Test
    public void testListStateReturnsEmptyListByDefault() throws Exception {
        StreamingRuntimeContext context = new StreamingRuntimeContext(
                createPlainMockOp(),
                createMockEnvironment(),
                Collections.emptyMap());

        ListStateDescriptor<String> descr = new ListStateDescriptor<String>("name", String.class);
        ListState<String> state = context.getListState(descr);

        Iterable<String> value = state.get();
        Assert.assertNotNull(value);
        Assert.assertFalse(value.iterator().hasNext());
    }

    /** Creates an {@link ExecutionConfig} on which {@link Path} is registered as a Kryo type. */
    private static ExecutionConfig createConfigWithRegisteredPath() {
        ExecutionConfig config = new ExecutionConfig();
        config.registerKryoType(Path.class);
        return config;
    }

    /**
     * Builds a runtime context around a mocked operator that stores every state descriptor it
     * receives in {@code descriptorCapture}.
     *
     * @param descriptorCapture receives the descriptor passed to the operator mock
     * @param config execution config the operator mock reports
     */
    private static StreamingRuntimeContext createCapturingContext(
            AtomicReference<Object> descriptorCapture, ExecutionConfig config) throws Exception {
        return new StreamingRuntimeContext(
                createDescriptorCapturingMockOp(descriptorCapture, config),
                createMockEnvironment(),
                Collections.emptyMap());
    }

    /**
     * Verifies that a descriptor was captured, that its serializer is a {@link KryoSerializer},
     * and that {@link Path} is registered in that serializer's Kryo instance with an id above zero.
     *
     * @param descriptorCapture holder filled by the descriptor-capturing operator mock
     */
    private static void assertKryoSerializerWithRegisteredPath(AtomicReference<Object> descriptorCapture) {
        Object captured = descriptorCapture.get();
        // Fail with a readable message instead of a NullPointerException when nothing was captured.
        Assert.assertNotNull("No state descriptor was passed to the operator", captured);

        StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) captured;
        TypeSerializer<?> serializer = descrIntercepted.getSerializer();

        Assert.assertTrue(serializer instanceof KryoSerializer);
        Assert.assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
    }

    /**
     * Creates an operator mock that reports {@code config} as its execution config and records the
     * first argument of every {@code getPartitionedState} call in {@code ref}. The stubbed call
     * itself returns {@code null}.
     *
     * @param ref holder that receives the captured state descriptor
     * @param config execution config returned by the mock
     */
    @SuppressWarnings("unchecked")
    private static AbstractStreamOperator<?> createDescriptorCapturingMockOp(final AtomicReference<Object> ref, ExecutionConfig config) throws Exception {
        AbstractStreamOperator<?> operatorMock = Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(operatorMock.getExecutionConfig()).thenReturn(config);
        Mockito.when(operatorMock.getPartitionedState(Mockito.any(StateDescriptor.class))).thenAnswer(new Answer<Object>() {

            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ref.set(invocationOnMock.getArguments()[0]);
                return null;
            }
        });
        return operatorMock;
    }

    /**
     * Creates an operator mock with a fresh {@link ExecutionConfig} whose
     * {@code getPartitionedState} answers list-state requests with a new {@link MemListState}
     * built from the descriptor it was called with.
     */
    @SuppressWarnings("unchecked")
    private static AbstractStreamOperator<?> createPlainMockOp() throws Exception {
        AbstractStreamOperator<?> operatorMock = Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(operatorMock.getExecutionConfig()).thenReturn(new ExecutionConfig());
        Mockito.when(operatorMock.getPartitionedState(Mockito.any(ListStateDescriptor.class))).thenAnswer(new Answer<ListState<String>>() {

            @Override
            public ListState<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
                ListStateDescriptor<String> descr = (ListStateDescriptor<String>) invocationOnMock.getArguments()[0];
                return new MemListState<String, Void, String>(StringSerializer.INSTANCE, VoidSerializer.INSTANCE, descr);
            }
        });
        return operatorMock;
    }

    /**
     * Creates an {@link Environment} mock that returns this test class's class loader, an empty
     * distributed-cache map, and a fixed {@link TaskInfo}.
     */
    private static Environment createMockEnvironment() {
        Environment env = Mockito.mock(Environment.class);
        Mockito.when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader());
        Mockito.when(env.getDistributedCacheEntries()).thenReturn(Collections.emptyMap());
        Mockito.when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 0, 1, 1));
        return env;
    }
}

