/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.benchmark;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.io.benchmark.LongRecordWriterThread;
import org.apache.flink.streaming.runtime.io.benchmark.ReceiverThread;
import org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkBenchmarkEnvironment;
import org.apache.flink.types.LongValue;

/**
 * Throughput benchmark harness that wires a set of {@link LongRecordWriterThread}s to a single
 * {@link ReceiverThread} through a {@link StreamNetworkBenchmarkEnvironment}.
 *
 * <p>Expected call order: one of the {@code setUp} overloads, any number of
 * {@code executeBenchmark} calls, then {@link #tearDown()}. The fields are only assigned in
 * {@code setUp}, so calling the other methods first fails with a {@link NullPointerException}.
 */
public class StreamNetworkThroughputBenchmark {
    /** Benchmark environment; created and initialised in {@code setUp}. */
    protected StreamNetworkBenchmarkEnvironment<LongValue> environment;
    /** Receiving side, obtained from {@link #environment} in {@code setUp}. */
    protected ReceiverThread receiver;
    /** One already-started writer thread per requested record writer. */
    protected LongRecordWriterThread[] writerThreads;

    /**
     * Runs the benchmark with the largest representable timeout ({@link Long#MAX_VALUE}
     * milliseconds).
     *
     * @param records total number of records, split evenly across the writer threads
     * @throws Exception if waiting for the receiver fails
     */
    public void executeBenchmark(long records) throws Exception {
        this.executeBenchmark(records, Long.MAX_VALUE);
    }

    /**
     * Asks every writer thread to send its share of {@code records} and blocks until the
     * receiver's future completes or the timeout elapses.
     *
     * <p>The share is {@code records / writerThreads.length} using integer division, so any
     * remainder is dropped. The same per-writer value is handed to the receiver as its
     * expected record.
     *
     * @param records total number of records, split evenly across the writer threads
     * @param timeout maximum time to wait for the receiver, in milliseconds
     * @throws Exception if the wait times out, is interrupted, or the receiver's future
     *     completes exceptionally
     */
    public void executeBenchmark(long records, long timeout) throws Exception {
        long recordsPerWriter = records / (long) this.writerThreads.length;
        // Register the expectation before the writers are released so the future exists
        // by the time records start flowing.
        CompletableFuture<?> recordsReceived = this.receiver.setExpectedRecord(recordsPerWriter);
        for (LongRecordWriterThread writerThread : this.writerThreads) {
            writerThread.setRecordsToSend(recordsPerWriter);
        }
        recordsReceived.get(timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Sets up the benchmark with {@code localMode} disabled and both buffer pool sizes set
     * to {@code -1}.
     *
     * @param recordWriters number of writer threads to create and start
     * @param channels forwarded to the environment's {@code setUp}
     * @param flushTimeout forwarded to the environment when creating each record writer
     * @throws Exception if the environment or a record writer cannot be created
     */
    public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception {
        this.setUp(recordWriters, channels, flushTimeout, false);
    }

    /**
     * Sets up the benchmark with both buffer pool sizes set to {@code -1}.
     *
     * @param recordWriters number of writer threads to create and start
     * @param channels forwarded to the environment's {@code setUp}
     * @param flushTimeout forwarded to the environment when creating each record writer
     * @param localMode forwarded to the environment's {@code setUp}
     * @throws Exception if the environment or a record writer cannot be created
     */
    public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
        this.setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
    }

    /**
     * Sets up the benchmark with broadcast mode disabled and a fresh, empty
     * {@link Configuration}.
     *
     * @param recordWriters number of writer threads to create and start
     * @param channels forwarded to the environment's {@code setUp}
     * @param flushTimeout forwarded to the environment when creating each record writer
     * @param localMode forwarded to the environment's {@code setUp}
     * @param senderBufferPoolSize forwarded to the environment's {@code setUp}
     * @param receiverBufferPoolSize forwarded to the environment's {@code setUp}
     * @throws Exception if the environment or a record writer cannot be created
     */
    public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize) throws Exception {
        this.setUp(recordWriters, channels, flushTimeout, false, localMode, senderBufferPoolSize, receiverBufferPoolSize, new Configuration());
    }

    /**
     * Creates and initialises the environment, creates and starts one writer thread per
     * record writer, and finally creates the receiver.
     *
     * @param recordWriters number of writer threads to create and start
     * @param channels forwarded to the environment's {@code setUp}
     * @param flushTimeout forwarded to the environment when creating each record writer
     * @param broadcastMode forwarded to the environment's {@code setUp} and to each writer thread
     * @param localMode forwarded to the environment's {@code setUp}
     * @param senderBufferPoolSize forwarded to the environment's {@code setUp}
     * @param receiverBufferPoolSize forwarded to the environment's {@code setUp}
     * @param config forwarded to the environment's {@code setUp}
     * @throws Exception if the environment or a record writer cannot be created
     */
    public void setUp(int recordWriters, int channels, int flushTimeout, boolean broadcastMode, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize, Configuration config) throws Exception {
        // Diamond instead of a raw type: the element type is inferred from the field.
        this.environment = new StreamNetworkBenchmarkEnvironment<>();
        this.environment.setUp(recordWriters, channels, broadcastMode, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
        this.writerThreads = new LongRecordWriterThread[recordWriters];
        for (int writer = 0; writer < recordWriters; ++writer) {
            this.writerThreads[writer] = new LongRecordWriterThread(this.environment.createRecordWriter(writer, flushTimeout), broadcastMode);
            this.writerThreads[writer].start();
        }
        this.receiver = this.environment.createReceiver();
    }

    /**
     * Shuts down every writer thread (passing {@code 5000L} to each thread's {@code sync}),
     * then tears down the environment, then shuts down the receiver.
     *
     * <p>The environment and the receiver are released even when an earlier step throws.
     * If more than one step fails, the exception from the latest failing step is the one
     * that propagates.
     *
     * @throws Exception if stopping a writer thread or tearing down the environment fails
     */
    public void tearDown() throws Exception {
        try {
            for (LongRecordWriterThread writerThread : this.writerThreads) {
                writerThread.shutdown();
                writerThread.sync(5000L);
            }
        } finally {
            // Keep the original order (environment first, receiver second) but make sure
            // neither is skipped because a previous step failed.
            try {
                this.environment.tearDown();
            } finally {
                this.receiver.shutdown();
            }
        }
    }
}

