/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamConfigChainer;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

/**
 * Test harness that runs a {@link StreamTask} on a dedicated thread inside a
 * {@link StreamMockEnvironment}.
 *
 * <p>Typical usage, as far as this class shows: configure the harness (for example through
 * {@link #setupOutputForSingletonOperatorChain()} or one of the {@code setupOperatorChain}
 * methods), call {@link #invoke()}, feed input through the {@code processElement} /
 * {@code processEvent} methods, and finally use the {@code waitFor...} methods to synchronize
 * with the task thread. Everything the task emits is collected in the queue returned by
 * {@link #getOutput()}.
 *
 * <p>This class itself never assigns {@link #inputGates}, {@link #numInputGates} or
 * {@link #numInputChannelsPerGate}, and {@link #initializeInputs()} is empty here; presumably
 * subclasses override it to create the gates.
 *
 * @param <OUT> output type of the task under test
 */
public class StreamTaskTestHarness<OUT> {
    /** Default value assigned to {@link #memorySize} by the constructor. */
    public static final int DEFAULT_MEMORY_MANAGER_SIZE = 0x100000;
    /** Default value assigned to {@link #bufferSize} by the constructor. */
    public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;

    /** Creates the task under test from the environment handed to {@link #invoke(StreamMockEnvironment)}. */
    private final Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory;

    /** Memory size passed to the mock environment; a value {@code <= 0} skips the memory manager checks on shutdown. */
    public long memorySize;
    /** Buffer size passed to the mock environment. */
    public int bufferSize;
    protected StreamMockEnvironment mockEnv;
    protected ExecutionConfig executionConfig;
    public Configuration jobConfig;
    public Configuration taskConfig;
    /** Stream configuration backed by {@link #taskConfig}. */
    protected StreamConfig streamConfig;
    protected TestTaskStateManager taskStateManager;

    /** The task under test; {@code null} until {@link #invoke(StreamMockEnvironment)} has been called. */
    private StreamTask<OUT, ?> task;
    private final TypeSerializer<OUT> outputSerializer;
    private final TypeSerializer<StreamElement> outputStreamRecordSerializer;
    /** Collects the task output; {@code null} until {@link #invoke(StreamMockEnvironment)} has been called. */
    private LinkedBlockingQueue<Object> outputList;

    /** Thread running the task; {@code null} until {@link #invoke(StreamMockEnvironment)} has been called. */
    protected TaskThread taskThread;
    protected int numInputGates;
    protected int numInputChannelsPerGate;
    /** Guards against configuring the harness more than once. */
    private boolean setupCalled = false;
    protected StreamTestSingleInputGate[] inputGates;

    /**
     * Creates a harness with default memory and buffer sizes and fresh, empty configurations.
     *
     * @param taskFactory creates the task from the environment; must not be {@code null}
     * @param outputType type information used to create the output serializer
     */
    public StreamTaskTestHarness(Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, TypeInformation<OUT> outputType) {
        this.taskFactory = Preconditions.checkNotNull(taskFactory);
        this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
        this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
        this.jobConfig = new Configuration();
        this.taskConfig = new Configuration();
        this.executionConfig = new ExecutionConfig();
        this.streamConfig = new StreamConfig(this.taskConfig);
        this.outputSerializer = outputType.createSerializer(this.executionConfig);
        this.outputStreamRecordSerializer = new StreamElementSerializer<>(this.outputSerializer);
        this.taskStateManager = new TestTaskStateManager();
    }

    /**
     * Returns the processing time service of the task under test.
     *
     * <p>The task only exists after {@link #invoke(StreamMockEnvironment)}; calling this earlier
     * fails with a {@link NullPointerException}.
     */
    public ProcessingTimeService getProcessingTimeService() {
        return this.task.getProcessingTimeService();
    }

    /**
     * Hook invoked by {@link #invoke(StreamMockEnvironment)} before the output is registered and
     * the task is created. Does nothing in this class.
     */
    protected void initializeInputs() throws IOException, InterruptedException {
    }

    /** Returns the task state manager that is handed to environments created by this harness. */
    public TestTaskStateManager getTaskStateManager() {
        return this.taskStateManager;
    }

    /**
     * Registers the given snapshot under the given checkpoint id with the task state manager and
     * marks that checkpoint id as the reported one.
     *
     * @param checkpointId id the snapshot is stored under
     * @param taskStateSnapshot snapshot to register
     */
    public void setTaskStateSnapshot(long checkpointId, TaskStateSnapshot taskStateSnapshot) {
        this.taskStateManager.setReportedCheckpointId(checkpointId);
        this.taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(checkpointId, taskStateSnapshot));
    }

    /** Creates a fresh output queue and registers it, with the output serializer, on the mock environment. */
    private void initializeOutput() {
        this.outputList = new LinkedBlockingQueue<>();
        this.mockEnv.addOutput(this.outputList, this.outputStreamRecordSerializer);
    }

    /**
     * Marks the harness as configured.
     *
     * @throws IllegalStateException if one of the setup methods was already called
     */
    private void markSetupCalled() {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
    }

    /**
     * Configures {@link #streamConfig} for a chain consisting of a single operator with exactly
     * one non-chained output edge between two dummy nodes.
     *
     * @throws IllegalStateException if the harness was already set up
     */
    public void setupOutputForSingletonOperatorChain() {
        markSetupCalled();
        this.streamConfig.setChainStart();
        this.streamConfig.setBufferTimeout(0L);
        this.streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        this.streamConfig.setOutputSelectors(Collections.emptyList());
        this.streamConfig.setNumberOfOutputs(1);
        this.streamConfig.setTypeSerializerOut(this.outputSerializer);
        this.streamConfig.setVertexID(0);
        this.streamConfig.setOperatorID(new OperatorID(4711L, 123L));

        // The operator is only needed to build the two placeholder nodes that the edge connects.
        StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
            private static final long serialVersionUID = 1L;
        };
        StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
        StreamNode targetVertexDummy = new StreamNode(null, 1, "group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

        LinkedList<StreamEdge> outEdgesInOrder = new LinkedList<>();
        outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null));
        this.streamConfig.setOutEdgesInOrder(outEdgesInOrder);
        this.streamConfig.setNonChainedOutputs(outEdgesInOrder);
    }

    /** Creates a new mock environment from the harness' current configuration fields. */
    public StreamMockEnvironment createEnvironment() {
        return new StreamMockEnvironment(this.jobConfig, this.taskConfig, this.executionConfig, this.memorySize, new MockInputSplitProvider(), this.bufferSize, this.taskStateManager);
    }

    /** Starts the task in an environment obtained from {@link #createEnvironment()}. */
    public void invoke() throws Exception {
        this.invoke(this.createEnvironment());
    }

    /**
     * Initializes inputs and output, creates the task through the task factory and starts it on a
     * new thread. Returns as soon as the thread has been started.
     *
     * @param mockEnv environment to run the task in; must not be {@code null}
     */
    public void invoke(StreamMockEnvironment mockEnv) throws Exception {
        this.mockEnv = Preconditions.checkNotNull(mockEnv);
        this.initializeInputs();
        this.initializeOutput();
        this.task = this.taskFactory.apply(mockEnv);
        this.taskThread = new TaskThread(this.task);
        this.taskThread.start();
    }

    /** Same as {@link #waitForTaskCompletion(long)} with a timeout of {@link Long#MAX_VALUE}. */
    public void waitForTaskCompletion() throws Exception {
        this.waitForTaskCompletion(Long.MAX_VALUE);
    }

    /**
     * Joins the task thread for at most {@code timeout} milliseconds and rethrows a recorded task
     * failure.
     *
     * <p>If the join times out and no failure has been recorded, this method returns normally even
     * though the task thread may still be running.
     *
     * @param timeout maximum time to wait, in milliseconds, as accepted by {@link Thread#join(long)}
     * @throws IllegalStateException if the task thread was not started
     * @throws Exception wrapping the failure recorded by the task thread, if any
     */
    public void waitForTaskCompletion(long timeout) throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }
        this.taskThread.join(timeout);
        Throwable failure = this.taskThread.getError();
        if (failure != null) {
            throw new Exception("error in task", failure);
        }
    }

    /** Same as {@link #waitForTaskRunning(long)} with a timeout of {@link Long#MAX_VALUE}. */
    public void waitForTaskRunning() throws Exception {
        this.waitForTaskRunning(Long.MAX_VALUE);
    }

    /**
     * Polls every 10 ms until the task reports that it is running.
     *
     * <p>The timeout is checked after each poll interval, so even a non-positive timeout grants
     * the task one interval to come up.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @throws IllegalStateException if the task thread was not started or the task is not a
     *     {@link StreamTask}
     * @throws java.util.concurrent.TimeoutException if the task is still not running once
     *     {@code timeout} milliseconds have elapsed
     * @throws Exception if the task thread terminated before the task was running
     */
    public void waitForTaskRunning(long timeout) throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }
        if (!(this.taskThread.task instanceof StreamTask)) {
            throw new IllegalStateException("Not a StreamTask");
        }
        StreamTask<?, ?> streamTask = (StreamTask<?, ?>) this.taskThread.task;

        long startNanos = System.nanoTime();
        while (!streamTask.isRunning()) {
            Thread.sleep(10L);
            if (!this.taskThread.isAlive()) {
                Throwable failure = this.taskThread.getError();
                if (failure != null) {
                    throw new Exception("Task Thread failed due to an error.", failure);
                }
                throw new Exception("Task Thread unexpectedly shut down.");
            }
            // Compare elapsed time rather than a precomputed deadline so that
            // Long.MAX_VALUE cannot overflow.
            long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
            if (elapsedMillis >= timeout && !streamTask.isRunning()) {
                throw new java.util.concurrent.TimeoutException(
                        "Task was not running after " + timeout + " ms.");
            }
        }
    }

    /** Returns the task under test, or {@code null} before {@link #invoke(StreamMockEnvironment)}. */
    public StreamTask<OUT, ?> getTask() {
        return this.task;
    }

    /** Returns the queue collecting the task output, or {@code null} before {@link #invoke(StreamMockEnvironment)}. */
    public LinkedBlockingQueue<Object> getOutput() {
        return this.outputList;
    }

    /** Returns the stream configuration backing the task. */
    public StreamConfig getStreamConfig() {
        return this.streamConfig;
    }

    /** Returns the execution configuration used for the serializers and the environment. */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /** Shuts down the environment's IO manager and asserts that the shutdown was clean. */
    private void shutdownIOManager() throws Exception {
        this.mockEnv.getIOManager().shutdown();
        Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
    }

    /**
     * Asserts that the memory manager is empty and shuts it down.
     *
     * <p>Nothing happens when {@link #memorySize} is not positive or the environment has no memory
     * manager. The manager is shut down even when the emptiness assertion fails.
     */
    private void shutdownMemoryManager() throws Exception {
        if (this.memorySize <= 0L) {
            return;
        }
        MemoryManager memMan = this.mockEnv.getMemoryManager();
        if (memMan == null) {
            return;
        }
        try {
            Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
        } finally {
            memMan.shutdown();
        }
    }

    /** Sends the element to channel 0 of input gate 0. */
    public void processElement(Object element) {
        this.inputGates[0].sendElement(element, 0);
    }

    /** Sends the element to the given channel of the given input gate. */
    public void processElement(Object element, int inputGate, int channel) {
        this.inputGates[inputGate].sendElement(element, channel);
    }

    /** Sends the event to channel 0 of input gate 0. */
    public void processEvent(AbstractEvent event) {
        this.inputGates[0].sendEvent(event, 0);
    }

    /** Sends the event to the given channel of the given input gate. */
    public void processEvent(AbstractEvent event, int inputGate, int channel) {
        this.inputGates[inputGate].sendEvent(event, channel);
    }

    /**
     * Waits until all input queues are empty and the task thread is blocked, waiting or
     * terminated.
     *
     * <p>The first phase is a busy-wait on the input gates. The second phase polls the thread
     * state every millisecond; an interrupt received while polling does not abort the wait, but
     * the calling thread's interrupt flag is restored before returning.
     *
     * @throws IllegalStateException if the task thread was not started
     * @throws Exception wrapping the failure recorded by the task thread, if any is observed
     *     while the input queues are drained
     */
    public void waitForInputProcessing() throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }

        // Phase 1: spin until every input gate reports empty queues.
        while (true) {
            Throwable failure = this.taskThread.getError();
            if (failure != null) {
                throw new Exception("Exception in the task thread", failure);
            }
            if (allInputQueuesEmpty()) {
                break;
            }
        }

        // Phase 2: wait until the task thread is no longer actively running.
        boolean interrupted = false;
        try {
            while (!isIdleOrFinished(this.taskThread.getState())) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns {@code true} if every one of the first {@link #numInputGates} gates has only empty queues. */
    private boolean allInputQueuesEmpty() {
        for (int i = 0; i < this.numInputGates; ++i) {
            if (!this.inputGates[i].allQueuesEmpty()) {
                return false;
            }
        }
        return true;
    }

    /** Returns {@code true} for the thread states in which the task thread is not actively executing. */
    private static boolean isIdleOrFinished(Thread.State state) {
        return state == Thread.State.BLOCKED
                || state == Thread.State.TERMINATED
                || state == Thread.State.WAITING
                || state == Thread.State.TIMED_WAITING;
    }

    /** Signals end of input on each of the first {@link #numInputGates} input gates. */
    public void endInput() {
        for (int i = 0; i < this.numInputGates; ++i) {
            this.inputGates[i].endInput();
        }
    }

    /**
     * Starts the configuration of an operator chain headed by a one-input operator.
     *
     * @param headOperatorId id of the head operator
     * @param headOperator head operator of the chain
     * @return chainer operating on this harness' stream configuration
     * @throws IllegalStateException if the harness was already set up
     */
    public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, OneInputStreamOperator<?, ?> headOperator) {
        markSetupCalled();
        return new StreamConfigChainer(headOperatorId, headOperator, this.getStreamConfig());
    }

    /**
     * Starts the configuration of an operator chain headed by a two-input operator.
     *
     * @param headOperatorId id of the head operator
     * @param headOperator head operator of the chain
     * @return chainer operating on this harness' stream configuration
     * @throws IllegalStateException if the harness was already set up
     */
    public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, TwoInputStreamOperator<?, ?, ?> headOperator) {
        markSetupCalled();
        return new StreamConfigChainer(headOperatorId, headOperator, this.getStreamConfig());
    }

    /**
     * Returns {@code primary} if it is set, attaching {@code later} to it as a suppressed
     * exception; otherwise returns {@code later}.
     */
    private static Throwable chainFailure(Throwable primary, Throwable later) {
        if (primary == null) {
            return later;
        }
        if (primary != later) {
            // addSuppressed rejects self-suppression, hence the identity check.
            primary.addSuppressed(later);
        }
        return primary;
    }

    /**
     * Thread that invokes the task and afterwards shuts down the environment's IO and memory
     * managers, recording the first failure for later inspection.
     */
    private class TaskThread
    extends Thread {
        private final AbstractInvokable task;
        /** First failure of the run; written once, just before the thread terminates. */
        private volatile Throwable error;

        TaskThread(AbstractInvokable task) {
            super("Task Thread");
            this.task = task;
        }

        /**
         * Invokes the task, then always attempts to shut down the IO manager and the memory
         * manager, even if an earlier step failed. The first failure becomes the reported error;
         * subsequent failures are attached to it as suppressed exceptions.
         */
        @Override
        public void run() {
            Throwable failure = null;
            try {
                this.task.invoke();
            }
            catch (Throwable t) {
                failure = t;
            }
            try {
                StreamTaskTestHarness.this.shutdownIOManager();
            }
            catch (Throwable t) {
                failure = chainFailure(failure, t);
            }
            try {
                StreamTaskTestHarness.this.shutdownMemoryManager();
            }
            catch (Throwable t) {
                failure = chainFailure(failure, t);
            }
            if (failure != null) {
                // Published last so that observers of the error see a fully cleaned-up run.
                this.error = failure;
            }
        }

        /** Returns the failure recorded by {@link #run()}, or {@code null} if none was recorded. */
        public Throwable getError() {
            return this.error;
        }
    }
}

