package org.apache.flink.streaming.runtime.tasks;

import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.class */
public class StreamTaskTestHarness<OUT> {
    /** Default memory manager size; equal to the value the constructor assigns to {@link #memorySize} (unit presumably bytes, i.e. 1 MiB - TODO confirm). */
    public static final int DEFAULT_MEMORY_MANAGER_SIZE = 1048576;
    /** Default network buffer size; the constructor assigns it to {@link #bufferSize} (unit presumably bytes - TODO confirm). */
    public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
    /** Builds the task under test from an {@link Environment}; applied on the task thread started by {@code invoke}. */
    private final Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory;
    /** Memory size handed to the mock environment in {@code createEnvironment}; a value {@code <= 0} makes {@code shutdownMemoryManager} a no-op. */
    public long memorySize;
    /** Buffer size handed to the mock environment in {@code createEnvironment}. */
    public int bufferSize;
    /** Environment the task runs in; {@code null} until {@code invoke} has been called. */
    protected StreamMockEnvironment mockEnv;
    /** Execution config used to create the output serializer and passed to the mock environment. */
    protected ExecutionConfig executionConfig;
    /** Job-level configuration passed to the mock environment. */
    public Configuration jobConfig;
    /** Task-level configuration; also the backing configuration of {@link #streamConfig}. */
    public Configuration taskConfig;
    /** Stream config view created on top of {@link #taskConfig}. */
    protected StreamConfig streamConfig;
    /** Task state manager passed to the mock environment; created from the local recovery config given to the constructor. */
    protected TestTaskStateManager taskStateManager;
    /** Serializer for the task's output type, created from the constructor's type information. */
    private TypeSerializer<OUT> outputSerializer;
    /** Stream-element serializer wrapping {@link #outputSerializer}; registered together with the output queue. */
    private TypeSerializer<StreamElement> outputStreamRecordSerializer;
    /** Queue registered as the environment's output; created in {@code initializeOutput}. */
    private LinkedBlockingQueue<Object> outputList;
    /** Thread running the task; {@code null} until {@code invoke} has been called. */
    protected StreamTaskTestHarness<OUT>.TaskThread taskThread;
    /** Number of input gates; not assigned in this class (presumably set by subclasses - TODO confirm). */
    protected int numInputGates;
    /** Number of input channels per gate; not assigned or read in this class (presumably used by subclasses - TODO confirm). */
    protected int numInputChannelsPerGate;
    /** Guards against configuring the operator chain more than once. */
    private boolean setupCalled;
    /** Input gates used to feed elements and events; not assigned in this class (presumably created by subclasses - TODO confirm). */
    protected StreamTestSingleInputGate[] inputGates;

    /**
     * Thread that creates the task under test from a supplier and then runs it.
     *
     * <p>Any {@link Throwable} raised while creating or invoking the task, or while shutting
     * down the IO and memory managers afterwards, is captured and made available through
     * {@link #getError()} instead of escaping the thread.
     */
    public class TaskThread extends Thread {
        /** Creates the task; called exactly once, on this thread. */
        private final Supplier<? extends StreamTask<OUT, ?>> taskFactory;
        /** The created task; {@code null} until the supplier has returned. */
        private volatile StreamTask<OUT, ?> task;
        /** First failure observed by {@link #run()}, or {@code null} if none occurred. */
        private volatile Throwable error;

        TaskThread(Supplier<? extends StreamTask<OUT, ?>> supplier) {
            super("Task Thread");
            this.taskFactory = supplier;
        }

        /**
         * Creates the task, invokes it and, on success, shuts down the IO manager and the
         * memory manager of the enclosing harness.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                // Task creation is inside the try block so that a failing factory is
                // reported through getError() rather than killing the thread silently.
                this.task = this.taskFactory.get();
                this.task.invoke();
                StreamTaskTestHarness.this.shutdownIOManager();
                StreamTaskTestHarness.this.shutdownMemoryManager();
            } catch (Throwable th) {
                this.error = th;
            }
        }

        /**
         * Returns the failure captured by {@link #run()}.
         *
         * @return the captured throwable, or {@code null} if no failure has been recorded
         */
        public Throwable getError() {
            return this.error;
        }
    }

    /**
     * Creates a harness that uses the configuration returned by
     * {@code TestLocalRecoveryConfig.disabled()} for local recovery.
     *
     * @param function factory that builds the task under test from its environment
     * @param typeInformation type information of the task's output, used to create the output serializer
     */
    public StreamTaskTestHarness(Function<Environment, ? extends StreamTask<OUT, ?>> function, TypeInformation<OUT> typeInformation) {
        this(function, typeInformation, TestLocalRecoveryConfig.disabled());
    }

    /**
     * Creates a harness whose local recovery config is built from a
     * {@link LocalRecoveryDirectoryProviderImpl} over the given file, a fresh {@link JobID},
     * a fresh {@link JobVertexID} and the index {@code 0}.
     *
     * <p>NOTE(review): the {@code true} flag presumably enables local recovery and {@code 0}
     * is presumably the subtask index - confirm against {@link LocalRecoveryConfig}.
     *
     * @param function factory that builds the task under test from its environment
     * @param typeInformation type information of the task's output, used to create the output serializer
     * @param file location handed to the local recovery directory provider
     */
    public StreamTaskTestHarness(Function<Environment, ? extends StreamTask<OUT, ?>> function, TypeInformation<OUT> typeInformation, File file) {
        this(function, typeInformation, new LocalRecoveryConfig(true, new LocalRecoveryDirectoryProviderImpl(file, new JobID(), new JobVertexID(), 0)));
    }

    /**
     * Creates a harness with default memory and buffer sizes, empty job/task configurations
     * and a task state manager built from the given local recovery config.
     *
     * @param function factory that builds the task under test from its environment; must not be {@code null}
     * @param typeInformation type information of the task's output, used to create the output serializer
     * @param localRecoveryConfig local recovery configuration passed to the {@link TestTaskStateManager}
     */
    public StreamTaskTestHarness(Function<Environment, ? extends StreamTask<OUT, ?>> function, TypeInformation<OUT> typeInformation, LocalRecoveryConfig localRecoveryConfig) {
        taskFactory = Preconditions.checkNotNull(function);

        // Sizing defaults; both fields are public and may be changed before invoke().
        memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
        bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
        setupCalled = false;

        // Configuration objects; the stream config is a view on top of the task configuration.
        jobConfig = new Configuration();
        taskConfig = new Configuration();
        executionConfig = new ExecutionConfig();
        streamConfig = new StreamConfig(taskConfig);

        // Output serializers are derived from the declared output type.
        outputSerializer = typeInformation.createSerializer(executionConfig);
        outputStreamRecordSerializer = new StreamElementSerializer<>(outputSerializer);

        taskStateManager = new TestTaskStateManager(localRecoveryConfig);
    }

    /**
     * Returns the mock environment the task was started with.
     *
     * @return the environment, or {@code null} if {@code invoke} has not been called yet
     */
    public StreamMockEnvironment getEnvironment() {
        return mockEnv;
    }

    /**
     * Returns the timer service of the task held by the task thread.
     *
     * <p>Fails with a {@link NullPointerException} when the task thread or its task does not
     * exist yet.
     *
     * @return the running task's timer service
     */
    public TimerService getTimerService() {
        StreamTask<OUT, ?> currentTask = taskThread.task;
        return currentTask.getTimerService();
    }

    /**
     * Hook called by {@code invoke(StreamMockEnvironment)} after the environment has been
     * stored and before the output is initialized. This implementation does nothing.
     *
     * <p>NOTE(review): presumably overridden by subclasses to create the input gates - confirm.
     *
     * @throws IOException declared for overriding implementations; never thrown here
     * @throws InterruptedException declared for overriding implementations; never thrown here
     */
    protected void initializeInputs() throws IOException, InterruptedException {
    }

    /**
     * Returns the task state manager that is passed to environments created by this harness.
     *
     * @return the harness's {@link TestTaskStateManager}
     */
    public TestTaskStateManager getTaskStateManager() {
        return taskStateManager;
    }

    /**
     * Registers a single state snapshot with the task state manager.
     *
     * <p>The checkpoint id is set as the manager's reported checkpoint id, and the snapshot
     * becomes the only entry of the manager's "job manager snapshots by checkpoint id" map.
     *
     * @param j checkpoint id the snapshot belongs to
     * @param taskStateSnapshot snapshot to register under that checkpoint id
     */
    public void setTaskStateSnapshot(long j, TaskStateSnapshot taskStateSnapshot) {
        taskStateManager.setReportedCheckpointId(j);
        taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
                Collections.singletonMap(j, taskStateSnapshot));
    }

    /**
     * Creates a fresh output queue and registers it, together with the stream-element
     * serializer, as an output of the mock environment.
     */
    private void initializeOutput() {
        LinkedBlockingQueue<Object> freshOutput = new LinkedBlockingQueue<>();
        outputList = freshOutput;
        mockEnv.addOutput(freshOutput, outputStreamRecordSerializer);
    }

    /**
     * Configures the stream config for a chain that consists of a single operator with one
     * output.
     *
     * <p>The output is described by one dummy edge between two placeholder stream nodes that
     * share an empty anonymous operator; that edge is registered both as the ordered out-edge
     * list and as the non-chained output list.
     *
     * @throws IllegalStateException if the harness has already been set up
     */
    public void setupOutputForSingletonOperatorChain() {
        Preconditions.checkState(!setupCalled, "This harness was already setup.");
        setupCalled = true;

        streamConfig.setChainStart();
        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        streamConfig.setOutputSelectors(Collections.emptyList());
        streamConfig.setNumberOfOutputs(1);
        streamConfig.setTypeSerializerOut(outputSerializer);
        streamConfig.setVertexID(0);
        streamConfig.setOperatorID(new OperatorID(4711L, 123L));

        // Placeholder operator shared by both dummy nodes.
        AbstractStreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
            private static final long serialVersionUID = 1;
        };
        StreamNode sourceDummy = new StreamNode(0, "group", (String) null, dummyOperator, "source dummy", new LinkedList(), SourceStreamTask.class);
        StreamNode targetDummy = new StreamNode(1, "group", (String) null, dummyOperator, "target dummy", new LinkedList(), SourceStreamTask.class);
        StreamEdge dummyEdge = new StreamEdge(sourceDummy, targetDummy, 0, new LinkedList(), new BroadcastPartitioner(), (OutputTag) null);

        LinkedList<StreamEdge> outEdges = new LinkedList<>();
        outEdges.add(dummyEdge);
        streamConfig.setOutEdgesInOrder(outEdges);
        streamConfig.setNonChainedOutputs(outEdges);
    }

    /**
     * Builds a new mock environment from the harness's current configuration, sizes and task
     * state manager, using a new {@link MockInputSplitProvider}.
     *
     * @return a newly created environment; it is not stored in the harness
     */
    public StreamMockEnvironment createEnvironment() {
        MockInputSplitProvider splitProvider = new MockInputSplitProvider();
        return new StreamMockEnvironment(
                jobConfig,
                taskConfig,
                executionConfig,
                memorySize,
                splitProvider,
                bufferSize,
                taskStateManager);
    }

    /**
     * Starts the task in an environment obtained from {@link #createEnvironment()}.
     *
     * @return the thread running the task
     * @throws Exception if starting the task fails
     */
    public Thread invoke() throws Exception {
        StreamMockEnvironment environment = createEnvironment();
        return invoke(environment);
    }

    /**
     * Starts the task in the given environment on a new task thread and waits until the task
     * object has been created.
     *
     * <p>Inputs and the output queue are initialized before the thread is started. The method
     * polls every 10 ms until the thread has published its task. If the thread terminates
     * without ever publishing a task (for example because the task factory threw), the method
     * fails instead of polling forever.
     *
     * @param streamMockEnvironment environment to run the task in; must not be {@code null}
     * @return the thread running the task
     * @throws IllegalStateException if the harness has already been invoked
     * @throws Exception if the task thread terminated before the task was created; the cause
     *     is the error recorded by the task thread, if any
     */
    public Thread invoke(StreamMockEnvironment streamMockEnvironment) throws Exception {
        Preconditions.checkState(this.mockEnv == null);
        Preconditions.checkState(this.taskThread == null);
        this.mockEnv = Preconditions.checkNotNull(streamMockEnvironment);
        initializeInputs();
        initializeOutput();
        this.taskThread = new TaskThread(() -> this.taskFactory.apply(streamMockEnvironment));
        this.taskThread.start();
        while (this.taskThread.task == null) {
            if (!this.taskThread.isAlive()) {
                // The thread may have published the task and finished between the two checks;
                // re-read the volatile field before declaring failure.
                if (this.taskThread.task != null) {
                    break;
                }
                throw new Exception("Task thread terminated before the task was created.", this.taskThread.getError());
            }
            Thread.sleep(10L);
        }
        return this.taskThread;
    }

    /**
     * Waits for the task thread to finish, using {@link Long#MAX_VALUE} as the timeout.
     *
     * @throws Exception if the task thread recorded an error
     */
    public void waitForTaskCompletion() throws Exception {
        waitForTaskCompletion(Long.MAX_VALUE);
    }

    /**
     * Waits up to the given timeout for the task thread to finish; every recorded error is
     * reported (cancellation errors are not ignored).
     *
     * @param j timeout passed to {@link Thread#join(long)}, in milliseconds
     * @throws Exception if the task thread recorded an error
     */
    public void waitForTaskCompletion(long j) throws Exception {
        waitForTaskCompletion(j, false);
    }

    /**
     * Waits up to the given timeout for the task thread to finish and rethrows any error it
     * recorded.
     *
     * <p>The method returns normally when the join times out and no error has been recorded.
     *
     * @param j timeout passed to {@link Thread#join(long)}, in milliseconds
     * @param z if {@code true}, an error whose cause chain contains a
     *     {@link CancelTaskException} is ignored
     * @throws IllegalStateException if the task thread was never started
     * @throws Exception wrapping the recorded error, unless it is an ignored cancellation
     */
    public void waitForTaskCompletion(long j, boolean z) throws Exception {
        Preconditions.checkState(taskThread != null, "Task thread was not started.");
        taskThread.join(j);

        Throwable failure = taskThread.getError();
        if (failure == null) {
            return;
        }
        boolean ignoredCancellation =
                z && ExceptionUtils.findThrowable(failure, CancelTaskException.class).isPresent();
        if (ignoredCancellation) {
            return;
        }
        throw new Exception("error in task", failure);
    }

    /**
     * Polls every 10 ms until the task reports that it is running.
     *
     * @throws IllegalStateException if the task thread was never started
     * @throws Exception if the task thread terminates before the task is running; the
     *     recorded error, if any, is attached as the cause
     */
    public void waitForTaskRunning() throws Exception {
        Preconditions.checkState(taskThread != null, "Task thread was not started.");
        StreamTask<OUT, ?> startedTask = taskThread.task;
        while (true) {
            if (startedTask.isRunning()) {
                return;
            }
            Thread.sleep(10L);
            if (taskThread.isAlive()) {
                continue;
            }
            // The thread is gone although the task never reached the running state.
            Throwable failure = taskThread.getError();
            if (failure != null) {
                throw new Exception("Task Thread failed due to an error.", failure);
            }
            throw new Exception("Task Thread unexpectedly shut down.");
        }
    }

    /**
     * Returns the task created by the task thread (named {@code getTask} before decompilation).
     *
     * <p>Fails with a {@link NullPointerException} if the task thread has not been created yet.
     *
     * @return the task held by the task thread
     */
    public StreamTask<OUT, ?> mo102getTask() {
        return taskThread.task;
    }

    /**
     * Returns the queue that was registered as the environment's output.
     *
     * @return the output queue, or {@code null} if the output has not been initialized yet
     */
    public LinkedBlockingQueue<Object> getOutput() {
        return outputList;
    }

    /**
     * Returns the stream config that is backed by the harness's task configuration.
     *
     * @return the harness's {@link StreamConfig}
     */
    public StreamConfig getStreamConfig() {
        return streamConfig;
    }

    /**
     * Returns the execution config created by the constructor.
     *
     * @return the harness's {@link ExecutionConfig}
     */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Closes the IO manager of the mock environment. Called by the task thread after the task
     * has finished successfully.
     *
     * @throws Exception if closing the IO manager fails
     */
    public void shutdownIOManager() throws Exception {
        mockEnv.getIOManager().close();
    }

    /**
     * Asserts that the environment's memory manager is empty and then shuts it down.
     *
     * <p>Does nothing when {@code memorySize} is not positive or the environment has no
     * memory manager.
     *
     * @throws Exception if verification or shutdown fails
     */
    public void shutdownMemoryManager() throws Exception {
        if (memorySize <= 0) {
            return;
        }
        MemoryManager manager = mockEnv.getMemoryManager();
        if (manager == null) {
            return;
        }
        boolean fullyReleased = manager.verifyEmpty();
        Assert.assertTrue("Memory Manager managed memory was not completely freed.", fullyReleased);
        manager.shutdown();
    }

    /**
     * Sends an element to channel 0 of input gate 0.
     *
     * @param obj element to send
     */
    public void processElement(Object obj) {
        StreamTestSingleInputGate firstGate = inputGates[0];
        firstGate.sendElement(obj, 0);
    }

    /**
     * Sends an element to the given channel of the given input gate.
     *
     * @param obj element to send
     * @param i index of the input gate
     * @param i2 index of the channel within that gate
     */
    public void processElement(Object obj, int i, int i2) {
        StreamTestSingleInputGate targetGate = inputGates[i];
        targetGate.sendElement(obj, i2);
    }

    /**
     * Sends an event to channel 0 of input gate 0.
     *
     * @param abstractEvent event to send
     */
    public void processEvent(AbstractEvent abstractEvent) {
        StreamTestSingleInputGate firstGate = inputGates[0];
        firstGate.sendEvent(abstractEvent, 0);
    }

    /**
     * Sends an event to the given channel of the given input gate.
     *
     * @param abstractEvent event to send
     * @param i index of the input gate
     * @param i2 index of the channel within that gate
     */
    public void processEvent(AbstractEvent abstractEvent, int i, int i2) {
        StreamTestSingleInputGate targetGate = inputGates[i];
        targetGate.sendEvent(abstractEvent, i2);
    }

    /**
     * Waits until all input queued so far has been consumed by the task.
     *
     * <p>The method first spins until every input gate reports that all of its queues are
     * empty, failing fast if the task thread has recorded an error. It then polls the task
     * thread's state every millisecond until the thread is blocked, waiting or terminated.
     *
     * @throws IllegalStateException if the task thread was never started
     * @throws InterruptedException if the calling thread is interrupted while polling the
     *     task thread's state
     * @throws Exception wrapping the error recorded by the task thread, if any
     */
    public void waitForInputProcessing() throws Exception {
        // Same precondition as the other wait methods, so a harness that was never invoked
        // fails with a clear message instead of a NullPointerException.
        Preconditions.checkState(this.taskThread != null, "Task thread was not started.");

        boolean allQueuesEmpty;
        do {
            Throwable error = this.taskThread.getError();
            if (error != null) {
                throw new Exception("Exception in the task thread", error);
            }
            allQueuesEmpty = true;
            for (int i = 0; i < this.numInputGates; i++) {
                if (!this.inputGates[i].allQueuesEmpty()) {
                    allQueuesEmpty = false;
                }
            }
        } while (!allQueuesEmpty);

        // Empty queues only mean the input was taken; wait until the task thread is no longer
        // runnable before returning.
        while (true) {
            Thread.State state = this.taskThread.getState();
            if (state == Thread.State.BLOCKED
                    || state == Thread.State.TERMINATED
                    || state == Thread.State.WAITING
                    || state == Thread.State.TIMED_WAITING) {
                return;
            }
            // An interrupt is propagated to the caller rather than swallowed, so a cancelled
            // test does not keep waiting here.
            Thread.sleep(1L);
        }
    }

    /**
     * Signals end of input on the first {@code numInputGates} input gates, in index order.
     */
    public void endInput() {
        int gateIndex = 0;
        while (gateIndex < numInputGates) {
            inputGates[gateIndex].endInput();
            gateIndex++;
        }
    }

    /**
     * Sends {@link EndOfPartitionEvent#INSTANCE} to one channel of one input gate.
     *
     * @param i index of the input gate
     * @param i2 index of the channel within that gate
     */
    public void endInput(int i, int i2) {
        StreamTestSingleInputGate targetGate = inputGates[i];
        targetGate.sendEvent(EndOfPartitionEvent.INSTANCE, i2);
    }

    /**
     * Starts configuring an operator chain headed by the given operator, by wrapping it in a
     * {@link SimpleOperatorFactory} and delegating to the factory-based overload.
     *
     * @param operatorID id passed to the returned chainer
     * @param streamOperator head operator of the chain
     * @return a chainer for configuring the operator chain
     * @throws IllegalStateException if the harness has already been set up
     */
    public StreamConfigChainer setupOperatorChain(OperatorID operatorID, StreamOperator<?> streamOperator) {
        StreamOperatorFactory<?> headFactory = SimpleOperatorFactory.of(streamOperator);
        return setupOperatorChain(operatorID, headFactory);
    }

    /**
     * Starts configuring an operator chain headed by the given operator factory.
     *
     * <p>Marks the harness as set up, stores the factory in the stream config and returns a
     * {@link StreamConfigChainer} built from the operator id and that config.
     *
     * @param operatorID id passed to the returned chainer
     * @param streamOperatorFactory factory of the head operator
     * @return a chainer for configuring the operator chain
     * @throws IllegalStateException if the harness has already been set up
     */
    public StreamConfigChainer setupOperatorChain(OperatorID operatorID, StreamOperatorFactory<?> streamOperatorFactory) {
        Preconditions.checkState(!setupCalled, "This harness was already setup.");
        setupCalled = true;

        StreamConfig headConfig = getStreamConfig();
        headConfig.setStreamOperatorFactory(streamOperatorFactory);
        return new StreamConfigChainer(operatorID, headConfig);
    }
}
