/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test utility that runs a {@link OneInputStreamOperator} over a fixed list of input elements
 * against a Mockito-mocked {@link StreamTask} and returns the list that backs the operator's
 * {@link MockOutput}.
 *
 * <p>NOTE(review): {@code MockOutput} is defined elsewhere; it is presumably what appends the
 * emitted values to the {@code outputs} list handed to it - confirm against that class.
 *
 * @param <IN> type of the elements fed into the operator
 * @param <OUT> type of the elements collected from the operator
 */
public class MockContext<IN, OUT> {
    /** List passed to the {@link MockOutput}; returned to callers as the result. */
    private final List<OUT> outputs;

    /** Output handed to the operator during {@code setup}. */
    private final MockOutput<OUT> output;

    /**
     * Creates a context with an empty output list.
     *
     * @param inputs the elements that will be processed; only checked for emptiness here
     * @throws IllegalArgumentException if {@code inputs} is empty
     */
    public MockContext(Collection<IN> inputs) {
        if (inputs.isEmpty()) {
            // IllegalArgumentException is a RuntimeException, so existing catch blocks keep working.
            throw new IllegalArgumentException("Inputs must not be empty");
        }
        this.outputs = new ArrayList<OUT>();
        this.output = new MockOutput<OUT>(this.outputs);
    }

    /**
     * @return the (mutable) list that was handed to the {@link MockOutput}
     */
    public List<OUT> getOutputs() {
        return this.outputs;
    }

    /**
     * @return the output object that is passed to the operator under test
     */
    public Output<StreamRecord<OUT>> getOutput() {
        return this.output;
    }

    /**
     * Runs {@code operator} over {@code inputs} without configuring a key selector or key type.
     *
     * @param operator the operator under test
     * @param inputs the elements to process, in iteration order; must not be empty
     * @return the output list of the context used for the run
     */
    public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
        return MockContext.createAndExecuteForKeyedStream(operator, inputs, null, null);
    }

    /**
     * Sets up {@code operator} with a mocked task, opens it, feeds it every element of
     * {@code inputs}, closes it and returns the collected outputs.
     *
     * <p>The state key serializer and state partitioner are only configured when both
     * {@code keySelector} and {@code keyType} are non-null. Each element is processed while
     * holding the same lock object that the mocked task reports as its checkpoint lock and that
     * timer callbacks synchronize on.
     *
     * <p>The timer executor is shut down with {@code shutdownNow()} on every exit path, so timers
     * that have not fired by the time the operator is closed are not awaited.
     *
     * @param operator the operator under test
     * @param inputs the elements to process, in iteration order; must not be empty
     * @param keySelector key extractor for keyed state, or {@code null}
     * @param keyType type information of the key, or {@code null}
     * @return the output list of the context used for the run
     * @throws RuntimeException wrapping any exception thrown by {@code open}, per-element
     *     processing or {@code close}
     */
    public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs, KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) {
        MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);

        StreamConfig config = new StreamConfig(new Configuration());
        if (keySelector != null && keyType != null) {
            config.setStateKeySerializer(keyType.createSerializer(new ExecutionConfig()));
            config.setStatePartitioner(0, keySelector);
        }

        final Object lock = new Object();
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            // Mock creation and setup sit inside the try/finally so that a failure in either one
            // cannot leak the executor thread; their exceptions still propagate unwrapped.
            StreamTask<?, ?> mockTask = MockContext.createMockTaskWithTimer(timerService, lock);
            operator.setup(mockTask, config, mockContext.output);

            try {
                operator.open();

                // A single record instance is reused; replace() swaps in the next element.
                StreamRecord<IN> record = new StreamRecord<IN>(null);
                for (IN in : inputs) {
                    record = record.replace(in);
                    synchronized (lock) {
                        operator.setKeyContextElement1(record);
                        operator.processElement(record);
                    }
                }

                operator.close();
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot invoke operator.", e);
            }
        }
        finally {
            timerService.shutdownNow();
        }

        return mockContext.getOutputs();
    }

    /**
     * Builds a Mockito mock of {@link StreamTask} with the stubs the operators in these tests rely
     * on: accumulator map, name, execution config, environment, checkpoint lock, timer
     * registration and state backend creation.
     *
     * @param timerService executor on which registered timers are scheduled
     * @param lock object returned as the checkpoint lock and held while a timer fires
     * @return the stubbed mock task
     * @throws RuntimeException if stubbing {@code createStateBackend} fails
     */
    private static StreamTask<?, ?> createMockTaskWithTimer(final ScheduledExecutorService timerService, final Object lock) {
        StreamTask<?, ?> task = Mockito.mock(StreamTask.class);

        Mockito.when(task.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when(task.getName()).thenReturn("Test task name");
        Mockito.when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
        // 3L * 1024L * 1024L == 0x300000L
        Mockito.when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3L * 1024L * 1024L, new MockInputSplitProvider(), 1024));
        Mockito.when(task.getCheckpointLock()).thenReturn(lock);

        // registerTimer(timestamp, target): schedule target.trigger(timestamp) on the executor.
        Mockito.doAnswer(new Answer<Void>() {

            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                final Long timestamp = (Long) invocationOnMock.getArguments()[0];
                final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];

                // The delay is relative to "now"; ScheduledExecutorService treats a zero or
                // negative delay as a request for immediate execution.
                // NOTE: the returned future is discarded, so an exception thrown by trigger()
                // is not observed by this code.
                timerService.schedule(new Callable<Object>() {

                    @Override
                    public Object call() throws Exception {
                        synchronized (lock) {
                            target.trigger(timestamp.longValue());
                        }
                        return null;
                    }
                }, timestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return null;
            }
        }).when(task).registerTimer(Matchers.anyLong(), Matchers.any(Triggerable.class));

        try {
            // createStateBackend(operatorIdentifier, keySerializer): return a fresh
            // MemoryStateBackend initialized for a dummy environment.
            Mockito.doAnswer(new Answer<AbstractStateBackend>() {

                @Override
                public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    final String operatorIdentifier = (String) invocationOnMock.getArguments()[0];
                    final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1];

                    MemoryStateBackend backend = MemoryStateBackend.create();
                    backend.initializeForJob(new DummyEnvironment("dummty", 1, 0), operatorIdentifier, keySerializer);
                    return backend;
                }
            }).when(task).createStateBackend(Matchers.any(String.class), Matchers.any(TypeSerializer.class));
        }
        catch (Exception e) {
            // Fail fast instead of printing the stack trace and returning a mock without a
            // state backend stub.
            throw new RuntimeException("Cannot stub createStateBackend on the mock task.", e);
        }

        return task;
    }
}

