/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;

public class StreamTaskTestHarness<OUT> {
    public static final int DEFAULT_MEMORY_MANAGER_SIZE = 0x100000;
    public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
    public long memorySize = 0L;
    public int bufferSize = 0;
    protected StreamMockEnvironment mockEnv;
    protected ExecutionConfig executionConfig;
    public Configuration jobConfig;
    public Configuration taskConfig;
    protected StreamConfig streamConfig;
    private AbstractInvokable task;
    private TypeSerializer<OUT> outputSerializer;
    private TypeSerializer<StreamElement> outputStreamRecordSerializer;
    private ConcurrentLinkedQueue<Object> outputList;
    protected TaskThread taskThread;
    protected int numInputGates;
    protected int numInputChannelsPerGate;
    protected StreamTestSingleInputGate[] inputGates;

    public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
        this.task = task;
        this.memorySize = 0x100000L;
        this.bufferSize = 1024;
        this.jobConfig = new Configuration();
        this.taskConfig = new Configuration();
        this.executionConfig = new ExecutionConfig();
        this.streamConfig = new StreamConfig(this.taskConfig);
        this.streamConfig.setChainStart();
        this.streamConfig.setBufferTimeout(0L);
        this.streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        this.outputSerializer = outputType.createSerializer(this.executionConfig);
        this.outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer(this.outputSerializer);
    }

    public long getCurrentProcessingTime() {
        if (!(this.task instanceof StreamTask)) {
            System.currentTimeMillis();
        }
        return ((StreamTask)this.task).getCurrentProcessingTime();
    }

    protected void initializeInputs() throws IOException, InterruptedException {
    }

    private void initializeOutput() {
        this.outputList = new ConcurrentLinkedQueue();
        this.mockEnv.addOutput(this.outputList, this.outputStreamRecordSerializer);
        this.streamConfig.setOutputSelectors(Collections.emptyList());
        this.streamConfig.setNumberOfOutputs(1);
        AbstractStreamOperator dummyOperator = new AbstractStreamOperator<OUT>(){
            private static final long serialVersionUID = 1L;
        };
        LinkedList<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
        StreamNode sourceVertexDummy = new StreamNode(null, Integer.valueOf(0), "group", (StreamOperator)dummyOperator, "source dummy", new LinkedList(), SourceStreamTask.class);
        StreamNode targetVertexDummy = new StreamNode(null, Integer.valueOf(1), "group", (StreamOperator)dummyOperator, "target dummy", new LinkedList(), SourceStreamTask.class);
        outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList(), (StreamPartitioner)new BroadcastPartitioner()));
        this.streamConfig.setOutEdgesInOrder(outEdgesInOrder);
        this.streamConfig.setNonChainedOutputs(outEdgesInOrder);
        this.streamConfig.setTypeSerializerOut(this.outputSerializer);
        this.streamConfig.setVertexID(Integer.valueOf(0));
    }

    public StreamMockEnvironment createEnvironment() {
        return new StreamMockEnvironment(this.jobConfig, this.taskConfig, this.executionConfig, this.memorySize, new MockInputSplitProvider(), this.bufferSize);
    }

    public void invoke() throws Exception {
        this.invoke(this.createEnvironment());
    }

    public void invoke(StreamMockEnvironment mockEnv) throws Exception {
        this.mockEnv = mockEnv;
        this.task.setEnvironment((Environment)mockEnv);
        this.initializeInputs();
        this.initializeOutput();
        this.taskThread = new TaskThread(this.task);
        this.taskThread.start();
    }

    public void waitForTaskCompletion() throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }
        this.taskThread.join();
        if (this.taskThread.getError() != null) {
            throw new Exception("error in task", this.taskThread.getError());
        }
    }

    public void waitForTaskRunning() throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }
        if (this.taskThread.task instanceof StreamTask) {
            StreamTask streamTask = (StreamTask)this.taskThread.task;
            while (!streamTask.isRunning()) {
                Thread.sleep(10L);
                if (this.taskThread.isAlive()) continue;
                if (this.taskThread.getError() != null) {
                    throw new Exception("Task Thread failed due to an error.", this.taskThread.getError());
                }
                throw new Exception("Task Thread unexpectedly shut down.");
            }
        } else {
            throw new IllegalStateException("Not a StreamTask");
        }
    }

    public ConcurrentLinkedQueue<Object> getOutput() {
        return this.outputList;
    }

    public StreamConfig getStreamConfig() {
        return this.streamConfig;
    }

    private void shutdownIOManager() throws Exception {
        this.mockEnv.getIOManager().shutdown();
        Assert.assertTrue((String)"IO Manager has not properly shut down.", (boolean)this.mockEnv.getIOManager().isProperlyShutDown());
    }

    private void shutdownMemoryManager() throws Exception {
        MemoryManager memMan;
        if (this.memorySize > 0L && (memMan = this.mockEnv.getMemoryManager()) != null) {
            Assert.assertTrue((String)"Memory Manager managed memory was not completely freed.", (boolean)memMan.verifyEmpty());
            memMan.shutdown();
        }
    }

    public void processElement(Object element) {
        this.inputGates[0].sendElement(element, 0);
    }

    public void processElement(Object element, int inputGate, int channel) {
        this.inputGates[inputGate].sendElement(element, channel);
    }

    public void processEvent(AbstractEvent event) {
        this.inputGates[0].sendEvent(event, 0);
    }

    public void processEvent(AbstractEvent event, int inputGate, int channel) {
        this.inputGates[inputGate].sendEvent(event, channel);
    }

    public void waitForInputProcessing() throws Exception {
        Thread.State state;
        boolean allEmpty;
        do {
            Throwable error;
            if ((error = this.taskThread.getError()) != null) {
                throw new Exception("Exception in the task thread", error);
            }
            allEmpty = true;
            for (int i = 0; i < this.numInputGates; ++i) {
                if (this.inputGates[i].allQueuesEmpty()) continue;
                allEmpty = false;
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        } while (!allEmpty);
        while ((state = this.taskThread.getState()) != Thread.State.BLOCKED && state != Thread.State.TERMINATED && state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    public void endInput() {
        for (int i = 0; i < this.numInputGates; ++i) {
            this.inputGates[i].endInput();
        }
    }

    private class TaskThread
    extends Thread {
        private final AbstractInvokable task;
        private volatile Throwable error;

        TaskThread(AbstractInvokable task) {
            super("Task Thread");
            this.task = task;
        }

        @Override
        public void run() {
            try {
                this.task.invoke();
                StreamTaskTestHarness.this.shutdownIOManager();
                StreamTaskTestHarness.this.shutdownMemoryManager();
            }
            catch (Throwable t) {
                this.error = t;
            }
        }

        public Throwable getError() {
            return this.error;
        }
    }
}

