package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/util/MockContext.class */
/**
 * Test helper that drives a {@link OneInputStreamOperator} with a list of input elements
 * against a Mockito-mocked {@link StreamTask} and exposes the {@link MockOutput} (and its
 * backing list) that is handed to the operator as its output.
 *
 * @param <IN> type of the elements fed into the operator
 * @param <OUT> type of the elements emitted by the operator
 */
public class MockContext<IN, OUT> {
    /** List handed to {@link MockOutput} at construction; returned as-is by {@link #getOutputs()}. */
    private final List<OUT> outputs;

    /** Output passed to the operator under test in {@code setup(...)}. */
    private final MockOutput<OUT> output;

    /**
     * Creates a context with an empty output list.
     *
     * <p>The given inputs are only validated here; they are not stored in the context.
     *
     * @param collection the inputs the operator will be driven with; must not be empty
     * @throws RuntimeException if {@code collection} is empty
     */
    public MockContext(Collection<IN> collection) {
        if (collection.isEmpty()) {
            throw new RuntimeException("Inputs must not be empty");
        }
        this.outputs = new ArrayList<OUT>();
        this.output = new MockOutput<OUT>(this.outputs);
    }

    /**
     * @return the list that backs this context's {@link MockOutput} (the live list, not a copy)
     */
    public List<OUT> getOutputs() {
        return this.outputs;
    }

    /**
     * @return the output that is handed to the operator under test
     */
    public Output<StreamRecord<OUT>> getOutput() {
        return this.output;
    }

    /**
     * Runs the operator over the given inputs without configuring a state key serializer
     * or state partitioner.
     *
     * @param oneInputStreamOperator the operator under test
     * @param list the input elements, processed in iteration order; must not be empty
     * @return the output list of the context created for this run
     * @throws RuntimeException if {@code list} is empty, or wrapping any exception thrown while
     *     opening, feeding or closing the operator
     */
    public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, List<IN> list) {
        return createAndExecuteForKeyedStream(oneInputStreamOperator, list, null, null);
    }

    /**
     * Sets up the operator against a mocked task, opens it, feeds every input element to it,
     * closes it, and returns the collected output list.
     *
     * <p>The state key serializer and state partitioner are configured only when both
     * {@code keySelector} and {@code typeInformation} are non-null. Each element is processed
     * while holding the same lock object that the mocked task returns from
     * {@code getCheckpointLock()} and that scheduled timer callbacks synchronize on.
     *
     * <p>The timer executor is shut down with {@code shutdownNow()} on every exit path,
     * including failures during operator setup.
     *
     * @param oneInputStreamOperator the operator under test
     * @param list the input elements, processed in iteration order; must not be empty
     * @param keySelector key selector to register as state partitioner, or {@code null}
     * @param typeInformation type information used to create the state key serializer, or {@code null}
     * @return the output list of the context created for this run
     * @throws RuntimeException if {@code list} is empty, or wrapping (message
     *     "Cannot invoke operator.") any {@link Exception} thrown while opening, feeding or
     *     closing the operator
     */
    public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, List<IN> list, KeySelector<IN, KEY> keySelector, TypeInformation<KEY> typeInformation) {
        MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(list);

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        if (keySelector != null && typeInformation != null) {
            streamConfig.setStateKeySerializer(typeInformation.createSerializer(new ExecutionConfig()));
            streamConfig.setStatePartitioner(keySelector);
        }

        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final Object checkpointLock = new Object();

            // Setup stays outside the wrapping catch below so its exceptions propagate unchanged,
            // but inside the finally so the timer thread cannot leak if setup fails.
            oneInputStreamOperator.setup(createMockTaskWithTimer(timerService, checkpointLock), streamConfig, mockContext.output);

            try {
                oneInputStreamOperator.open();

                StreamRecord<IN> record = new StreamRecord<IN>(null);
                for (IN element : list) {
                    record = record.replace(element);
                    // Same lock the timer callbacks take, so element processing and
                    // timer triggers never run concurrently.
                    synchronized (checkpointLock) {
                        oneInputStreamOperator.setKeyContextElement(record);
                        oneInputStreamOperator.processElement(record);
                    }
                }

                oneInputStreamOperator.close();
            } catch (Exception e) {
                throw new RuntimeException("Cannot invoke operator.", e);
            }

            return mockContext.getOutputs();
        } finally {
            timerService.shutdownNow();
        }
    }

    /**
     * Builds a Mockito mock of {@link StreamTask} with stubbed accessors and a
     * {@code registerTimer} stub that schedules the trigger on the given executor.
     *
     * <p>A registered timer is scheduled with a delay of {@code timestamp - System.currentTimeMillis()}
     * milliseconds; when it fires, {@code trigger(timestamp)} is invoked while holding {@code obj}.
     *
     * @param scheduledExecutorService executor on which timer callbacks are scheduled
     * @param obj lock object returned from {@code getCheckpointLock()} and held during triggers
     * @return the configured mock task
     */
    // The accumulator map's value type is not imported in this file, so the stubbed map is
    // intentionally left raw rather than guessing at its type arguments.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static StreamTask<?, ?> createMockTaskWithTimer(final ScheduledExecutorService scheduledExecutorService, final Object obj) {
        final StreamTask<?, ?> streamTask = Mockito.mock(StreamTask.class);

        Mockito.when(streamTask.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when(streamTask.getName()).thenReturn("Test task name");
        Mockito.when(streamTask.getExecutionConfig()).thenReturn(new ExecutionConfig());
        Mockito.when(streamTask.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3145728L, new MockInputSplitProvider(), 1024));
        Mockito.when(streamTask.getCheckpointLock()).thenReturn(obj);

        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                final long timestamp = (Long) invocationOnMock.getArguments()[0];
                final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];

                scheduledExecutorService.schedule(new Callable<Object>() {
                    @Override
                    public Object call() throws Exception {
                        synchronized (obj) {
                            target.trigger(timestamp);
                        }
                        return null;
                    }
                }, timestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return null;
            }
        }).when(streamTask).registerTimer(Matchers.anyLong(), Matchers.any(Triggerable.class));

        Mockito.when(streamTask.getStateBackend()).thenReturn(MemoryStateBackend.defaultInstance());
        return streamTask;
    }
}
