/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamConfigChainer;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/**
 * Builder for {@link StreamTaskMailboxTestHarness}.
 *
 * <p>Inputs are registered with {@code addInput(...)}, the operator (chain) is configured with
 * one of the {@code setup...} methods, and {@link #build()} then creates a
 * {@link StreamMockEnvironment}, wires the input gates and a single output into it, creates the
 * task through the supplied factory and calls {@code beforeInvoke()} on it.
 *
 * @param <OUT> output element type of the task under test
 */
public class StreamTaskMailboxTestHarnessBuilder<OUT> {
    /** Creates the task under test from the environment assembled in {@link #build()}. */
    protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
    /** Serializer for the task's output, derived from the output type given to the constructor. */
    protected final TypeSerializer<OUT> outputSerializer;
    /** Execution config used to create serializers and handed to the mock environment. */
    protected final ExecutionConfig executionConfig = new ExecutionConfig();
    /** Memory size passed to the mock environment (1024 * 1024). */
    protected long memorySize = 1024L * 1024L;
    /** Buffer size passed to the mock environment and to every test input gate. */
    protected int bufferSize = 1024;
    /** Job-level configuration passed to the mock environment. */
    protected Configuration jobConfig = new Configuration();
    /** Task-level configuration passed to the mock environment; backs {@link #streamConfig}. */
    protected Configuration taskConfig = new Configuration();
    /** Stream configuration view over {@link #taskConfig} that the setup methods populate. */
    protected StreamConfig streamConfig = new StreamConfig(this.taskConfig);
    /** Local recovery configuration given to the test task state manager. */
    protected LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
    /** Input gates created by {@link #initializeInputs(StreamMockEnvironment)}; null before that. */
    @Nullable
    protected StreamTestSingleInputGate[] inputGates;
    /** Metric group installed on the mock environment in {@link #build()}. */
    protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
    /** Task state snapshots keyed by checkpoint id; null unless {@link #setTaskStateSnapshot} was called. */
    protected Map<Long, TaskStateSnapshot> taskStateSnapshots;
    /** One serializer per registered input, in registration order. */
    protected final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
    /** Number of channels per registered input, index-aligned with {@link #inputSerializers}. */
    protected final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
    /** Set once any of the operator setup methods has run; they may only be called once. */
    private boolean setupCalled = false;

    /**
     * Creates a builder for a task produced by {@code taskFactory}.
     *
     * @param taskFactory factory that creates the task from an {@link Environment}
     * @param outputType type information used to create the output serializer
     * @throws NullPointerException if either argument is null
     */
    public StreamTaskMailboxTestHarnessBuilder(
            FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
            TypeInformation<OUT> outputType) {
        this.taskFactory = Preconditions.checkNotNull(taskFactory);
        this.outputSerializer = Preconditions.checkNotNull(outputType, "outputType")
                .createSerializer(this.executionConfig);
    }

    /**
     * Registers an input with a single channel and no key selector.
     *
     * @param inputType type information of the input's elements
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
        return this.addInput(inputType, 1);
    }

    /**
     * Registers an input with the given number of channels and no key selector.
     *
     * @param inputType type information of the input's elements
     * @param inputChannels number of channels of the input gate created for this input
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
        return this.addInput(inputType, inputChannels, null);
    }

    /**
     * Registers an input. The input's index is the number of inputs registered before it.
     *
     * @param inputType type information of the input's elements
     * @param inputChannels number of channels of the input gate created for this input
     * @param keySelector state partitioner stored in the stream config for this input, may be null
     * @return this builder
     * @throws NullPointerException if {@code inputType} is null
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(
            TypeInformation<?> inputType,
            int inputChannels,
            @Nullable KeySelector<?, ?> keySelector) {
        Preconditions.checkNotNull(inputType, "inputType");
        // Create the serializer before touching any builder state so that a failure here
        // cannot leave the stream config and the input lists out of sync.
        TypeSerializer<?> inputSerializer = inputType.createSerializer(this.executionConfig);

        this.streamConfig.setStatePartitioner(this.inputSerializers.size(), keySelector);
        this.inputSerializers.add(inputSerializer);
        this.inputChannelsPerGate.add(inputChannels);
        return this;
    }

    /**
     * Creates the mock environment and the task, and wraps them in a harness.
     *
     * <p>If task state snapshots were set, the first key of the snapshot map is reported as the
     * checkpoint id on the test task state manager. The task's {@code beforeInvoke()} is called
     * before the harness is returned.
     *
     * @return harness around the created task, its output queue, input gates and environment
     * @throws IllegalStateException if the input gates are still null after input initialization
     * @throws Exception if the task factory or {@code beforeInvoke()} fails
     */
    public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
        TestTaskStateManager taskStateManager = new TestTaskStateManager(this.localRecoveryConfig);
        if (this.taskStateSnapshots != null) {
            long reportedCheckpointId = this.taskStateSnapshots.keySet().iterator().next();
            taskStateManager.setReportedCheckpointId(reportedCheckpointId);
            taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(this.taskStateSnapshots);
        }

        StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment(
                this.jobConfig,
                this.taskConfig,
                this.executionConfig,
                this.memorySize,
                new MockInputSplitProvider(),
                this.bufferSize,
                taskStateManager);
        streamMockEnvironment.setCheckpointResponder(taskStateManager.getCheckpointResponder());

        this.initializeInputs(streamMockEnvironment);
        // initializeInputs is overridable, so verify it actually populated the gates.
        Preconditions.checkState(this.inputGates != null, "InputGates hasn't been initialised");

        StreamElementSerializer<OUT> outputStreamRecordSerializer =
                new StreamElementSerializer<>(this.outputSerializer);
        ArrayDeque<Object> outputList = new ArrayDeque<>();
        streamMockEnvironment.addOutput(outputList, outputStreamRecordSerializer);
        streamMockEnvironment.setTaskMetricGroup(this.taskMetricGroup);

        StreamTask<OUT, ?> task = this.taskFactory.apply(streamMockEnvironment);
        task.beforeInvoke();

        return new StreamTaskMailboxTestHarness<>(task, outputList, this.inputGates, streamMockEnvironment);
    }

    /**
     * Creates one test input gate per registered input, adds it to the environment, and records
     * the matching physical in-edges, input count and input serializers in the stream config.
     *
     * @param streamMockEnvironment environment that receives the created input gates
     */
    protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
        int numberOfInputs = this.inputSerializers.size();
        this.inputGates = new StreamTestSingleInputGate[numberOfInputs];
        LinkedList<StreamEdge> inPhysicalEdges = new LinkedList<>();

        // Placeholder operator/nodes: only needed so that StreamEdge instances can be constructed.
        AbstractStreamOperator<Object> dummyOperator = new AbstractStreamOperator<Object>() {
            private static final long serialVersionUID = 1L;
        };
        StreamNode sourceVertexDummy = new StreamNode(
                0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
        StreamNode targetVertexDummy = new StreamNode(
                1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

        for (int i = 0; i < numberOfInputs; i++) {
            TypeSerializer<?> inputSerializer = this.inputSerializers.get(i);
            this.inputGates[i] = new StreamTestSingleInputGate<>(
                    this.inputChannelsPerGate.get(i), i, inputSerializer, this.bufferSize);

            // The edge's type number is the 1-based input index.
            StreamEdge streamEdge = new StreamEdge(
                    sourceVertexDummy,
                    targetVertexDummy,
                    i + 1,
                    new LinkedList<>(),
                    new BroadcastPartitioner<Object>(),
                    null);
            inPhysicalEdges.add(streamEdge);
            streamMockEnvironment.addInputGate((IndexedInputGate) this.inputGates[i].getInputGate());
        }

        this.streamConfig.setInPhysicalEdges(inPhysicalEdges);
        this.streamConfig.setNumberOfInputs(this.inputGates.length);
        this.streamConfig.setTypeSerializersIn(this.inputSerializers.toArray(new TypeSerializer<?>[0]));
    }

    /**
     * Configures a single-operator chain for {@code operator} with a freshly created operator id.
     *
     * @param operator the operator to run in the task
     * @return this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> operator) {
        return this.setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), new OperatorID());
    }

    /**
     * Configures a single-operator chain for {@code operator} with the given operator id.
     *
     * @param operator the operator to run in the task
     * @param operatorID id stored in the stream config for the operator
     * @return this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(
            StreamOperator<?> operator,
            OperatorID operatorID) {
        return this.setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), operatorID);
    }

    /**
     * Configures a single-operator chain for {@code factory} with a freshly created operator id.
     *
     * @param factory factory of the operator to run in the task
     * @return this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> factory) {
        return this.setupOutputForSingletonOperatorChain(factory, new OperatorID());
    }

    /**
     * Configures the stream config for a chain consisting of exactly one operator.
     *
     * <p>Marks the config as chain start, sets event time, an empty output selector list, one
     * output using the output serializer, vertex id 0, and a single broadcast out-edge between
     * two placeholder nodes that is registered both as out-edge and as non-chained output.
     *
     * @param factory factory of the operator to run in the task
     * @param operatorID id stored in the stream config for the operator
     * @return this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(
            StreamOperatorFactory<?> factory,
            OperatorID operatorID) {
        this.markSetupCalled();

        this.streamConfig.setChainStart();
        this.streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        this.streamConfig.setOutputSelectors(Collections.emptyList());
        this.streamConfig.setNumberOfOutputs(1);
        this.streamConfig.setTypeSerializerOut(this.outputSerializer);
        this.streamConfig.setVertexID(0);

        // Placeholder operator/nodes: only needed so that the out-edge can be constructed.
        AbstractStreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
            private static final long serialVersionUID = 1L;
        };
        StreamNode sourceVertexDummy = new StreamNode(
                0, "group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
        StreamNode targetVertexDummy = new StreamNode(
                1, "group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

        LinkedList<StreamEdge> outEdgesInOrder = new LinkedList<>();
        outEdgesInOrder.add(new StreamEdge(
                sourceVertexDummy,
                targetVertexDummy,
                0,
                new LinkedList<>(),
                new BroadcastPartitioner<Object>(),
                null));

        this.streamConfig.setOutEdgesInOrder(outEdgesInOrder);
        this.streamConfig.setNonChainedOutputs(outEdgesInOrder);
        this.streamConfig.setStreamOperatorFactory(factory);
        this.streamConfig.setOperatorID(operatorID);
        return this;
    }

    /**
     * Starts configuring an operator chain headed by {@code headOperator} with a fresh operator id.
     *
     * @param headOperator first operator of the chain
     * @return chainer that owns this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperator<?> headOperator) {
        return this.setupOperatorChain(new OperatorID(), headOperator);
    }

    /**
     * Starts configuring an operator chain headed by {@code headOperator}.
     *
     * @param headOperatorId operator id passed to the chainer for the head operator
     * @param headOperator first operator of the chain
     * @return chainer that owns this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(
            OperatorID headOperatorId,
            StreamOperator<?> headOperator) {
        return this.setupOperatorChain(headOperatorId, SimpleOperatorFactory.of(headOperator));
    }

    /**
     * Starts configuring an operator chain headed by the operator of {@code headOperatorFactory},
     * using a fresh operator id.
     *
     * @param headOperatorFactory factory of the first operator of the chain
     * @return chainer that owns this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(
            StreamOperatorFactory<?> headOperatorFactory) {
        return this.setupOperatorChain(new OperatorID(), headOperatorFactory);
    }

    /**
     * Stores the head operator factory in the stream config and returns a chainer over that
     * config which is owned by this builder.
     *
     * @param headOperatorId operator id passed to the chainer for the head operator
     * @param headOperatorFactory factory of the first operator of the chain
     * @return chainer that owns this builder
     * @throws IllegalStateException if an operator setup method was already called
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(
            OperatorID headOperatorId,
            StreamOperatorFactory<?> headOperatorFactory) {
        this.markSetupCalled();
        this.streamConfig.setStreamOperatorFactory(headOperatorFactory);
        return new StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>>(
                headOperatorId, this.streamConfig, this);
    }

    /**
     * Replaces the metric group that {@link #build()} installs on the mock environment.
     *
     * @param taskMetricGroup metric group to use
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskMetricGroup(TaskMetricGroup taskMetricGroup) {
        this.taskMetricGroup = taskMetricGroup;
        return this;
    }

    /**
     * Stores a state key serializer, created from {@code keyType}, in the stream config.
     *
     * @param keyType type information of the key
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setKeyType(TypeInformation<?> keyType) {
        this.streamConfig.setStateKeySerializer(keyType.createSerializer(this.executionConfig));
        return this;
    }

    /**
     * Sets the single task state snapshot handed to the test task state manager in
     * {@link #build()}; any previously set snapshot is replaced.
     *
     * @param checkpointId checkpoint id the snapshot is registered under
     * @param snapshot the snapshot
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskStateSnapshot(long checkpointId, TaskStateSnapshot snapshot) {
        this.taskStateSnapshots = Collections.singletonMap(checkpointId, snapshot);
        return this;
    }

    /** Ensures that only one operator setup method is ever applied to this builder. */
    private void markSetupCalled() {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
    }
}

