/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.benchmark;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.io.benchmark.SerializingLongReceiver;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MathUtils;

public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
    private static final InetAddress LOCAL_ADDRESS;
    protected final JobID jobId = new JobID();
    protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
    protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
    protected NetworkEnvironment senderEnv;
    protected NetworkEnvironment receiverEnv;
    protected IOManager ioManager;
    protected int channels;
    protected boolean broadcastMode = false;
    protected boolean localMode = false;
    protected ResultPartitionID[] partitionIds;

    public void setUp(int writers, int channels, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize) throws Exception {
        this.setUp(writers, channels, false, localMode, senderBufferPoolSize, receiverBufferPoolSize, new Configuration());
    }

    public void setUp(int writers, int channels, boolean broadcastMode, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize, Configuration config) throws Exception {
        this.broadcastMode = broadcastMode;
        this.localMode = localMode;
        this.channels = channels;
        this.partitionIds = new ResultPartitionID[writers];
        if (senderBufferPoolSize == -1) {
            senderBufferPoolSize = Math.max(2048, writers * channels * 4);
        }
        if (receiverBufferPoolSize == -1) {
            receiverBufferPoolSize = Math.max(2048, writers * channels * 4);
        }
        this.ioManager = new IOManagerAsync();
        this.senderEnv = this.createNettyNetworkEnvironment(senderBufferPoolSize, config);
        this.senderEnv.start();
        if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
            this.receiverEnv = this.senderEnv;
        } else {
            this.receiverEnv = this.createNettyNetworkEnvironment(receiverBufferPoolSize, config);
            this.receiverEnv.start();
        }
        this.generatePartitionIds();
    }

    public void tearDown() {
        ExceptionUtils.suppressExceptions(() -> ((NetworkEnvironment)this.senderEnv).shutdown());
        ExceptionUtils.suppressExceptions(() -> ((NetworkEnvironment)this.receiverEnv).shutdown());
        ExceptionUtils.suppressExceptions(() -> ((IOManager)this.ioManager).shutdown());
    }

    public SerializingLongReceiver createReceiver() throws Exception {
        TaskManagerLocation senderLocation = new TaskManagerLocation(ResourceID.generate(), LOCAL_ADDRESS, this.senderEnv.getConnectionManager().getDataPort());
        InputGate receiverGate = this.createInputGate(this.jobId, this.dataSetID, this.executionAttemptID, senderLocation, this.receiverEnv, this.channels);
        SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, this.channels * this.partitionIds.length);
        receiver.start();
        return receiver;
    }

    public StreamRecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
        ResultPartitionWriter sender = this.createResultPartition(this.jobId, this.partitionIds[partitionIndex], this.senderEnv, this.channels);
        return new StreamRecordWriter(sender, (ChannelSelector)new RoundRobinChannelSelector(), flushTimeout);
    }

    private void generatePartitionIds() throws Exception {
        for (int writer = 0; writer < this.partitionIds.length; ++writer) {
            this.partitionIds[writer] = new ResultPartitionID();
        }
    }

    private NetworkEnvironment createNettyNetworkEnvironment(int bufferPoolSize, Configuration config) throws Exception {
        int segmentSize = MathUtils.checkedDownCast((long)MemorySize.parse((String)config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
        int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }
        NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);
        NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));
        return new NetworkEnvironment(bufferPool, (ConnectionManager)nettyConnectionManager, new ResultPartitionManager(), new TaskEventDispatcher(), new KvStateRegistry(), null, null, IOManager.IOMode.SYNC, ((Integer)TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue()).intValue(), ((Integer)TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue()).intValue(), ((Integer)TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue()).intValue(), ((Integer)TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()).intValue(), true);
    }

    protected ResultPartitionWriter createResultPartition(JobID jobId, ResultPartitionID partitionId, NetworkEnvironment environment, int channels) throws Exception {
        ResultPartition resultPartition = new ResultPartition("sender task", (TaskActions)new NoOpTaskActions(), jobId, partitionId, ResultPartitionType.PIPELINED_BOUNDED, channels, 1, environment.getResultPartitionManager(), (ResultPartitionConsumableNotifier)new NoOpResultPartitionConsumableNotifier(), this.ioManager, false);
        environment.setupPartition(resultPartition);
        return resultPartition;
    }

    private InputGate createInputGate(JobID jobId, IntermediateDataSetID dataSetID, ExecutionAttemptID executionAttemptID, TaskManagerLocation senderLocation, NetworkEnvironment environment, int channels) throws IOException {
        InputGate[] gates = new InputGate[channels];
        for (int channel = 0; channel < channels; ++channel) {
            int finalChannel = channel;
            InputChannelDeploymentDescriptor[] channelDescriptors = (InputChannelDeploymentDescriptor[])Arrays.stream(this.partitionIds).map(partitionId -> new InputChannelDeploymentDescriptor(partitionId, this.localMode ? ResultPartitionLocation.createLocal() : ResultPartitionLocation.createRemote((ConnectionID)new ConnectionID(senderLocation, finalChannel)))).toArray(InputChannelDeploymentDescriptor[]::new);
            InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(dataSetID, ResultPartitionType.PIPELINED_BOUNDED, channel, channelDescriptors);
            SingleInputGate gate = SingleInputGate.create((String)("receiving task[" + channel + "]"), (JobID)jobId, (ExecutionAttemptID)executionAttemptID, (InputGateDeploymentDescriptor)gateDescriptor, (NetworkEnvironment)environment, (TaskActions)new NoOpTaskActions(), (TaskIOMetricGroup)UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
            environment.setupInputGate(gate);
            gates[channel] = gate;
        }
        if (channels > 1) {
            return new UnionInputGate(gates);
        }
        return gates[0];
    }

    static {
        try {
            LOCAL_ADDRESS = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new Error(e);
        }
    }

    private static final class NoOpTaskActions
    implements TaskActions {
        private NoOpTaskActions() {
        }

        public void triggerPartitionProducerStateCheck(JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID resultPartitionId) {
        }

        public void failExternally(Throwable cause) {
        }
    }
}

