/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.perf;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SimpleBenchmark {
    /** Kafka broker list; used as the "bootstrap.servers" setting of the clients built by this class. */
    private final String kafka;
    /** ZooKeeper connect string; used as the "zookeeper.connect" setting of the Streams configurations. */
    private final String zookeeper;
    /** Directory handed to Kafka Streams as "state.dir". */
    private final File stateDir;
    /** Topic that main() fills with sequentially keyed records and that the consume/stream benchmarks read. */
    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
    /** Topic that createKafkaStreamsWithSink() writes the source records to. */
    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
    // Prefixes of the join-benchmark topic names that main() passes to the three join benchmarks.
    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
    /**
     * Joiner shared by the join topologies in this class. When one side is missing the other
     * side is returned unchanged; when both are missing a fresh 100-byte array is returned;
     * otherwise the result is value1 followed by value2 in a newly allocated array.
     */
    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>(){

        @Override
        public byte[] apply(byte[] value1, byte[] value2) {
            if (value1 == null) {
                // Left side missing: fall back to the right side, or to a blank value if that is missing too.
                return value2 != null ? value2 : new byte[100];
            }
            if (value2 == null) {
                return value1;
            }
            // Both sides present: concatenate them.
            byte[] joined = new byte[value1.length + value2.length];
            System.arraycopy(value2, 0, joined, value1.length, value2.length);
            System.arraycopy(value1, 0, joined, 0, value1.length);
            return joined;
        }
    };
    /** Number of records per benchmark; set in main() from args[3], defaulting to 10000000. */
    private static int numRecords;
    /** Set in main() to numRecords - 1; the consume and stream benchmarks finish once they observe this key. */
    private static Integer endKey;
    // NOTE(review): produce() serializes keys with IntegerSerializer, so 8 may overstate the
    // actual key size on the wire — confirm before relying on absolute MB/s figures.
    private static final int KEY_SIZE = 8;
    private static final int VALUE_SIZE = 100;
    /** Equals KEY_SIZE + VALUE_SIZE (8 + 100); the per-record byte count used by the throughput calculations. */
    private static final int RECORD_SIZE = 108;
    // Both serdes are assigned in the static initializer of this class.
    private static final Serde<byte[]> BYTE_SERDE;
    private static final Serde<Integer> INTEGER_SERDE;

    /**
     * Creates a benchmark bound to a single Kafka cluster.
     *
     * @param stateDir  directory later passed to Kafka Streams as "state.dir"
     * @param kafka     broker list later passed as "bootstrap.servers"
     * @param zookeeper connect string later passed as "zookeeper.connect"
     */
    public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
        this.kafka = kafka;
        this.zookeeper = zookeeper;
        this.stateDir = stateDir;
    }

    /**
     * Runs every benchmark in sequence against one cluster.
     *
     * <p>Positional arguments, all optional:
     * <ol>
     *   <li>Kafka bootstrap servers (default {@code localhost:9092})</li>
     *   <li>ZooKeeper connect string (default {@code localhost:2181})</li>
     *   <li>state directory (default {@code /tmp/kafka-streams-simple-benchmark})</li>
     *   <li>number of records (default {@code 10000000})</li>
     * </ol>
     *
     * @param args command-line arguments as described above
     * @throws Exception if the record count is not a valid integer or any benchmark step fails
     */
    public static void main(String[] args) throws Exception {
        String kafka = args.length > 0 ? args[0] : "localhost:9092";
        String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
        String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
        numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 10000000;
        endKey = numRecords - 1;

        // mkdirs() (unlike mkdir()) also creates missing parent directories, so a nested
        // state directory given on the command line works on a fresh machine.
        File stateDir = new File(stateDirStr);
        stateDir.mkdirs();
        File rocksdbDir = new File(stateDir, "rocksdb-test");
        rocksdbDir.mkdirs();
        if (!rocksdbDir.isDirectory()) {
            // Do not abort here: only report the problem so a later failure is easier to diagnose.
            System.err.println("Warning: could not create directory " + rocksdbDir);
        }

        System.out.println("SimpleBenchmark instance started");
        System.out.println("kafka=" + kafka);
        System.out.println("zookeeper=" + zookeeper);
        System.out.println("stateDir=" + stateDir);
        System.out.println("numRecords=" + numRecords);

        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);

        // Plain producer / consumer / single-topology benchmarks share the source topic.
        benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
        benchmark.consume(SOURCE_TOPIC);
        benchmark.processStream(SOURCE_TOPIC);
        benchmark.processStreamWithSink(SOURCE_TOPIC);
        benchmark.processStreamWithStateStore(SOURCE_TOPIC);

        // Each join benchmark gets its own pair of input topics.
        benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "kStreamKTable", JOIN_TOPIC_2_PREFIX + "kStreamKTable");
        benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "kStreamKStream", JOIN_TOPIC_2_PREFIX + "kStreamKStream");
        benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "kTableKTable", JOIN_TOPIC_2_PREFIX + "kTableKTable");
    }

    /**
     * Builds the Streams configuration shared by the join benchmarks: one stream thread,
     * reading from the earliest offset, Integer keys and byte[] values by default.
     *
     * @param applicationId value for the "application.id" setting
     * @return a new Properties object holding the configuration
     */
    private Properties setJoinProperties(String applicationId) {
        Properties joinConfig = new Properties();
        // Cluster / identity settings.
        joinConfig.put("bootstrap.servers", this.kafka);
        joinConfig.put("zookeeper.connect", this.zookeeper);
        joinConfig.put("application.id", applicationId);
        joinConfig.put("state.dir", this.stateDir.toString());
        // Processing settings.
        joinConfig.put("num.stream.threads", Integer.valueOf(1));
        joinConfig.put("auto.offset.reset", "earliest");
        // Default serdes.
        joinConfig.put("key.serde", Serdes.Integer().getClass());
        joinConfig.put("value.serde", Serdes.ByteArray().getClass());
        return joinConfig;
    }

    /**
     * Benchmarks a KStream-KTable left join. The stream topic is filled with randomly keyed
     * records, the table topic with sequentially keyed records, then the join topology runs
     * until numRecords joined records have been observed.
     *
     * @param kStreamTopic topic read as the stream side
     * @param kTableTopic  topic read as the table side
     * @throws Exception propagated from {@code produce}
     */
    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
        final int recordCount = numRecords;
        CountDownLatch joinedRecords = new CountDownLatch(recordCount);

        System.out.println("Initializing kStreamTopic " + kStreamTopic);
        this.produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", recordCount, false, recordCount, false);

        System.out.println("Initializing kTableTopic " + kTableTopic);
        this.produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", recordCount, true, recordCount, false);

        KafkaStreams joinApp = this.createKafkaStreamsKStreamKTableJoin(
                this.setJoinProperties("simple-benchmark-kstream-ktable-join"), kStreamTopic, kTableTopic, joinedRecords);
        this.runJoinBenchmark(joinApp, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", joinedRecords);
    }

    /**
     * Benchmarks a windowed KStream-KStream left join. Both topics are filled with
     * sequentially keyed records, then the join topology runs until numRecords joined
     * records have been observed.
     *
     * @param kStreamTopic1 topic read as the left stream
     * @param kStreamTopic2 topic read as the right stream
     * @throws Exception propagated from {@code produce}
     */
    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
        final int recordCount = numRecords;
        CountDownLatch joinedRecords = new CountDownLatch(recordCount);

        System.out.println("Initializing kStreamTopic " + kStreamTopic1);
        this.produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", recordCount, true, recordCount, false);

        System.out.println("Initializing kStreamTopic " + kStreamTopic2);
        this.produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", recordCount, true, recordCount, false);

        KafkaStreams joinApp = this.createKafkaStreamsKStreamKStreamJoin(
                this.setJoinProperties("simple-benchmark-kstream-kstream-join"), kStreamTopic1, kStreamTopic2, joinedRecords);
        this.runJoinBenchmark(joinApp, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", joinedRecords);
    }

    /**
     * Benchmarks a KTable-KTable left join. Both topics are filled with sequentially keyed
     * records, then the join topology runs until numRecords joined records have been observed.
     *
     * @param kTableTopic1 topic read as the left table
     * @param kTableTopic2 topic read as the right table
     * @throws Exception propagated from {@code produce}
     */
    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
        final int recordCount = numRecords;
        CountDownLatch joinedRecords = new CountDownLatch(recordCount);

        System.out.println("Initializing kTableTopic " + kTableTopic1);
        this.produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", recordCount, true, recordCount, false);

        System.out.println("Initializing kTableTopic " + kTableTopic2);
        this.produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", recordCount, true, recordCount, false);

        KafkaStreams joinApp = this.createKafkaStreamsKTableKTableJoin(
                this.setJoinProperties("simple-benchmark-ktable-ktable-join"), kTableTopic1, kTableTopic2, joinedRecords);
        this.runJoinBenchmark(joinApp, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", joinedRecords);
    }

    /**
     * Starts the given join topology, blocks until {@code latch} reaches zero, prints the
     * measured throughput and closes the topology.
     *
     * <p>An interrupt received while waiting does not abort the measurement; it is remembered
     * and the thread's interrupt status is restored before returning so callers can still
     * observe it.
     *
     * @param streams         topology to run; closed before this method returns normally
     * @param nameOfBenchmark label printed in front of the throughput figure
     * @param latch           latch counted down by the topology for every joined record
     */
    private void runJoinBenchmark(KafkaStreams streams, String nameOfBenchmark, CountDownLatch latch) {
        streams.start();
        long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (latch.getCount() > 0L) {
            try {
                latch.await();
            } catch (InterruptedException ex) {
                // Keep waiting so the benchmark completes, but do not lose the interrupt.
                interrupted = true;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println(nameOfBenchmark + this.megaBytePerSec(endTime - startTime, numRecords, RECORD_SIZE));
        streams.close();
        if (interrupted) {
            // Restored only after close() so shutdown itself is not disturbed.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Measures how fast a Streams topology with a pass-through processor reads {@code topic}.
     * The topology is started on a helper thread; this method waits until the processor has
     * seen the end key, prints the throughput and shuts the topology down.
     *
     * <p>An interrupt received while waiting does not abort the measurement; the interrupt
     * status is restored just before the method returns.
     *
     * @param topic topic to read
     */
    public void processStream(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        final KafkaStreams streams = this.createKafkaStreams(topic, this.stateDir, this.kafka, this.zookeeper, latch);
        Thread thread = new Thread(){

            @Override
            public void run() {
                streams.start();
            }
        };
        thread.start();
        long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (latch.getCount() > 0L) {
            try {
                latch.await();
            } catch (InterruptedException ex) {
                // Keep waiting so the benchmark completes, but do not lose the interrupt.
                interrupted = true;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Streams Performance [MB/sec read]: " + this.megaBytePerSec(endTime - startTime));
        streams.close();
        try {
            thread.join();
        } catch (InterruptedException ex) {
            interrupted = true;
        }
        if (interrupted) {
            // Restored last so that close() and join() above are not cut short by it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Measures how fast a Streams topology reads {@code topic} while also writing every
     * record to the sink topic. The topology is started on a helper thread; this method waits
     * until the processor has seen the end key, prints the throughput and shuts the topology
     * down.
     *
     * <p>An interrupt received while waiting does not abort the measurement; the interrupt
     * status is restored just before the method returns.
     *
     * @param topic topic to read
     */
    public void processStreamWithSink(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        final KafkaStreams streams = this.createKafkaStreamsWithSink(topic, this.stateDir, this.kafka, this.zookeeper, latch);
        Thread thread = new Thread(){

            @Override
            public void run() {
                streams.start();
            }
        };
        thread.start();
        long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (latch.getCount() > 0L) {
            try {
                latch.await();
            } catch (InterruptedException ex) {
                // Keep waiting so the benchmark completes, but do not lose the interrupt.
                interrupted = true;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Streams Performance [MB/sec read+write]: " + this.megaBytePerSec(endTime - startTime));
        streams.close();
        try {
            thread.join();
        } catch (InterruptedException ex) {
            interrupted = true;
        }
        if (interrupted) {
            // Restored last so that close() and join() above are not cut short by it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Measures how fast a Streams topology reads {@code topic} while putting every record
     * into a key-value state store. The topology is started on a helper thread; this method
     * waits until the processor has seen the end key, prints the throughput and shuts the
     * topology down.
     *
     * <p>An interrupt received while waiting does not abort the measurement; the interrupt
     * status is restored just before the method returns.
     *
     * @param topic topic to read
     */
    public void processStreamWithStateStore(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        final KafkaStreams streams = this.createKafkaStreamsWithStateStore(topic, this.stateDir, this.kafka, this.zookeeper, latch);
        Thread thread = new Thread(){

            @Override
            public void run() {
                streams.start();
            }
        };
        thread.start();
        long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (latch.getCount() > 0L) {
            try {
                latch.await();
            } catch (InterruptedException ex) {
                // Keep waiting so the benchmark completes, but do not lose the interrupt.
                interrupted = true;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Streams Performance [MB/sec read+store]: " + this.megaBytePerSec(endTime - startTime));
        streams.close();
        try {
            thread.join();
        } catch (InterruptedException ex) {
            interrupted = true;
        }
        if (interrupted) {
            // Restored last so that close() and join() above are not cut short by it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Writes {@code numRecords} records to {@code topic} and optionally prints the producer
     * throughput. Every record carries the same zero-filled value of {@code valueSizeBytes}
     * bytes; keys are either 0, 1, 2, ... or drawn uniformly from {@code [0, upperRange)}.
     *
     * @param topic          topic to write to
     * @param valueSizeBytes size of each record value in bytes
     * @param clientId       value for the producer's "client.id" setting
     * @param numRecords     number of records to send
     * @param sequential     {@code true} for keys 0..numRecords-1, {@code false} for random keys
     * @param upperRange     exclusive upper bound for random keys; must be at least
     *                       {@code numRecords} when {@code sequential} is {@code true}
     * @param printStats     whether to print the measured throughput
     * @throws Exception if {@code sequential} is set and {@code upperRange < numRecords}
     */
    public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential, int upperRange, boolean printStats) throws Exception {
        if (sequential && upperRange < numRecords) {
            throw new Exception("UpperRange must be >= numRecords");
        }
        Properties props = new Properties();
        props.put("client.id", clientId);
        props.put("bootstrap.servers", this.kafka);
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);

        Random rand = new Random();
        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(props);
        byte[] value = new byte[valueSizeBytes];
        long startTime = System.currentTimeMillis();
        try {
            int key = sequential ? 0 : rand.nextInt(upperRange);
            for (int i = 0; i < numRecords; ++i) {
                producer.send(new ProducerRecord<Integer, byte[]>(topic, key, value));
                key = sequential ? key + 1 : rand.nextInt(upperRange);
            }
        } finally {
            // Always release the producer, even when send() fails part-way through.
            producer.close();
        }
        // Taken after close() so the time spent by close() is part of the measurement.
        long endTime = System.currentTimeMillis();
        if (printStats) {
            System.out.println("Producer Performance [MB/sec write]: " + this.megaBytePerSec(endTime - startTime, numRecords, 8 + valueSizeBytes));
        }
    }

    /*
     * Unable to fully structure code
     */
    public void consume(String topic) {
        props = new Properties();
        props.put("client.id", "simple-benchmark-consumer");
        props.put("bootstrap.servers", this.kafka);
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("enable.auto.commit", "false");
        consumer = new KafkaConsumer(props);
        partitions = this.getAllPartitions(consumer, new String[]{topic});
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        key = null;
        startTime = System.currentTimeMillis();
        block0: while (true) {
            if ((records = consumer.poll(500L)).isEmpty()) {
                if (!SimpleBenchmark.endKey.equals(key)) continue;
                break;
            }
            i$ = records.iterator();
            while (true) {
                if (i$.hasNext()) ** break;
                continue block0;
                record = (ConsumerRecord)i$.next();
                recKey = (Integer)record.key();
                if (key != null && key >= recKey) continue;
                key = recKey;
            }
            break;
        }
        endTime = System.currentTimeMillis();
        consumer.close();
        System.out.println("Consumer Performance [MB/sec read]: " + this.megaBytePerSec(endTime - startTime));
    }

    /**
     * Builds a topology that reads {@code topic} and feeds every record to a processor whose
     * only action is to count down {@code latch} when the record key equals the end key.
     *
     * @param topic     source topic
     * @param stateDir  value for "state.dir"
     * @param kafka     value for "bootstrap.servers"
     * @param zookeeper value for "zookeeper.connect"
     * @param latch     latch released when the end key is processed
     * @return the configured, not yet started, KafkaStreams instance
     */
    private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", "simple-benchmark-streams");
        streamsConfig.put("bootstrap.servers", kafka);
        streamsConfig.put("zookeeper.connect", zookeeper);
        streamsConfig.put("state.dir", stateDir.toString());
        streamsConfig.put("num.stream.threads", Integer.valueOf(1));
        streamsConfig.put("auto.offset.reset", "earliest");

        ProcessorSupplier<Integer, byte[]> endKeyWatcher = new ProcessorSupplier<Integer, byte[]>() {

            @Override
            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>() {

                    @Override
                    public void init(ProcessorContext context) {
                        // nothing to set up
                    }

                    @Override
                    public void process(Integer key, byte[] value) {
                        if (endKey.equals(key)) {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void punctuate(long timestamp) {
                        // no periodic work
                    }

                    @Override
                    public void close() {
                        // nothing to release
                    }
                };
            }
        };

        KStreamBuilder builder = new KStreamBuilder();
        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
        source.process(endKeyWatcher);
        return new KafkaStreams(builder, streamsConfig);
    }

    /**
     * Builds a topology that reads {@code topic}, writes every record to the sink topic and
     * additionally feeds it to a processor that counts down {@code latch} when the record key
     * equals the end key.
     *
     * @param topic     source topic
     * @param stateDir  value for "state.dir"
     * @param kafka     value for "bootstrap.servers"
     * @param zookeeper value for "zookeeper.connect"
     * @param latch     latch released when the end key is processed
     * @return the configured, not yet started, KafkaStreams instance
     */
    private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", "simple-benchmark-streams-with-sink");
        streamsConfig.put("bootstrap.servers", kafka);
        streamsConfig.put("zookeeper.connect", zookeeper);
        streamsConfig.put("state.dir", stateDir.toString());
        streamsConfig.put("num.stream.threads", Integer.valueOf(1));
        streamsConfig.put("auto.offset.reset", "earliest");

        ProcessorSupplier<Integer, byte[]> endKeyWatcher = new ProcessorSupplier<Integer, byte[]>() {

            @Override
            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>() {

                    @Override
                    public void init(ProcessorContext context) {
                        // nothing to set up
                    }

                    @Override
                    public void process(Integer key, byte[] value) {
                        if (endKey.equals(key)) {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void punctuate(long timestamp) {
                        // no periodic work
                    }

                    @Override
                    public void close() {
                        // nothing to release
                    }
                };
            }
        };

        KStreamBuilder builder = new KStreamBuilder();
        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
        // The sink is attached before the processor, as in the original topology.
        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
        source.process(endKeyWatcher);
        return new KafkaStreams(builder, streamsConfig);
    }

    /**
     * Builds a topology that left-joins a stream with a table using VALUE_JOINER and counts
     * down {@code latch} once per joined record.
     *
     * @param streamConfig Streams configuration to use
     * @param kStreamTopic topic read as the stream side
     * @param kTableTopic  topic read as the table side; its store is named {@code kTableTopic + "-store"}
     * @param latch        latch counted down for every join result
     * @return the configured, not yet started, KafkaStreams instance
     */
    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic, String kTableTopic, CountDownLatch latch) {
        KStreamBuilder topology = new KStreamBuilder();
        KStream streamSide = topology.stream(kStreamTopic);
        String tableStoreName = kTableTopic + "-store";
        KTable tableSide = topology.table(kTableTopic, tableStoreName);
        KStream joined = streamSide.leftJoin(tableSide, VALUE_JOINER);
        joined.foreach(new CountDownAction(latch));
        return new KafkaStreams(topology, streamConfig);
    }

    /**
     * Builds a topology that left-joins two tables using VALUE_JOINER and counts down
     * {@code latch} once per join result.
     *
     * @param streamConfig Streams configuration to use
     * @param kTableTopic1 topic read as the left table; its store is named {@code kTableTopic1 + "-store"}
     * @param kTableTopic2 topic read as the right table; its store is named {@code kTableTopic2 + "-store"}
     * @param latch        latch counted down for every join result
     * @return the configured, not yet started, KafkaStreams instance
     */
    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1, String kTableTopic2, CountDownLatch latch) {
        KStreamBuilder topology = new KStreamBuilder();
        String leftStoreName = kTableTopic1 + "-store";
        KTable leftTable = topology.table(kTableTopic1, leftStoreName);
        String rightStoreName = kTableTopic2 + "-store";
        KTable rightTable = topology.table(kTableTopic2, rightStoreName);
        KTable joined = leftTable.leftJoin(rightTable, VALUE_JOINER);
        joined.foreach(new CountDownAction(latch));
        return new KafkaStreams(topology, streamConfig);
    }

    /**
     * Builds a topology that left-joins two streams within a 10000 ms join window using
     * VALUE_JOINER and counts down {@code latch} once per join result.
     *
     * @param streamConfig  Streams configuration to use
     * @param kStreamTopic1 topic read as the left stream
     * @param kStreamTopic2 topic read as the right stream
     * @param latch         latch counted down for every join result
     * @return the configured, not yet started, KafkaStreams instance
     */
    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1, String kStreamTopic2, CountDownLatch latch) {
        final long timeDifferenceMs = 10000L;
        KStreamBuilder topology = new KStreamBuilder();
        KStream leftStream = topology.stream(kStreamTopic1);
        KStream rightStream = topology.stream(kStreamTopic2);
        KStream joined = leftStream.leftJoin(rightStream, VALUE_JOINER, JoinWindows.of(timeDifferenceMs));
        joined.foreach(new CountDownAction(latch));
        return new KafkaStreams(topology, streamConfig);
    }

    /**
     * Builds a topology that reads {@code topic} and feeds every record to a processor which
     * puts it into a persistent key-value store named "store" and counts down {@code latch}
     * when the record key equals the end key.
     *
     * @param topic     source topic
     * @param stateDir  value for "state.dir"
     * @param kafka     value for "bootstrap.servers"
     * @param zookeeper value for "zookeeper.connect"
     * @param latch     latch released when the end key is processed
     * @return the configured, not yet started, KafkaStreams instance
     */
    private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
        final String storeName = "store";

        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", "simple-benchmark-streams-with-store");
        streamsConfig.put("bootstrap.servers", kafka);
        streamsConfig.put("zookeeper.connect", zookeeper);
        streamsConfig.put("state.dir", stateDir.toString());
        streamsConfig.put("num.stream.threads", Integer.valueOf(1));
        streamsConfig.put("auto.offset.reset", "earliest");

        ProcessorSupplier<Integer, byte[]> storingProcessor = new ProcessorSupplier<Integer, byte[]>() {

            @Override
            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>() {
                    KeyValueStore<Integer, byte[]> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(ProcessorContext context) {
                        // Look the store up once; process() reuses the reference.
                        this.store = (KeyValueStore<Integer, byte[]>) context.getStateStore(storeName);
                    }

                    @Override
                    public void process(Integer key, byte[] value) {
                        this.store.put(key, value);
                        if (endKey.equals(key)) {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void punctuate(long timestamp) {
                        // no periodic work
                    }

                    @Override
                    public void close() {
                        // nothing to release
                    }
                };
            }
        };

        KStreamBuilder builder = new KStreamBuilder();
        // The store is registered first and connected to the processor by name below.
        builder.addStateStore(Stores.create(storeName).withIntegerKeys().withByteArrayValues().persistent().build());
        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
        source.process(storingProcessor, storeName);
        return new KafkaStreams(builder, streamsConfig);
    }

    /**
     * Throughput for the default workload: numRecords records of RECORD_SIZE bytes each.
     *
     * @param time elapsed time in milliseconds
     * @return throughput in MB per second
     */
    private double megaBytePerSec(long time) {
        return this.megaBytePerSec(time, numRecords, RECORD_SIZE);
    }

    /**
     * Converts a record count, a per-record size and an elapsed time into MB per second.
     *
     * <p>The byte total is computed in floating point: an {@code int} product overflows for
     * large record counts, and integer division would truncate the total to whole megabytes
     * before the rate is computed.
     *
     * @param time            elapsed time in milliseconds
     * @param numRecords      number of records transferred
     * @param recordSizeBytes size accounted per record, in bytes
     * @return throughput in MB per second (1 MB = 1024 * 1024 bytes)
     */
    private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
        double megaBytes = (double) recordSizeBytes * (double) numRecords / (1024.0 * 1024.0);
        double seconds = (double) time / 1000.0;
        return megaBytes / seconds;
    }

    /**
     * Collects one TopicPartition per partition reported by the consumer for each of the
     * given topics, in the order the topics are listed.
     *
     * @param consumer consumer used to look up partition metadata
     * @param topics   topics to expand
     * @return a new list holding every partition of every topic
     */
    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        List<TopicPartition> result = new ArrayList<TopicPartition>();
        for (int i = 0; i < topics.length; i++) {
            List<PartitionInfo> infos = consumer.partitionsFor(topics[i]);
            for (PartitionInfo partitionInfo : infos) {
                String partitionTopic = partitionInfo.topic();
                int partitionId = partitionInfo.partition();
                result.add(new TopicPartition(partitionTopic, partitionId));
            }
        }
        return result;
    }

    // Initializes the serde constants passed to the topologies built in this class.
    static {
        INTEGER_SERDE = Serdes.Integer();
        BYTE_SERDE = Serdes.ByteArray();
    }

    private class CountDownAction<K, V>
    implements ForeachAction<K, V> {
        private CountDownLatch latch;

        CountDownAction(CountDownLatch latch) {
            this.latch = latch;
        }

        public void apply(K key, V value) {
            this.latch.countDown();
        }
    }
}

