package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.Thread;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.class */
public class StreamTaskTestHarness<OUT> {
    public static final int DEFAULT_MEMORY_MANAGER_SIZE = 1048576;
    public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
    private final Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory;
    public long memorySize;
    public int bufferSize;
    protected StreamMockEnvironment mockEnv;
    private StreamTask<OUT, ?> task;
    private TypeSerializer<OUT> outputSerializer;
    private TypeSerializer<StreamElement> outputStreamRecordSerializer;
    private LinkedBlockingQueue<Object> outputList;
    protected StreamTaskTestHarness<OUT>.TaskThread taskThread;
    protected int numInputGates;
    protected int numInputChannelsPerGate;
    protected StreamTestSingleInputGate[] inputGates;
    private boolean setupCalled = false;
    public Configuration jobConfig = new Configuration();
    public Configuration taskConfig = new Configuration();
    protected ExecutionConfig executionConfig = new ExecutionConfig();
    protected StreamConfig streamConfig = new StreamConfig(this.taskConfig);
    protected TestTaskStateManager taskStateManager = new TestTaskStateManager();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness$TaskThread.class */
    public class TaskThread extends Thread {
        private final AbstractInvokable task;
        private volatile Throwable error;

        TaskThread(AbstractInvokable abstractInvokable) {
            super("Task Thread");
            this.task = abstractInvokable;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.task.invoke();
                StreamTaskTestHarness.this.shutdownIOManager();
                StreamTaskTestHarness.this.shutdownMemoryManager();
            } catch (Throwable th) {
                this.error = th;
            }
        }

        public Throwable getError() {
            return this.error;
        }
    }

    public StreamTaskTestHarness(Function<Environment, ? extends StreamTask<OUT, ?>> function, TypeInformation<OUT> typeInformation) {
        this.memorySize = 0L;
        this.bufferSize = 0;
        this.taskFactory = (Function) Preconditions.checkNotNull(function);
        this.memorySize = 1048576L;
        this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
        this.outputSerializer = typeInformation.createSerializer(this.executionConfig);
        this.outputStreamRecordSerializer = new StreamElementSerializer(this.outputSerializer);
    }

    public ProcessingTimeService getProcessingTimeService() {
        return this.task.getProcessingTimeService();
    }

    protected void initializeInputs() throws IOException, InterruptedException {
    }

    public TestTaskStateManager getTaskStateManager() {
        return this.taskStateManager;
    }

    public void setTaskStateSnapshot(long j, TaskStateSnapshot taskStateSnapshot) {
        this.taskStateManager.setReportedCheckpointId(j);
        this.taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(Long.valueOf(j), taskStateSnapshot));
    }

    private void initializeOutput() {
        this.outputList = new LinkedBlockingQueue<>();
        this.mockEnv.addOutput(this.outputList, this.outputStreamRecordSerializer);
    }

    public void setupOutputForSingletonOperatorChain() {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
        this.streamConfig.setChainStart();
        this.streamConfig.setBufferTimeout(0L);
        this.streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        this.streamConfig.setOutputSelectors(Collections.emptyList());
        this.streamConfig.setNumberOfOutputs(1);
        this.streamConfig.setTypeSerializerOut(this.outputSerializer);
        this.streamConfig.setVertexID(0);
        this.streamConfig.setOperatorID(new OperatorID(4711L, 123L));
        AbstractStreamOperator<OUT> abstractStreamOperator = new AbstractStreamOperator<OUT>() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.1
            private static final long serialVersionUID = 1;
        };
        LinkedList linkedList = new LinkedList();
        linkedList.add(new StreamEdge(new StreamNode((StreamExecutionEnvironment) null, 0, "group", (String) null, abstractStreamOperator, "source dummy", new LinkedList(), SourceStreamTask.class), new StreamNode((StreamExecutionEnvironment) null, 1, "group", (String) null, abstractStreamOperator, "target dummy", new LinkedList(), SourceStreamTask.class), 0, new LinkedList(), new BroadcastPartitioner(), (OutputTag) null));
        this.streamConfig.setOutEdgesInOrder(linkedList);
        this.streamConfig.setNonChainedOutputs(linkedList);
    }

    public StreamMockEnvironment createEnvironment() {
        return new StreamMockEnvironment(this.jobConfig, this.taskConfig, this.executionConfig, this.memorySize, new MockInputSplitProvider(), this.bufferSize, this.taskStateManager);
    }

    public void invoke() throws Exception {
        invoke(createEnvironment());
    }

    public void invoke(StreamMockEnvironment streamMockEnvironment) throws Exception {
        this.mockEnv = (StreamMockEnvironment) Preconditions.checkNotNull(streamMockEnvironment);
        initializeInputs();
        initializeOutput();
        this.task = this.taskFactory.apply(streamMockEnvironment);
        this.taskThread = new TaskThread(this.task);
        this.taskThread.start();
    }

    public void waitForTaskCompletion() throws Exception {
        waitForTaskCompletion(Long.MAX_VALUE);
    }

    public void waitForTaskCompletion(long j) throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }
        this.taskThread.join(j);
        if (this.taskThread.getError() != null) {
            throw new Exception("error in task", this.taskThread.getError());
        }
    }

    public void waitForTaskRunning() throws Exception {
        waitForTaskRunning(Long.MAX_VALUE);
    }

    public void waitForTaskRunning(long j) throws Exception {
        if (this.taskThread == null) {
            throw new IllegalStateException("Task thread was not started.");
        }
        if (!(((TaskThread) this.taskThread).task instanceof StreamTask)) {
            throw new IllegalStateException("Not a StreamTask");
        }
        StreamTask streamTask = ((TaskThread) this.taskThread).task;
        while (!streamTask.isRunning()) {
            Thread.sleep(10L);
            if (!this.taskThread.isAlive()) {
                if (this.taskThread.getError() == null) {
                    throw new Exception("Task Thread unexpectedly shut down.");
                }
                throw new Exception("Task Thread failed due to an error.", this.taskThread.getError());
            }
        }
    }

    /* renamed from: getTask */
    public StreamTask<OUT, ?> mo103getTask() {
        return this.task;
    }

    public LinkedBlockingQueue<Object> getOutput() {
        return this.outputList;
    }

    public StreamConfig getStreamConfig() {
        return this.streamConfig;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdownIOManager() throws Exception {
        this.mockEnv.getIOManager().shutdown();
        Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdownMemoryManager() throws Exception {
        MemoryManager memoryManager;
        if (this.memorySize <= 0 || (memoryManager = this.mockEnv.getMemoryManager()) == null) {
            return;
        }
        Assert.assertTrue("Memory Manager managed memory was not completely freed.", memoryManager.verifyEmpty());
        memoryManager.shutdown();
    }

    public void processElement(Object obj) {
        this.inputGates[0].sendElement(obj, 0);
    }

    public void processElement(Object obj, int i, int i2) {
        this.inputGates[i].sendElement(obj, i2);
    }

    public void processEvent(AbstractEvent abstractEvent) {
        this.inputGates[0].sendEvent(abstractEvent, 0);
    }

    public void processEvent(AbstractEvent abstractEvent, int i, int i2) {
        this.inputGates[i].sendEvent(abstractEvent, i2);
    }

    public void waitForInputProcessing() throws Exception {
        boolean z;
        do {
            Throwable error = this.taskThread.getError();
            if (error != null) {
                throw new Exception("Exception in the task thread", error);
            }
            z = true;
            for (int i = 0; i < this.numInputGates; i++) {
                if (!this.inputGates[i].allQueuesEmpty()) {
                    z = false;
                }
            }
        } while (!z);
        while (true) {
            Thread.State state = this.taskThread.getState();
            if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED || state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
                return;
            } else {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public void endInput() {
        for (int i = 0; i < this.numInputGates; i++) {
            this.inputGates[i].endInput();
        }
    }

    public StreamConfigChainer setupOperatorChain(OperatorID operatorID, OneInputStreamOperator<?, ?> oneInputStreamOperator) {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
        return new StreamConfigChainer(operatorID, oneInputStreamOperator, getStreamConfig());
    }

    public StreamConfigChainer setupOperatorChain(OperatorID operatorID, TwoInputStreamOperator<?, ?, ?> twoInputStreamOperator) {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
        return new StreamConfigChainer(operatorID, twoInputStreamOperator, getStreamConfig());
    }
}
