/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.utils.Exit$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import scala.Console$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.StringOps$;
import scala.collection.convert.AsJavaExtensions;
import scala.collection.convert.AsScalaExtensions;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.ScalaRunTime$;
import scala.util.Random;

public final class EndToEndLatency$ {
    public static final EndToEndLatency$ MODULE$ = new EndToEndLatency$();
    private static final long timeout = 60000L;
    private static final short defaultReplicationFactor = 1;
    private static final int defaultNumPartitions = 1;

    private long timeout() {
        return timeout;
    }

    private short defaultReplicationFactor() {
        return defaultReplicationFactor;
    }

    private int defaultNumPartitions() {
        return defaultNumPartitions;
    }

    public void main(String[] args) {
        None$ propsFile;
        Object object;
        if (args.length != 5 && args.length != 6) {
            System.err.println(new StringBuilder(103).append("USAGE: java ").append(this.getClass().getName()).append(" broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file").toString());
            throw Exit$.MODULE$.exit(1, (Option<String>)None$.MODULE$);
        }
        String brokerList = args[0];
        String topic = args[1];
        String toInt$extension_$this = args[2];
        Object var26_4 = null;
        int numMessages = Integer.parseInt(toInt$extension_$this);
        String producerAcks = args[3];
        String toInt$extension_$this2 = args[4];
        Object var27_7 = null;
        int messageLen = Integer.parseInt(toInt$extension_$this2);
        if (args.length > 5) {
            Some filter_this = new Some((Object)args[5]);
            object = filter_this.isEmpty() || EndToEndLatency$.$anonfun$main$1((String)filter_this.value()) ? filter_this : None$.MODULE$;
            Object var22_9 = null;
        } else {
            object = propsFile = None$.MODULE$;
        }
        if (!new .colon.colon((Object)"1", (List)new .colon.colon((Object)"all", (List)Nil$.MODULE$)).contains((Object)producerAcks)) {
            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
        }
        Properties consumerProps = EndToEndLatency$.loadPropsWithBootstrapServers$1((Option)propsFile, brokerList);
        consumerProps.put("group.id", new StringBuilder(11).append("test-group-").append(System.currentTimeMillis()).toString());
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("auto.offset.reset", "latest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.max.wait.ms", "0");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        Properties producerProps = EndToEndLatency$.loadPropsWithBootstrapServers$1((Option)propsFile, brokerList);
        producerProps.put("linger.ms", "0");
        producerProps.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        producerProps.put("acks", producerAcks.toString());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer = new KafkaProducer(producerProps);
        if (!consumer.listTopics().containsKey(topic)) {
            try {
                this.createTopic(topic, EndToEndLatency$.loadPropsWithBootstrapServers$1((Option)propsFile, brokerList));
            }
            catch (Throwable t) {
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException(new StringBuilder(23).append("Failed to create topic ").append(topic).toString(), t);
            }
        }
        java.util.List topicPartitions = AsJavaExtensions.BufferHasAsJava$((AsJavaExtensions)CollectionConverters$.MODULE$, (Buffer)((Buffer)AsScalaExtensions.ListHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (java.util.List)consumer.partitionsFor(topic)).asScala().map((Function1 & Serializable)p -> new TopicPartition(p.topic(), p.partition())))).asJava();
        consumer.assign((Collection)topicPartitions);
        consumer.seekToEnd((Collection)topicPartitions);
        consumer.assignment().forEach(x$2 -> consumer.position(x$2));
        double d = 0.0;
        long[] latencies = new long[numMessages];
        Random random = new Random(0);
        int n = 0;
        Range.Exclusive foreach$mVc$sp_this = new Range.Exclusive(n, numMessages, 1);
        if (!foreach$mVc$sp_this.isEmpty()) {
            int foreach$mVc$sp_i = foreach$mVc$sp_this.start();
            while (true) {
                Object var39_29;
                byte[] $anonfun$main$6_message = MODULE$.randomBytesOfLen(random, messageLen);
                long $anonfun$main$6_begin = System.nanoTime();
                producer.send(new ProducerRecord(topic, (Object)$anonfun$main$6_message)).get();
                Iterator $anonfun$main$6_recordIter = consumer.poll(Duration.ofMillis(MODULE$.timeout())).iterator();
                long $anonfun$main$6_elapsed = System.nanoTime() - $anonfun$main$6_begin;
                if (!$anonfun$main$6_recordIter.hasNext()) {
                    EndToEndLatency$.finalise$1(consumer, producer);
                    throw new RuntimeException(new StringBuilder(53).append("poll() timed out before finding a result (timeout:[").append(MODULE$.timeout()).append("])").toString());
                }
                String $anonfun$main$6_sent = new String($anonfun$main$6_message, StandardCharsets.UTF_8);
                String $anonfun$main$6_read = new String((byte[])((ConsumerRecord)$anonfun$main$6_recordIter.next()).value(), StandardCharsets.UTF_8);
                if (!$anonfun$main$6_read.equals($anonfun$main$6_sent)) {
                    EndToEndLatency$.finalise$1(consumer, producer);
                    throw new RuntimeException(new StringBuilder(53).append("The message read [").append($anonfun$main$6_read).append("] did not match the message sent [").append($anonfun$main$6_sent).append("]").toString());
                }
                if ($anonfun$main$6_recordIter.hasNext()) {
                    int $anonfun$main$6_count = 1 + AsScalaExtensions.IteratorHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (Iterator)$anonfun$main$6_recordIter).asScala().size();
                    throw new RuntimeException(new StringBuilder(58).append("Only one result was expected during this test. We found [").append($anonfun$main$6_count).append("]").toString());
                }
                if (foreach$mVc$sp_i % 1000 == 0) {
                    String $anonfun$main$6_println_x = new StringBuilder(1).append(Integer.toString(foreach$mVc$sp_i)).append("\t").append((double)$anonfun$main$6_elapsed / 1000.0 / 1000.0).toString();
                    Console$.MODULE$.println((Object)$anonfun$main$6_println_x);
                    var39_29 = null;
                }
                d += (double)$anonfun$main$6_elapsed;
                latencies[foreach$mVc$sp_i] = $anonfun$main$6_elapsed / 1000L / 1000L;
                Object var30_23 = null;
                Object var33_25 = null;
                Object var36_27 = null;
                Object var37_28 = null;
                var39_29 = null;
                if (foreach$mVc$sp_i == foreach$mVc$sp_this.scala$collection$immutable$Range$$lastElement) break;
                foreach$mVc$sp_i += foreach$mVc$sp_this.step();
            }
        }
        Object var24_21 = null;
        String println_x = StringOps$.MODULE$.format$extension("Avg latency: %.4f ms\n", (Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{d / (double)numMessages / 1000.0 / 1000.0}));
        Console$.MODULE$.println((Object)println_x);
        Object var28_31 = null;
        Arrays.sort(latencies);
        long p50 = latencies[(int)((double)latencies.length * 0.5)];
        long p99 = latencies[(int)((double)latencies.length * 0.99)];
        long p999 = latencies[(int)((double)latencies.length * 0.999)];
        String println_x2 = StringOps$.MODULE$.format$extension("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", (Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{p50, p99, p999}));
        Console$.MODULE$.println((Object)println_x2);
        Object var29_35 = null;
        EndToEndLatency$.finalise$1(consumer, producer);
    }

    /*
     * WARNING - void declaration
     */
    public byte[] randomBytesOfLen(Random random, int len) {
        void var3_3;
        if (len <= 0) {
            return new byte[0];
        }
        byte[] fill_array = new byte[len];
        for (int fill_i = 0; fill_i < len; ++fill_i) {
            void var5_5;
            byte boxToByte_b = (byte)(random.nextInt(26) + 65);
            fill_array[fill_i] = var5_5;
        }
        return var3_3;
    }

    public void createTopic(String topic, Properties props) {
        String println_x = StringOps$.MODULE$.format$extension("Topic \"%s\" does not exist. Will create topic with %d partition(s) and replication factor = %d", (Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{topic, this.defaultNumPartitions(), this.defaultReplicationFactor()}));
        Console$.MODULE$.println((Object)println_x);
        Object var6_3 = null;
        Admin adminClient = Admin.create((Properties)props);
        NewTopic newTopic = new NewTopic(topic, this.defaultNumPartitions(), this.defaultReplicationFactor());
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }
        finally {
            Utils.closeQuietly((AutoCloseable)adminClient, (String)"AdminClient");
        }
    }

    public static final /* synthetic */ boolean $anonfun$main$1(String x$1) {
        return !x$1.isEmpty();
    }

    public static final /* synthetic */ Properties $anonfun$main$2(String x$1) {
        return Utils.loadProps((String)x$1);
    }

    public static final /* synthetic */ Properties $anonfun$main$3() {
        return new Properties();
    }

    private static final Properties loadPropsWithBootstrapServers$1(Option propsFile$1, String brokerList$1) {
        None$ getOrElse_this = propsFile$1.isEmpty() ? None$.MODULE$ : new Some((Object)Utils.loadProps((String)((String)propsFile$1.get()), null));
        Object var3_2 = null;
        Properties props = (Properties)(getOrElse_this.isEmpty() ? new Properties() : getOrElse_this.get());
        props.put("bootstrap.servers", brokerList$1);
        return props;
    }

    private static final void finalise$1(KafkaConsumer consumer$1, KafkaProducer producer$1) {
        consumer$1.commitSync();
        producer$1.close();
        consumer$1.close();
    }

    public static final /* synthetic */ void $anonfun$main$6(Random random$1, int messageLen$1, KafkaProducer producer$1, String topic$1, KafkaConsumer consumer$1, DoubleRef totalTime$1, long[] latencies$1, int i) {
        byte[] message = MODULE$.randomBytesOfLen(random$1, messageLen$1);
        long begin = System.nanoTime();
        producer$1.send(new ProducerRecord(topic$1, (Object)message)).get();
        Iterator recordIter = consumer$1.poll(Duration.ofMillis(MODULE$.timeout())).iterator();
        long elapsed = System.nanoTime() - begin;
        if (!recordIter.hasNext()) {
            EndToEndLatency$.finalise$1(consumer$1, producer$1);
            throw new RuntimeException(new StringBuilder(53).append("poll() timed out before finding a result (timeout:[").append(MODULE$.timeout()).append("])").toString());
        }
        String sent = new String(message, StandardCharsets.UTF_8);
        String read = new String((byte[])((ConsumerRecord)recordIter.next()).value(), StandardCharsets.UTF_8);
        if (!read.equals(sent)) {
            EndToEndLatency$.finalise$1(consumer$1, producer$1);
            throw new RuntimeException(new StringBuilder(53).append("The message read [").append(read).append("] did not match the message sent [").append(sent).append("]").toString());
        }
        if (recordIter.hasNext()) {
            int count = 1 + AsScalaExtensions.IteratorHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (Iterator)recordIter).asScala().size();
            throw new RuntimeException(new StringBuilder(58).append("Only one result was expected during this test. We found [").append(count).append("]").toString());
        }
        if (i % 1000 == 0) {
            String println_x = new StringBuilder(1).append(Integer.toString(i)).append("\t").append((double)elapsed / 1000.0 / 1000.0).toString();
            Console$.MODULE$.println((Object)println_x);
        }
        totalTime$1.elem += (double)elapsed;
        latencies$1[i] = elapsed / 1000L / 1000L;
    }

    public static final /* synthetic */ byte $anonfun$randomBytesOfLen$1(Random random$2) {
        return (byte)(random$2.nextInt(26) + 65);
    }

    private EndToEndLatency$() {
    }

    public static final /* synthetic */ Object $anonfun$main$1$adapted(String x$1) {
        return BoxesRunTime.boxToBoolean((boolean)EndToEndLatency$.$anonfun$main$1(x$1));
    }
}

