/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.benchmark;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.InputGateWithMetrics;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.streaming.runtime.io.benchmark.SerializingLongReceiver;
import org.apache.flink.util.ExceptionUtils;

public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
    private static final InetAddress LOCAL_ADDRESS;
    private final ResourceID location = ResourceID.generate();
    protected final JobID jobId = new JobID();
    protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
    protected NettyShuffleEnvironment senderEnv;
    protected NettyShuffleEnvironment receiverEnv;
    protected int channels;
    protected boolean broadcastMode = false;
    protected boolean localMode = false;
    protected ResultPartitionID[] partitionIds;
    private int dataPort;
    private SingleInputGateFactory gateFactory;

    public void setUp(int writers, int channels, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize) throws Exception {
        this.setUp(writers, channels, false, localMode, senderBufferPoolSize, receiverBufferPoolSize, new Configuration());
    }

    public void setUp(int writers, int channels, boolean broadcastMode, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize, Configuration config) throws Exception {
        this.broadcastMode = broadcastMode;
        this.localMode = localMode;
        this.channels = channels;
        this.partitionIds = new ResultPartitionID[writers];
        if (senderBufferPoolSize == -1) {
            senderBufferPoolSize = Math.max(2048, writers * channels * 4);
        }
        if (receiverBufferPoolSize == -1) {
            receiverBufferPoolSize = Math.max(2048, writers * channels * 4);
        }
        this.senderEnv = this.createShuffleEnvironment(senderBufferPoolSize, config);
        this.dataPort = this.senderEnv.start();
        if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
            this.receiverEnv = this.senderEnv;
        } else {
            this.receiverEnv = this.createShuffleEnvironment(receiverBufferPoolSize, config);
            this.receiverEnv.start();
        }
        this.gateFactory = new SingleInputGateFactory(this.location, this.receiverEnv.getConfiguration(), this.receiverEnv.getConnectionManager(), this.receiverEnv.getResultPartitionManager(), (TaskEventPublisher)new TaskEventDispatcher(), this.receiverEnv.getNetworkBufferPool());
        this.generatePartitionIds();
    }

    public void tearDown() {
        ExceptionUtils.suppressExceptions(() -> ((NettyShuffleEnvironment)this.senderEnv).close());
        ExceptionUtils.suppressExceptions(() -> ((NettyShuffleEnvironment)this.receiverEnv).close());
    }

    public SerializingLongReceiver createReceiver() throws Exception {
        TaskManagerLocation senderLocation = new TaskManagerLocation(ResourceID.generate(), LOCAL_ADDRESS, this.dataPort);
        InputGate receiverGate = this.createInputGate(senderLocation);
        SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, this.channels * this.partitionIds.length);
        receiver.start();
        return receiver;
    }

    public RecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
        ResultPartitionWriter sender = this.createResultPartition(this.jobId, this.partitionIds[partitionIndex], this.senderEnv, this.channels);
        return new RecordWriterBuilder().setTimeout(flushTimeout).build(sender);
    }

    private void generatePartitionIds() throws Exception {
        for (int writer = 0; writer < this.partitionIds.length; ++writer) {
            this.partitionIds[writer] = new ResultPartitionID();
        }
    }

    private NettyShuffleEnvironment createShuffleEnvironment(int bufferPoolSize, Configuration config) throws Exception {
        NettyConfig nettyConfig = new NettyConfig(LOCAL_ADDRESS, 0, ConfigurationParserUtils.getPageSize((Configuration)config), ConfigurationParserUtils.getSlot((Configuration)config), config);
        return new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(bufferPoolSize).setNettyConfig(nettyConfig).build();
    }

    protected ResultPartitionWriter createResultPartition(JobID jobId, ResultPartitionID partitionId, NettyShuffleEnvironment environment, int channels) throws Exception {
        ResultPartition resultPartitionWriter = new ResultPartitionBuilder().setResultPartitionId(partitionId).setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED).setNumberOfSubpartitions(channels).setResultPartitionManager(environment.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(environment).build();
        ConsumableNotifyingResultPartitionWriterDecorator consumableNotifyingPartitionWriter = new ConsumableNotifyingResultPartitionWriterDecorator((TaskActions)new NoOpTaskActions(), jobId, (ResultPartitionWriter)resultPartitionWriter, (ResultPartitionConsumableNotifier)new NoOpResultPartitionConsumableNotifier());
        consumableNotifyingPartitionWriter.setup();
        return consumableNotifyingPartitionWriter;
    }

    private InputGate createInputGate(TaskManagerLocation senderLocation) throws Exception {
        InputGate[] gates = new InputGate[this.channels];
        for (int channel = 0; channel < this.channels; ++channel) {
            InputGateDeploymentDescriptor gateDescriptor = this.createInputGateDeploymentDescriptor(senderLocation, channel, this.location);
            InputGate gate = this.createInputGateWithMetrics(this.gateFactory, gateDescriptor, channel);
            gate.setup();
            gates[channel] = gate;
        }
        if (this.channels > 1) {
            return new UnionInputGate(gates);
        }
        return gates[0];
    }

    private InputGateDeploymentDescriptor createInputGateDeploymentDescriptor(TaskManagerLocation senderLocation, int consumedSubpartitionIndex, ResourceID localLocation) {
        ShuffleDescriptor[] channelDescriptors = (ShuffleDescriptor[])Arrays.stream(this.partitionIds).map(partitionId -> StreamNetworkBenchmarkEnvironment.createShuffleDescriptor(this.localMode, partitionId, localLocation, senderLocation, consumedSubpartitionIndex)).toArray(ShuffleDescriptor[]::new);
        return new InputGateDeploymentDescriptor(this.dataSetID, ResultPartitionType.PIPELINED_BOUNDED, consumedSubpartitionIndex, channelDescriptors);
    }

    private InputGate createInputGateWithMetrics(SingleInputGateFactory gateFactory, InputGateDeploymentDescriptor gateDescriptor, int channelIndex) {
        SingleInputGate singleGate = gateFactory.create("receiving task[" + channelIndex + "]", gateDescriptor, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, InputChannelTestUtils.newUnregisteredInputChannelMetrics());
        return new InputGateWithMetrics((InputGate)singleGate, (Counter)new SimpleCounter());
    }

    private static ShuffleDescriptor createShuffleDescriptor(boolean localMode, ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, int channel) {
        NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder().setId(resultPartitionID).setProducerInfoFromTaskManagerLocation(senderLocation).setConnectionIndex(channel);
        return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
    }

    static {
        try {
            LOCAL_ADDRESS = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new Error(e);
        }
    }
}

