/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.perf;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.perf.YahooBenchmark;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;

public class SimpleBenchmark {
    final String kafka;
    private final File stateDir;
    final Boolean loadPhase;
    final String testName;
    final int numThreads;
    static final String ALL_TESTS = "all";
    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
    private static final String COUNT_TOPIC = "countTopic";
    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
    private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
    private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>(){

        public byte[] apply(byte[] value1, byte[] value2) {
            if (value1 == null && value2 == null) {
                return new byte[100];
            }
            if (value1 == null && value2 != null) {
                return value2;
            }
            if (value1 != null && value2 == null) {
                return value1;
            }
            byte[] tmp = new byte[value1.length + value2.length];
            System.arraycopy(value1, 0, tmp, 0, value1.length);
            System.arraycopy(value2, 0, tmp, value1.length, value2.length);
            return tmp;
        }
    };
    int numRecords;
    final AtomicInteger processedRecords = new AtomicInteger(0);
    long processedBytes = 0L;
    private static final int VALUE_SIZE = 100;
    private static final long POLL_MS = 500L;
    private static final long COMMIT_INTERVAL_MS = 30000L;
    private static final int MAX_POLL_RECORDS = 1000;
    private static final int SOCKET_SIZE_BYTES = 0x100000;
    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();

    public SimpleBenchmark(File stateDir, String kafka, Boolean loadPhase, String testName, int numRecords, int numThreads) {
        this.stateDir = stateDir;
        this.kafka = kafka;
        this.loadPhase = loadPhase;
        this.testName = testName;
        this.numRecords = numRecords;
        this.numThreads = numThreads;
    }

    private void run() {
        switch (this.testName) {
            case "all": {
                this.produce(SOURCE_TOPIC);
                this.consume(SOURCE_TOPIC);
                this.processStream(SOURCE_TOPIC);
                this.processStreamWithSink(SOURCE_TOPIC);
                this.processStreamWithStateStore(SOURCE_TOPIC);
                this.processStreamWithCachedStateStore(SOURCE_TOPIC);
                this.count(COUNT_TOPIC);
                this.kStreamKTableJoin("joinSourceTopic1KStreamKTable", "joinSourceTopic2KStreamKTable");
                this.kStreamKStreamJoin("joinSourceTopic1KStreamKStream", "joinSourceTopic2KStreamKStream");
                this.kTableKTableJoin("joinSourceTopic1KTableKTable", "joinSourceTopic2KTableKTable");
                break;
            }
            case "produce": {
                this.produce(SOURCE_TOPIC);
                break;
            }
            case "consume": {
                this.consume(SOURCE_TOPIC);
                break;
            }
            case "count": {
                this.count(COUNT_TOPIC);
                break;
            }
            case "processstream": {
                this.processStream(SOURCE_TOPIC);
                break;
            }
            case "processstreamwithsink": {
                this.processStreamWithSink(SOURCE_TOPIC);
                break;
            }
            case "processstreamwithstatestore": {
                this.processStreamWithStateStore(SOURCE_TOPIC);
                break;
            }
            case "processstreamwithcachedstatestore": {
                this.processStreamWithCachedStateStore(SOURCE_TOPIC);
                break;
            }
            case "kstreamktablejoin": {
                this.kStreamKTableJoin("joinSourceTopic1KStreamKTable", "joinSourceTopic2KStreamKTable");
                break;
            }
            case "kstreamkstreamjoin": {
                this.kStreamKStreamJoin("joinSourceTopic1KStreamKStream", "joinSourceTopic2KStreamKStream");
                break;
            }
            case "ktablektablejoin": {
                this.kTableKTableJoin("joinSourceTopic1KTableKTable", "joinSourceTopic2KTableKTable");
                break;
            }
            case "yahoo": {
                this.yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
                break;
            }
            default: {
                throw new RuntimeException("Unknown test name " + this.testName);
            }
        }
    }

    public static void main(String[] args) {
        String kafka = args.length > 0 ? args[0] : "localhost:9092";
        String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
        int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
        boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
        String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
        int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
        File stateDir = new File(stateDirStr);
        stateDir.mkdir();
        System.out.println("StreamsTest instance started");
        System.out.println("kafka=" + kafka);
        System.out.println("stateDir=" + stateDir);
        System.out.println("numRecords=" + numRecords);
        System.out.println("loadPhase=" + loadPhase);
        System.out.println("testName=" + testName);
        System.out.println("numThreads=" + numThreads);
        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName, numRecords, numThreads);
        benchmark.run();
    }

    public Properties setStreamProperties(String applicationId) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("state.dir", this.stateDir.toString());
        props.put("bootstrap.servers", this.kafka);
        props.put("num.stream.threads", (Object)this.numThreads);
        props.put("auto.offset.reset", "earliest");
        props.put("receive.buffer.bytes", (Object)0x100000);
        props.put("default.key.serde", Serdes.Integer().getClass());
        props.put("default.value.serde", Serdes.ByteArray().getClass());
        props.put("max.poll.records", (Object)1000);
        props.put("poll.ms", (Object)500L);
        props.put("commit.interval.ms", (Object)30000L);
        props.put(StreamsConfig.producerPrefix((String)"request.timeout.ms"), (Object)60000);
        return props;
    }

    private Properties setProduceConsumeProperties(String clientId) {
        Properties props = new Properties();
        props.put("client.id", clientId);
        props.put("bootstrap.servers", this.kafka);
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.put("send.buffer.bytes", (Object)0x100000);
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("enable.auto.commit", "false");
        props.put("receive.buffer.bytes", (Object)0x100000);
        props.put("max.poll.records", (Object)1000);
        return props;
    }

    private boolean maybeSetupPhase(String topic, String clientId, boolean skipIfAllTests) {
        this.resetStats();
        if (this.loadPhase.booleanValue()) {
            if (skipIfAllTests && this.testName.equals(ALL_TESTS)) {
                return true;
            }
            System.out.println("Initializing topic " + topic);
            this.produce(topic, 100, clientId, this.numRecords, true, this.numRecords, false);
            return true;
        }
        return false;
    }

    void resetStats() {
        this.processedRecords.set(0);
        this.processedBytes = 0L;
    }

    private KafkaStreams createCountStreams(Properties streamConfig, String topic, CountDownLatch latch) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream input = builder.stream(topic);
        input.groupByKey().count("tmpStoreName").foreach(new CountDownAction(latch));
        return new KafkaStreams(builder.build(), streamConfig);
    }

    private void yahooBenchmark(String campaignsTopic, String eventsTopic) {
        YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
        benchmark.run();
    }

    public void count(String countTopic) {
        if (this.maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        Properties props = this.setStreamProperties("simple-benchmark-count");
        KafkaStreams streams = this.createCountStreams(props, countTopic, latch);
        this.runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) {
        if (this.maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
            this.maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        Properties props = this.setStreamProperties("simple-benchmark-kstream-ktable-join");
        KafkaStreams streams = this.createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
        this.runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) {
        if (this.maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
            this.maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        Properties props = this.setStreamProperties("simple-benchmark-kstream-kstream-join");
        KafkaStreams streams = this.createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
        this.runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
    }

    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) {
        if (this.maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
            this.maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        Properties props = this.setStreamProperties("simple-benchmark-ktable-ktable-join");
        KafkaStreams streams = this.createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
        this.runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    void printResults(String nameOfBenchmark, long latency) {
        System.out.println(nameOfBenchmark + this.processedRecords.get() + "/" + latency + "/" + this.recordsPerSec(latency, this.processedRecords.get()) + "/" + this.megabytesPerSec(latency, this.processedBytes));
    }

    void runGenericBenchmark(KafkaStreams streams, String nameOfBenchmark, CountDownLatch latch) {
        streams.start();
        long startTime = System.currentTimeMillis();
        while (latch.getCount() > 0L) {
            try {
                latch.await();
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        long endTime = System.currentTimeMillis();
        this.printResults(nameOfBenchmark, endTime - startTime);
        streams.close();
    }

    private long startStreamsThread(final KafkaStreams streams, CountDownLatch latch) {
        Thread thread = new Thread(){

            @Override
            public void run() {
                streams.start();
            }
        };
        thread.start();
        long startTime = System.currentTimeMillis();
        while (latch.getCount() > 0L) {
            try {
                latch.await();
            }
            catch (InterruptedException ex) {
                Thread.interrupted();
            }
        }
        long endTime = System.currentTimeMillis();
        streams.close();
        try {
            thread.join();
        }
        catch (Exception exception) {
            // empty catch block
        }
        return endTime - startTime;
    }

    public void processStream(String topic) {
        if (this.maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        KafkaStreams streams = this.createKafkaStreams(topic, latch);
        long latency = this.startStreamsThread(streams, latch);
        this.printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
    }

    public void processStreamWithSink(String topic) {
        if (this.maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        KafkaStreams streams = this.createKafkaStreamsWithSink(topic, latch);
        long latency = this.startStreamsThread(streams, latch);
        this.printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", latency);
    }

    public void processStreamWithStateStore(String topic) {
        if (this.maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        KafkaStreams streams = this.createKafkaStreamsWithStateStore(topic, latch, false);
        long latency = this.startStreamsThread(streams, latch);
        this.printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", latency);
    }

    public void processStreamWithCachedStateStore(String topic) {
        if (this.maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        KafkaStreams streams = this.createKafkaStreamsWithStateStore(topic, latch, true);
        long latency = this.startStreamsThread(streams, latch);
        this.printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
    }

    public void produce(String topic) {
        if (this.loadPhase.booleanValue()) {
            this.resetStats();
            return;
        }
        this.produce(topic, 100, "simple-benchmark-produce", this.numRecords, true, this.numRecords, true);
    }

    private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential, int upperRange, boolean printStats) {
        if (sequential && upperRange < numRecords) {
            throw new IllegalArgumentException("UpperRange must be >= numRecords");
        }
        if (!sequential) {
            System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
        }
        Properties props = this.setProduceConsumeProperties(clientId);
        int key = 0;
        Random rand = new Random();
        KafkaProducer producer = new KafkaProducer(props);
        byte[] value = new byte[valueSizeBytes];
        new Random().nextBytes(value);
        long startTime = System.currentTimeMillis();
        key = sequential ? 0 : rand.nextInt(upperRange);
        for (int i = 0; i < numRecords; ++i) {
            producer.send(new ProducerRecord(topic, (Object)key, (Object)value));
            key = sequential ? ++key : rand.nextInt(upperRange);
            this.processedRecords.getAndIncrement();
            this.processedBytes += (long)(value.length + 32);
        }
        producer.close();
        long endTime = System.currentTimeMillis();
        if (printStats) {
            this.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
        }
    }

    public void consume(String topic) {
        if (this.maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
            return;
        }
        Properties props = this.setProduceConsumeProperties("simple-benchmark-consumer");
        KafkaConsumer consumer = new KafkaConsumer(props);
        List<TopicPartition> partitions = this.getAllPartitions(consumer, topic);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        Integer key = null;
        long startTime = System.currentTimeMillis();
        block0: do {
            ConsumerRecords records;
            if ((records = consumer.poll(500L)).isEmpty()) {
                if (this.processedRecords.get() != this.numRecords) continue;
                break;
            }
            for (ConsumerRecord record : records) {
                this.processedRecords.getAndIncrement();
                this.processedBytes += (long)(((byte[])record.value()).length + 32);
                Integer recKey = (Integer)record.key();
                if (key == null || key < recKey) {
                    key = recKey;
                }
                if (this.processedRecords.get() != this.numRecords) continue;
                continue block0;
            }
        } while (this.processedRecords.get() != this.numRecords);
        long endTime = System.currentTimeMillis();
        consumer.close();
        this.printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
    }

    private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
        Properties props = this.setStreamProperties("simple-benchmark-streams");
        StreamsBuilder builder = new StreamsBuilder();
        KStream source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
        source.process((ProcessorSupplier)new ProcessorSupplier<Integer, byte[]>(){

            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>(){

                    public void init(ProcessorContext context) {
                    }

                    public void process(Integer key, byte[] value) {
                        SimpleBenchmark.this.processedRecords.getAndIncrement();
                        SimpleBenchmark.this.processedBytes += (long)(value.length + 32);
                        if (SimpleBenchmark.this.processedRecords.get() == SimpleBenchmark.this.numRecords) {
                            latch.countDown();
                        }
                    }

                    public void punctuate(long timestamp) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[0]);
        return this.createKafkaStreamsWithExceptionHandler(builder, props);
    }

    private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
        Properties props = this.setStreamProperties("simple-benchmark-streams-with-sink");
        StreamsBuilder builder = new StreamsBuilder();
        KStream source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
        source.process((ProcessorSupplier)new ProcessorSupplier<Integer, byte[]>(){

            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>(){

                    public void init(ProcessorContext context) {
                    }

                    public void process(Integer key, byte[] value) {
                        SimpleBenchmark.this.processedRecords.getAndIncrement();
                        SimpleBenchmark.this.processedBytes += (long)(value.length + 32);
                        if (SimpleBenchmark.this.processedRecords.get() == SimpleBenchmark.this.numRecords) {
                            latch.countDown();
                        }
                    }

                    public void punctuate(long timestamp) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[0]);
        return this.createKafkaStreamsWithExceptionHandler(builder, props);
    }

    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic, String kTableTopic, CountDownLatch latch) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream input1 = builder.stream(kStreamTopic);
        KTable input2 = builder.table(kTableTopic);
        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
        return this.createKafkaStreamsWithExceptionHandler(builder, streamConfig);
    }

    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1, String kTableTopic2, CountDownLatch latch) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable input1 = builder.table(kTableTopic1);
        KTable input2 = builder.table(kTableTopic2);
        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
        return this.createKafkaStreamsWithExceptionHandler(builder, streamConfig);
    }

    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1, String kStreamTopic2, CountDownLatch latch) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream input1 = builder.stream(kStreamTopic1);
        KStream input2 = builder.stream(kStreamTopic2);
        long timeDifferenceMs = 10000L;
        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of((long)10000L)).foreach(new CountDownAction(latch));
        return this.createKafkaStreamsWithExceptionHandler(builder, streamConfig);
    }

    private KafkaStreams createKafkaStreamsWithStateStore(String topic, final CountDownLatch latch, boolean enableCaching) {
        Properties props = this.setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"), (Serde)Serdes.Integer(), (Serde)Serdes.ByteArray());
        if (enableCaching) {
            builder.addStateStore(storeBuilder.withCachingEnabled());
        } else {
            builder.addStateStore(storeBuilder);
        }
        KStream source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
        source.process((ProcessorSupplier)new ProcessorSupplier<Integer, byte[]>(){

            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>(){
                    KeyValueStore<Integer, byte[]> store;

                    public void init(ProcessorContext context) {
                        this.store = (KeyValueStore)context.getStateStore("store");
                    }

                    public void process(Integer key, byte[] value) {
                        this.store.put((Object)key, (Object)value);
                        SimpleBenchmark.this.processedRecords.getAndIncrement();
                        SimpleBenchmark.this.processedBytes += (long)(value.length + 32);
                        if (SimpleBenchmark.this.processedRecords.get() == SimpleBenchmark.this.numRecords) {
                            latch.countDown();
                        }
                    }

                    public void punctuate(long timestamp) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[]{"store"});
        return this.createKafkaStreamsWithExceptionHandler(builder, props);
    }

    private KafkaStreams createKafkaStreamsWithExceptionHandler(StreamsBuilder builder, Properties props) {
        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
                streamsClient.close(30L, TimeUnit.SECONDS);
            }
        });
        return streamsClient;
    }

    private double megabytesPerSec(long time, long processedBytes) {
        return (double)processedBytes / 1024.0 / 1024.0 / ((double)time / 1000.0);
    }

    private double recordsPerSec(long time, int numRecords) {
        return (double)numRecords / ((double)time / 1000.0);
    }

    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (String topic : topics) {
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitions;
    }

    private class CountDownAction<V>
    implements ForeachAction<Integer, V> {
        private CountDownLatch latch;

        CountDownAction(CountDownLatch latch) {
            this.latch = latch;
        }

        public void apply(Integer key, V value) {
            SimpleBenchmark.this.processedRecords.getAndIncrement();
            if (value instanceof byte[]) {
                SimpleBenchmark.this.processedBytes += (long)(((byte[])value).length + 32);
            } else if (value instanceof Long) {
                SimpleBenchmark.this.processedBytes += 96L;
            } else {
                System.err.println("Unknown value type in CountDownAction");
            }
            if (SimpleBenchmark.this.processedRecords.get() == SimpleBenchmark.this.numRecords) {
                this.latch.countDown();
            }
        }
    }
}

