/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.After;
import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore
public class NetworkStackThroughputITCase {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
    private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
    private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
    private static final String PARALLELISM_CONFIG_KEY = "num.subtasks";
    private static final String NUM_SLOTS_PER_TM_CONFIG_KEY = "num.slots.per.tm";
    private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
    private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
    private static final int IS_SLOW_SLEEP_MS = 10;
    private static final int IS_SLOW_EVERY_NUM_RECORDS = 512;

    /**
     * Runs the speed-test job once for every parameter set below.
     *
     * <p>Each row holds, in order: data volume in GB, whether a forwarder vertex is used,
     * whether the sender is slow, whether the receiver is slow, the parallelism, and the
     * number of slots per task manager. The row is printed to standard output before the
     * job is run, and the throughput is calculated afterwards.
     *
     * @throws Exception if building the wrapper or running a job fails
     */
    public void testThroughput() throws Exception {
        final Object[][] parameterSets = {
            {1, false, false, false, 4, 2},
            {1, true, false, false, 4, 2},
            {1, true, true, false, 4, 2},
            {1, true, false, true, 4, 2},
            {2, true, false, false, 4, 2},
            {4, true, false, false, 4, 2},
            {4, true, false, false, 8, 4}
        };

        for (int run = 0; run < parameterSets.length; run++) {
            final Object[] params = parameterSets[run];

            // Unpack the row into named values before writing them into the configuration.
            final int dataVolumeGb = (Integer) params[0];
            final boolean useForwarder = (Boolean) params[1];
            final boolean slowSender = (Boolean) params[2];
            final boolean slowReceiver = (Boolean) params[3];
            final int parallelism = (Integer) params[4];
            final int slotsPerTaskManager = (Integer) params[5];

            final Configuration jobConfig = new Configuration();
            jobConfig.setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
            jobConfig.setBoolean(USE_FORWARDER_CONFIG_KEY, useForwarder);
            jobConfig.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, slowSender);
            jobConfig.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, slowReceiver);
            jobConfig.setInteger(PARALLELISM_CONFIG_KEY, parallelism);
            jobConfig.setInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, slotsPerTaskManager);

            final TestBaseWrapper wrapper = new TestBaseWrapper(jobConfig);
            System.out.println(Arrays.toString(params));
            wrapper.testJob();
            wrapper.calculateThroughput();
        }
    }

    /**
     * Runs the throughput test and then prints a completion marker to standard output.
     *
     * @throws Exception if the throughput test fails
     */
    private void runAllTests() throws Exception {
        testThroughput();
        System.out.println("Done.");
    }

    /**
     * Command-line entry point: creates an instance of this class and runs all of its tests.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if any test fails
     */
    public static void main(String[] args) throws Exception {
        final NetworkStackThroughputITCase benchmark = new NetworkStackThroughputITCase();
        benchmark.runAllTests();
    }

    /**
     * Fixed-size record used as the payload of the speed test.
     *
     * <p>The record wraps a {@link #RECORD_SIZE}-byte buffer that is initialised with the
     * values 0, 1, ..., 127.
     */
    public static class SpeedTestRecord
    implements IOReadableWritable {
        /** Size of one record in bytes. */
        private static final int RECORD_SIZE = 128;
        private final byte[] buf = new byte[RECORD_SIZE];

        /** Creates a record whose buffer holds the byte pattern 0..127. */
        public SpeedTestRecord() {
            int position = 0;
            while (position < RECORD_SIZE) {
                buf[position] = (byte) (position % 128);
                position++;
            }
        }

        /**
         * Writes the complete buffer to the given output.
         *
         * @param out the view to write to
         * @throws IOException if writing fails
         */
        public void write(DataOutputView out) throws IOException {
            out.write(buf);
        }

        /**
         * Fills the complete buffer from the given input.
         *
         * @param in the view to read from
         * @throws IOException if reading fails
         */
        public void read(DataInputView in) throws IOException {
            in.readFully(buf);
        }
    }

    /**
     * Sink task of the speed test: reads {@link SpeedTestRecord}s from input gate 0 until
     * the reader returns {@code null} and discards them.
     *
     * <p>When the task configuration enables {@code is.slow.receiver}, the task sleeps for
     * {@code IS_SLOW_SLEEP_MS} milliseconds once every {@code IS_SLOW_EVERY_NUM_RECORDS}
     * records, starting with the first one.
     */
    public static class SpeedTestConsumer
    extends AbstractInvokable {
        private RecordReader<SpeedTestRecord> reader;

        /** Creates the typed record reader on input gate 0. */
        public void registerInputOutput() {
            this.reader = new RecordReader<SpeedTestRecord>(this.getEnvironment().getInputGate(0), SpeedTestRecord.class);
        }

        /**
         * Consumes all incoming records, optionally throttled.
         *
         * @throws Exception if reading fails or the throttling sleep is interrupted
         */
        public void invoke() throws Exception {
            boolean isSlow = this.getTaskConfiguration().getBoolean(NetworkStackThroughputITCase.IS_SLOW_RECEIVER_CONFIG_KEY, false);
            int numRecords = 0;
            try {
                while (this.reader.next() != null) {
                    // The counter only advances in slow mode (short-circuit), exactly as before.
                    if (isSlow && numRecords++ % NetworkStackThroughputITCase.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                        Thread.sleep(NetworkStackThroughputITCase.IS_SLOW_SLEEP_MS);
                    }
                }
            } finally {
                // Clear the reader's buffers even if reading or sleeping threw.
                this.reader.clearBuffers();
            }
        }
    }

    /**
     * Intermediate task of the speed test: reads {@link SpeedTestRecord}s from input gate 0
     * and emits each one unchanged through writer 0.
     */
    public static class SpeedTestForwarder
    extends AbstractInvokable {
        private RecordReader<SpeedTestRecord> reader;
        private RecordWriter<SpeedTestRecord> writer;

        /** Creates the typed record reader on input gate 0 and the record writer on writer 0. */
        public void registerInputOutput() {
            this.reader = new RecordReader<SpeedTestRecord>(this.getEnvironment().getInputGate(0), SpeedTestRecord.class);
            this.writer = new RecordWriter<SpeedTestRecord>(this.getEnvironment().getWriter(0));
        }

        /**
         * Forwards every incoming record until the reader returns {@code null}, then
         * flushes the writer.
         *
         * @throws Exception if reading or emitting fails
         */
        public void invoke() throws Exception {
            try {
                SpeedTestRecord record;
                while ((record = this.reader.next()) != null) {
                    this.writer.emit(record);
                }
            } finally {
                // Clear the reader's buffers even if forwarding threw.
                this.reader.clearBuffers();
            }
            // Flush only after a successful run, after the buffers were cleared (original order).
            this.writer.flush();
        }
    }

    /**
     * Source task of the speed test: emits the same {@link SpeedTestRecord} instance
     * repeatedly through writer 0.
     *
     * <p>The configured data volume ({@code data.volume.gb}, default 1) is converted to MB
     * and divided by the number of subtasks (integer division); each subtask emits as many
     * records as fit into its share. When {@code is.slow.sender} is enabled, the task sleeps
     * for {@code IS_SLOW_SLEEP_MS} milliseconds before every
     * {@code IS_SLOW_EVERY_NUM_RECORDS}-th record, starting with the first one.
     */
    public static class SpeedTestProducer
    extends AbstractInvokable {
        private RecordWriter<SpeedTestRecord> writer;

        /** Creates the typed record writer on writer 0. */
        public void registerInputOutput() {
            this.writer = new RecordWriter<SpeedTestRecord>(this.getEnvironment().getWriter(0));
        }

        /**
         * Emits this subtask's share of records and flushes the writer.
         *
         * @throws Exception if emitting fails or the throttling sleep is interrupted
         */
        public void invoke() throws Exception {
            int dataVolumeGb = this.getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
            int numSubtasks = this.getCurrentNumberOfSubtasks();
            // Multiply in long so the GB -> MB conversion cannot overflow int.
            long dataMbPerSubtask = dataVolumeGb * 1024L / numSubtasks;
            long numRecordsToEmit = dataMbPerSubtask * 1024L * 1024L / SpeedTestRecord.RECORD_SIZE;
            LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)", this.getIndexInSubtaskGroup() + 1, numSubtasks, numRecordsToEmit, SpeedTestRecord.RECORD_SIZE, (double)dataMbPerSubtask / 1024.0));
            boolean isSlow = this.getTaskConfiguration().getBoolean(NetworkStackThroughputITCase.IS_SLOW_SENDER_CONFIG_KEY, false);
            SpeedTestRecord record = new SpeedTestRecord();
            for (long i = 0L; i < numRecordsToEmit; ++i) {
                // The loop index doubles as the record counter for throttling.
                if (isSlow && i % NetworkStackThroughputITCase.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                    Thread.sleep(NetworkStackThroughputITCase.IS_SLOW_SLEEP_MS);
                }
                this.writer.emit(record);
            }
            this.writer.flush();
        }
    }

    /**
     * Wraps {@link RecordAPITestBase} so the speed-test job can be built from a
     * {@link Configuration} and run from {@code testThroughput()}.
     *
     * <p>The constructor reads the test parameters from the configuration and sets the
     * number of task managers to {@code parallelism / numSlots}.
     */
    private static class TestBaseWrapper
    extends RecordAPITestBase {
        private final int dataVolumeGb;
        private final boolean useForwarder;
        private final boolean isSlowSender;
        private final boolean isSlowReceiver;
        private final int parallelism;

        /**
         * Reads the test parameters from {@code config} and sizes the test cluster.
         *
         * @param config configuration holding the test parameters; missing keys fall back
         *               to the defaults used below
         * @throws RuntimeException if the parallelism is not a multiple of the slots per
         *                          task manager
         */
        public TestBaseWrapper(Configuration config) {
            super(config);
            this.dataVolumeGb = config.getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
            this.useForwarder = config.getBoolean(NetworkStackThroughputITCase.USE_FORWARDER_CONFIG_KEY, true);
            this.isSlowSender = config.getBoolean(NetworkStackThroughputITCase.IS_SLOW_SENDER_CONFIG_KEY, false);
            this.isSlowReceiver = config.getBoolean(NetworkStackThroughputITCase.IS_SLOW_RECEIVER_CONFIG_KEY, false);
            this.parallelism = config.getInteger(NetworkStackThroughputITCase.PARALLELISM_CONFIG_KEY, 1);
            int numSlots = config.getInteger(NetworkStackThroughputITCase.NUM_SLOTS_PER_TM_CONFIG_KEY, 1);
            if (this.parallelism % numSlots != 0) {
                throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
            }
            this.setNumTaskManagers(this.parallelism / numSlots);
            this.setTaskManagerNumSlots(numSlots);
        }

        /**
         * Builds the speed-test job graph from the parameters read in the constructor.
         *
         * @return the job graph to execute
         * @throws Exception declared by the signature; not thrown by this implementation
         */
        protected JobGraph getJobGraph() throws Exception {
            return this.createJobGraph(this.dataVolumeGb, this.useForwarder, this.isSlowSender, this.isSlowReceiver, this.parallelism);
        }

        /**
         * Creates a producer -> [forwarder ->] consumer job graph. All vertices share one
         * slot sharing group, use the same parallelism, and are connected all-to-all.
         *
         * @param dataVolumeGb   data volume passed to the producer's configuration
         * @param useForwarder   whether to insert a forwarder vertex between producer and consumer
         * @param isSlowSender   slow-sender flag passed to the producer's configuration
         * @param isSlowReceiver slow-receiver flag passed to the consumer's configuration
         * @param numSubtasks    parallelism of every vertex
         * @return the assembled job graph
         */
        private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver, int numSubtasks) {
            JobGraph jobGraph = new JobGraph("Speed Test");
            SlotSharingGroup sharingGroup = new SlotSharingGroup();

            JobVertex producer = new JobVertex("Speed Test Producer");
            jobGraph.addVertex(producer);
            producer.setSlotSharingGroup(sharingGroup);
            producer.setInvokableClass(SpeedTestProducer.class);
            producer.setParallelism(numSubtasks);
            producer.getConfiguration().setInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
            producer.getConfiguration().setBoolean(NetworkStackThroughputITCase.IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);

            // The forwarder stays null when the job runs without the intermediate vertex.
            JobVertex forwarder = null;
            if (useForwarder) {
                forwarder = new JobVertex("Speed Test Forwarder");
                jobGraph.addVertex(forwarder);
                forwarder.setSlotSharingGroup(sharingGroup);
                forwarder.setInvokableClass(SpeedTestForwarder.class);
                forwarder.setParallelism(numSubtasks);
            }

            JobVertex consumer = new JobVertex("Speed Test Consumer");
            jobGraph.addVertex(consumer);
            consumer.setSlotSharingGroup(sharingGroup);
            consumer.setInvokableClass(SpeedTestConsumer.class);
            consumer.setParallelism(numSubtasks);
            consumer.getConfiguration().setBoolean(NetworkStackThroughputITCase.IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);

            // Wire the vertices only after all of them have been created and configured.
            if (forwarder != null) {
                forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
                consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL);
            } else {
                consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
            }
            return jobGraph;
        }

        /**
         * Logs the throughput of the last job run in MBit/s. Does nothing if no job
         * execution result is available.
         *
         * <p>The throughput is computed from the runtime in milliseconds. Dividing by whole
         * seconds, as before, produced a division by zero for runs shorter than one second
         * and inflated the result for short runs because of truncation. The logged runtime
         * is still reported in whole seconds.
         *
         * <p>{@code testThroughput()} calls this method explicitly after each run.
         */
        @After
        public void calculateThroughput() {
            if (this.getJobExecutionResult() == null) {
                return;
            }
            int dataVolumeGb = this.config.getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
            // 1 GB = 1024 MB = 8192 MBit; multiply in long to avoid int overflow.
            long dataVolumeMbit = dataVolumeGb * 8192L;
            long runtimeSecs = this.getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
            long runtimeMillis = this.getJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS);
            // Clamp to 1 ms so a zero runtime cannot cause a division by zero.
            long effectiveMillis = Math.max(1L, runtimeMillis);
            int mbitPerSecond = (int)((double)dataVolumeMbit * 1000.0 / (double)effectiveMillis);
            LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
        }
    }
}

