package org.apache.flink.streaming.runtime.io.benchmark;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.util.ExceptionUtils;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.class */
/**
 * Test/benchmark harness that wires a sending and a receiving {@link NetworkEnvironment}
 * together over Netty (or within a single environment in local mode) and hands out record
 * writers and a receiver bound to them.
 *
 * <p>Typical life cycle: {@link #setUp(int, int, boolean, int, int)}, then any number of
 * {@link #createRecordWriter(int, long)} / {@link #createReceiver()} calls, then
 * {@link #tearDown()}.
 *
 * @param <T> record type written through the created {@link StreamRecordWriter}s
 */
public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
    /** Address of the local host; used both to bind Netty and as the sender's location. */
    private static final InetAddress LOCAL_ADDRESS;

    static {
        try {
            LOCAL_ADDRESS = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            // Nothing in this class can work without a local address, so fail class loading.
            throw new Error(e);
        }
    }

    /** Environment that hosts the result partitions (producer side). */
    protected NetworkEnvironment senderEnv;
    /** Environment that hosts the input gates; the same instance as {@link #senderEnv} when shared. */
    protected NetworkEnvironment receiverEnv;
    /** I/O manager handed to every created result partition. */
    protected IOManager ioManager;
    /** Number of channels: sub-partitions per result partition and input gates per receiver. */
    protected int channels;
    /** One id per result partition; indexed by the argument of {@link #createRecordWriter(int, long)}. */
    protected ResultPartitionID[] partitionIds;
    protected final JobID jobId = new JobID();
    protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
    protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
    /** When {@code true}, input channels are created with a local partition location. */
    protected boolean localMode = false;

    /** {@link TaskActions} implementation whose callbacks do nothing. */
    public static final class NoOpTaskActions implements TaskActions {
        private NoOpTaskActions() {
        }

        /** Ignores the request. */
        public void triggerPartitionProducerStateCheck(JobID jobID, IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) {
        }

        /** Ignores the failure. */
        public void failExternally(Throwable th) {
        }
    }

    /**
     * Same as {@link #setUp(int, int, boolean, int, int, Configuration)} with a fresh, empty
     * {@link Configuration}.
     *
     * @param i number of result partitions (and partition ids) to prepare
     * @param i2 number of channels
     * @param z whether to run in local mode
     * @param i3 sender buffer pool size, or {@code -1} for the computed default
     * @param i4 receiver buffer pool size, or {@code -1} for the computed default
     * @throws Exception if a network environment cannot be created or started
     */
    public void setUp(int i, int i2, boolean z, int i3, int i4) throws Exception {
        setUp(i, i2, z, i3, i4, new Configuration());
    }

    /**
     * Creates and starts the I/O manager and the network environment(s) and generates the
     * partition ids.
     *
     * <p>A buffer pool size of {@code -1} is replaced by {@code max(2048, i * i2 * 4)}. In local
     * mode with identical sender and receiver pool sizes a single environment is shared by both
     * sides; otherwise a second environment is created and started for the receiver.
     *
     * @param i number of result partitions (and partition ids) to prepare
     * @param i2 number of channels
     * @param z whether to run in local mode
     * @param i3 sender buffer pool size, or {@code -1} for the computed default
     * @param i4 receiver buffer pool size, or {@code -1} for the computed default
     * @param configuration configuration read for the memory segment size and task slot count
     *     and passed on to the Netty configuration
     * @throws Exception if a network environment cannot be created or started
     */
    public void setUp(int i, int i2, boolean z, int i3, int i4, Configuration configuration) throws Exception {
        this.localMode = z;
        this.channels = i2;
        this.partitionIds = new ResultPartitionID[i];

        final int defaultPoolSize = Math.max(2048, i * i2 * 4);
        final int senderPoolSize = (i3 == -1) ? defaultPoolSize : i3;
        final int receiverPoolSize = (i4 == -1) ? defaultPoolSize : i4;

        this.ioManager = new IOManagerAsync();

        this.senderEnv = createNettyNetworkEnvironment(senderPoolSize, configuration);
        this.senderEnv.start();

        final boolean shareEnvironment = z && senderPoolSize == receiverPoolSize;
        if (shareEnvironment) {
            this.receiverEnv = this.senderEnv;
        } else {
            this.receiverEnv = createNettyNetworkEnvironment(receiverPoolSize, configuration);
            this.receiverEnv.start();
        }

        generatePartitionIds();
    }

    /**
     * Shuts down the network environment(s) and the I/O manager, suppressing exceptions thrown
     * by the individual shutdown calls.
     *
     * <p>Safe to call when {@link #setUp} was never invoked or failed part-way: resources that
     * were never created are skipped instead of aborting the whole tear-down with a
     * {@link NullPointerException}, so everything that was created still gets released. A
     * receiver environment that is the very same instance as the sender environment is shut
     * down only once.
     */
    public void tearDown() {
        final NetworkEnvironment sender = this.senderEnv;
        final NetworkEnvironment receiver = this.receiverEnv;
        final IOManager manager = this.ioManager;

        if (sender != null) {
            ExceptionUtils.suppressExceptions(sender::shutdown);
        }
        // In shared mode both fields alias one environment; do not shut it down twice.
        if (receiver != null && receiver != sender) {
            ExceptionUtils.suppressExceptions(receiver::shutdown);
        }
        if (manager != null) {
            ExceptionUtils.suppressExceptions(manager::shutdown);
        }
    }

    /**
     * Builds an input gate in the receiver environment that consumes every prepared partition,
     * wraps it in a {@code SerializingLongReceiver} and starts that receiver.
     *
     * @return the started receiver
     * @throws Exception if the input gate cannot be set up
     */
    public SerializingLongReceiver createReceiver() throws Exception {
        final TaskManagerLocation senderLocation = new TaskManagerLocation(
                ResourceID.generate(),
                LOCAL_ADDRESS,
                this.senderEnv.getConnectionManager().getDataPort());

        final InputGate gate = createInputGate(
                this.jobId,
                this.dataSetID,
                this.executionAttemptID,
                senderLocation,
                this.receiverEnv,
                this.channels);

        final SerializingLongReceiver receiver =
                new SerializingLongReceiver(gate, this.channels * this.partitionIds.length);
        receiver.start();
        return receiver;
    }

    /**
     * Creates a round-robin record writer on a new result partition in the sender environment.
     *
     * @param i index into the prepared partition ids
     * @param j passed as the last constructor argument of {@link StreamRecordWriter}
     *     (presumably the flush timeout - confirm against that class)
     * @return a writer bound to the new result partition
     * @throws Exception if the result partition cannot be set up
     */
    public StreamRecordWriter<T> createRecordWriter(int i, long j) throws Exception {
        final ResultPartitionWriter partition =
                createResultPartition(this.jobId, this.partitionIds[i], this.senderEnv, this.channels);
        return new StreamRecordWriter<>(partition, new RoundRobinChannelSelector(), j);
    }

    /** Fills every slot of {@link #partitionIds} with a freshly generated id. */
    private void generatePartitionIds() throws Exception {
        for (int index = 0; index < this.partitionIds.length; index++) {
            this.partitionIds[index] = new ResultPartitionID();
        }
    }

    /**
     * Creates (but does not start) a Netty-backed network environment bound to the local
     * address on port {@code 0}.
     *
     * @param bufferPoolSize number of buffers in the environment's network buffer pool
     * @param configuration source of the memory segment size and the number of task slots
     */
    private NetworkEnvironment createNettyNetworkEnvironment(int bufferPoolSize, Configuration configuration) throws Exception {
        final int segmentSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);

        int numberOfSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (numberOfSlots == -1) {
            numberOfSlots = 1;
        }

        final NettyConfig nettyConfig =
                new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, numberOfSlots, configuration);

        // The remaining tuning knobs always use the option defaults, not the passed configuration.
        final int initialBackoff = ((Integer) TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue()).intValue();
        final int maxBackoff = ((Integer) TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue()).intValue();
        final int buffersPerChannel = ((Integer) TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue()).intValue();
        final int extraBuffersPerGate = ((Integer) TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()).intValue();

        return new NetworkEnvironment(
                new NetworkBufferPool(bufferPoolSize, segmentSize),
                new NettyConnectionManager(nettyConfig),
                new ResultPartitionManager(),
                new TaskEventDispatcher(),
                new KvStateRegistry(),
                (KvStateServer) null,
                (KvStateClientProxy) null,
                IOManager.IOMode.SYNC,
                initialBackoff,
                maxBackoff,
                buffersPerChannel,
                extraBuffersPerGate,
                true);
    }

    /**
     * Creates a {@code PIPELINED_BOUNDED} result partition named "sender task" and registers it
     * with the given environment.
     *
     * @param jobID job the partition belongs to
     * @param resultPartitionID id of the partition
     * @param networkEnvironment environment the partition is set up in
     * @param i number of sub-partitions
     * @return the registered partition
     * @throws Exception if the environment fails to set up the partition
     */
    protected ResultPartitionWriter createResultPartition(JobID jobID, ResultPartitionID resultPartitionID, NetworkEnvironment networkEnvironment, int i) throws Exception {
        final ResultPartition partition = new ResultPartition(
                "sender task",
                new NoOpTaskActions(),
                jobID,
                resultPartitionID,
                ResultPartitionType.PIPELINED_BOUNDED,
                i,
                1,
                networkEnvironment.getResultPartitionManager(),
                new NoOpResultPartitionConsumableNotifier(),
                this.ioManager,
                false);
        networkEnvironment.setupPartition(partition);
        return partition;
    }

    /**
     * Creates one {@link SingleInputGate} per channel, each with one input channel per prepared
     * partition, and registers every gate with the given environment.
     *
     * @param gateCount number of gates to create; must be at least 1
     * @return the single gate when {@code gateCount} is 1, otherwise a {@link UnionInputGate}
     *     over all created gates
     */
    private InputGate createInputGate(JobID job, IntermediateDataSetID resultId, ExecutionAttemptID attemptId, TaskManagerLocation senderLocation, NetworkEnvironment environment, int gateCount) throws IOException {
        final InputGate[] gates = new InputGate[gateCount];

        for (int index = 0; index < gateCount; index++) {
            final int gateIndex = index;

            // Every partition is consumed either locally or remotely through a connection
            // whose connection index is the gate index.
            final InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(this.partitionIds)
                    .map(partitionId -> new InputChannelDeploymentDescriptor(
                            partitionId,
                            this.localMode
                                    ? ResultPartitionLocation.createLocal()
                                    : ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, gateIndex))))
                    .toArray(InputChannelDeploymentDescriptor[]::new);

            final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
                    resultId,
                    ResultPartitionType.PIPELINED_BOUNDED,
                    gateIndex,
                    channelDescriptors);

            final SingleInputGate gate = SingleInputGate.create(
                    "receiving task[" + gateIndex + "]",
                    job,
                    attemptId,
                    gateDescriptor,
                    environment,
                    new NoOpTaskActions(),
                    UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());

            environment.setupInputGate(gate);
            gates[gateIndex] = gate;
        }

        if (gateCount > 1) {
            return new UnionInputGate(gates);
        }
        return gates[0];
    }
}
