package org.apache.kafka.streams.perf;

import java.io.File;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;

/* loaded from: input_file:org/apache/kafka/streams/perf/SimpleBenchmark.class */
public class SimpleBenchmark {
    // Value stored under "bootstrap.servers" in every configuration built by this class.
    private final String kafka;
    // Directory whose path is passed to Streams as "state.dir".
    private final File stateDir;
    // When true, the benchmark methods only populate their input topics (or do nothing)
    // instead of running a measurement; see maybeSetupPhase() and produce(String).
    private final Boolean loadPhase;
    // Name of the test selected in run(); compared against ALL_TESTS and the individual test names.
    private final String testName;
    // Test name that makes run() execute every benchmark in sequence.
    private static final String ALL_TESTS = "all";
    // Input topic shared by the produce/consume/processStream* benchmarks.
    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
    // Output topic written by the source+sink benchmark.
    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
    // Input topic of the count benchmark.
    private static final String COUNT_TOPIC = "countTopic";
    // NOTE(review): the join topic names used in run() start with these prefixes, but the
    // constants themselves are not referenced in this (decompiled) file — confirm intent.
    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
    // Number of records each benchmark produces or waits for; assigned in main().
    private static int numRecords;
    // Length in bytes of every produced value (and of the joiner's placeholder value).
    private static final int VALUE_SIZE = 100;
    // Poll timeout in milliseconds, used for the plain consumer and as the Streams "poll.ms".
    private static final long POLL_MS = 500;
    // Value of "max.poll.records" for both the plain clients and Streams.
    private static final int MAX_POLL_RECORDS = 1000;
    // Socket send/receive buffer size (1 MiB) applied to the client configurations.
    private static final int SOCKET_SIZE_BYTES = 1048576;
    /**
     * Joiner shared by all join benchmarks.
     *
     * <p>If both sides are missing it returns a fresh zero-filled array of {@code VALUE_SIZE}
     * bytes; if exactly one side is missing it returns the other side unchanged; otherwise it
     * returns a new array holding the left bytes followed by the right bytes.
     */
    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
        @Override
        public byte[] apply(byte[] left, byte[] right) {
            if (left == null) {
                // Nothing on the left: pass the right side through, or emit a placeholder.
                return right != null ? right : new byte[VALUE_SIZE];
            }
            if (right == null) {
                return left;
            }
            final int leftLength = left.length;
            final int rightLength = right.length;
            final byte[] joined = new byte[leftLength + rightLength];
            System.arraycopy(left, 0, joined, 0, leftLength);
            System.arraycopy(right, 0, joined, leftLength, rightLength);
            return joined;
        }
    };
    // Running totals for the benchmark currently executing. They are reset in
    // maybeSetupPhase() and produce(...), and incremented once per record by the
    // producer/consumer loops, the processors and CountDownAction.
    private static int processedRecords = 0;
    private static long processedBytes = 0;
    // Serdes passed explicitly to the builder in the processStream* topologies.
    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/perf/SimpleBenchmark$CountDownAction.class */
    public class CountDownAction<V> implements ForeachAction<Integer, V> {
        private CountDownLatch latch;

        CountDownAction(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void apply(Integer num, V v) {
            SimpleBenchmark.access$008();
            if (v instanceof byte[]) {
                SimpleBenchmark.access$114(((byte[]) v).length + 32);
            } else if (v instanceof Long) {
                SimpleBenchmark.access$114(96L);
            } else {
                System.err.println("Unknown value type in CountDownAction");
            }
            if (SimpleBenchmark.processedRecords == SimpleBenchmark.numRecords) {
                this.latch.countDown();
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Object obj2) {
            apply((Integer) obj, (Integer) obj2);
        }
    }

    /**
     * Creates a benchmark runner.
     *
     * @param file state directory handed to Kafka Streams
     * @param str  bootstrap servers string
     * @param bool whether this run only loads the input topics
     * @param str2 name of the test to run
     */
    public SimpleBenchmark(File file, String str, Boolean bool, String str2) {
        kafka = str;
        stateDir = file;
        testName = str2;
        loadPhase = bool;
    }

    /**
     * Dispatches to the benchmark selected by {@code testName}.
     *
     * <p>The decompiled two-stage hash switch ({@code boolean z = -1}, repeated
     * {@code case true}) did not compile; it is replaced by the string switch it was
     * generated from. {@code "all"} runs every benchmark in the original order.
     *
     * @throws Exception if the test name is unknown, or if the selected benchmark fails
     */
    private void run() throws Exception {
        final String kStreamKTable1 = "joinSourceTopic1KStreamKTable";
        final String kStreamKTable2 = "joinSourceTopic2KStreamKTable";
        final String kStreamKStream1 = "joinSourceTopic1KStreamKStream";
        final String kStreamKStream2 = "joinSourceTopic2KStreamKStream";
        final String kTableKTable1 = "joinSourceTopic1KTableKTable";
        final String kTableKTable2 = "joinSourceTopic2KTableKTable";

        switch (this.testName) {
            case ALL_TESTS:
                // The order matters: produce() fills SOURCE_TOPIC for the tests that follow.
                produce(SOURCE_TOPIC);
                consume(SOURCE_TOPIC);
                processStream(SOURCE_TOPIC);
                processStreamWithSink(SOURCE_TOPIC);
                processStreamWithStateStore(SOURCE_TOPIC);
                processStreamWithCachedStateStore(SOURCE_TOPIC);
                count(COUNT_TOPIC);
                kStreamKTableJoin(kStreamKTable1, kStreamKTable2);
                kStreamKStreamJoin(kStreamKStream1, kStreamKStream2);
                kTableKTableJoin(kTableKTable1, kTableKTable2);
                return;
            case "produce":
                produce(SOURCE_TOPIC);
                return;
            case "consume":
                consume(SOURCE_TOPIC);
                return;
            case "count":
                count(COUNT_TOPIC);
                return;
            case "processstream":
                processStream(SOURCE_TOPIC);
                return;
            case "processstreamwithsink":
                processStreamWithSink(SOURCE_TOPIC);
                return;
            case "processstreamwithstatestore":
                processStreamWithStateStore(SOURCE_TOPIC);
                return;
            case "processstreamwithcachedstatestore":
                processStreamWithCachedStateStore(SOURCE_TOPIC);
                return;
            case "kstreamktablejoin":
                kStreamKTableJoin(kStreamKTable1, kStreamKTable2);
                return;
            case "kstreamkstreamjoin":
                kStreamKStreamJoin(kStreamKStream1, kStreamKStream2);
                return;
            case "ktablektablejoin":
                kTableKTableJoin(kTableKTable1, kTableKTable2);
                return;
            default:
                throw new Exception("Unknown test name " + this.testName);
        }
    }

    /**
     * Command-line entry point.
     *
     * <p>Positional arguments, all optional: bootstrap servers (default
     * {@code localhost:9092}), state directory (default: a fresh temp directory), number of
     * records (default 10000000), load-phase flag (default {@code false}) and test name
     * (lower-cased; default {@code all}).
     *
     * @param strArr command-line arguments
     * @throws Exception if an argument cannot be parsed or the benchmark fails
     */
    public static void main(String[] strArr) throws Exception {
        final int argCount = strArr.length;

        final String bootstrapServers;
        if (argCount > 0) {
            bootstrapServers = strArr[0];
        } else {
            bootstrapServers = "localhost:9092";
        }

        final String stateDirPath;
        if (argCount > 1) {
            stateDirPath = strArr[1];
        } else {
            stateDirPath = TestUtils.tempDirectory().getAbsolutePath();
        }

        if (argCount > 2) {
            numRecords = Integer.parseInt(strArr[2]);
        } else {
            numRecords = 10000000;
        }

        final boolean loadOnly = argCount > 3 && Boolean.parseBoolean(strArr[3]);

        final String selectedTest;
        if (argCount > 4) {
            selectedTest = strArr[4].toLowerCase(Locale.ROOT);
        } else {
            selectedTest = ALL_TESTS;
        }

        // Best effort: create the state directory and the RocksDB sub-directory if missing.
        final File stateDirectory = new File(stateDirPath);
        stateDirectory.mkdir();
        final File rocksDbDirectory = new File(stateDirectory, "rocksdb-test");
        rocksDbDirectory.mkdir();

        System.out.println("StreamsTest instance started");
        System.out.println("kafka=" + bootstrapServers);
        System.out.println("stateDir=" + stateDirectory);
        System.out.println("numRecords=" + numRecords);
        System.out.println("loadPhase=" + loadOnly);
        System.out.println("testName=" + selectedTest);

        final SimpleBenchmark benchmark =
            new SimpleBenchmark(stateDirectory, bootstrapServers, Boolean.valueOf(loadOnly), selectedTest);
        benchmark.run();
    }

    /**
     * Builds the Kafka Streams configuration used by every streams benchmark.
     *
     * @param str application id of the streams instance
     * @return a new {@link Properties} object with the benchmark's streams settings
     */
    private Properties setStreamProperties(String str) {
        final Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", str);
        streamsConfig.put("state.dir", stateDir.toString());
        streamsConfig.put("bootstrap.servers", kafka);
        // A single stream thread keeps the shared static counters single-writer.
        streamsConfig.put("num.stream.threads", Integer.valueOf(1));
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put("receive.buffer.bytes", SOCKET_SIZE_BYTES);
        streamsConfig.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfig.put("default.value.serde", Serdes.ByteArray().getClass());
        streamsConfig.put("max.poll.records", MAX_POLL_RECORDS);
        streamsConfig.put("poll.ms", POLL_MS);
        return streamsConfig;
    }

    /**
     * Builds one configuration usable by both the plain producer and the plain consumer
     * (it carries serializer and deserializer settings side by side).
     *
     * @param str client id
     * @return a new {@link Properties} object with the benchmark's client settings
     */
    private Properties setProduceConsumeProperties(String str) {
        final Properties clientConfig = new Properties();
        clientConfig.put("client.id", str);
        clientConfig.put("bootstrap.servers", kafka);
        // Producer side.
        clientConfig.put("key.serializer", IntegerSerializer.class);
        clientConfig.put("value.serializer", ByteArraySerializer.class);
        clientConfig.put("send.buffer.bytes", SOCKET_SIZE_BYTES);
        // Consumer side.
        clientConfig.put("key.deserializer", IntegerDeserializer.class);
        clientConfig.put("value.deserializer", ByteArrayDeserializer.class);
        clientConfig.put("enable.auto.commit", "false");
        clientConfig.put("receive.buffer.bytes", SOCKET_SIZE_BYTES);
        clientConfig.put("max.poll.records", MAX_POLL_RECORDS);
        return clientConfig;
    }

    /**
     * Resets the shared counters and, when running in load phase, populates the given topic.
     *
     * @param str  topic to populate
     * @param str2 client id for the loading producer
     * @param z    if true and the selected test is "all", the topic is not populated here
     * @return {@code true} when in load phase (the caller must skip the measurement),
     *         {@code false} otherwise
     * @throws Exception if producing the records fails
     */
    private boolean maybeSetupPhase(String str, String str2, boolean z) throws Exception {
        processedBytes = 0L;
        processedRecords = 0;

        final boolean loading = loadPhase.booleanValue();
        if (loading) {
            final boolean skipProduce = z && testName.equals(ALL_TESTS);
            if (!skipProduce) {
                System.out.println("Initializing topic " + str);
                produce(str, VALUE_SIZE, str2, numRecords, true, numRecords, false);
            }
        }
        return loading;
    }

    /**
     * Builds the topology for the count benchmark: read the topic, group by key, count into
     * the store "tmpStoreName" and feed every update to a {@code CountDownAction}.
     *
     * @param properties     streams configuration
     * @param str            source topic
     * @param countDownLatch latch released by the action once all records were seen
     * @return the (not yet started) streams instance
     */
    private KafkaStreams createCountStreams(Properties properties, String str, CountDownLatch countDownLatch) {
        final KStreamBuilder topology = new KStreamBuilder();
        final String[] sourceTopics = {str};
        // The action is deliberately used as a raw type, as in the rest of this file.
        topology.stream(sourceTopics)
            .groupByKey()
            .count("tmpStoreName")
            .foreach(new CountDownAction(countDownLatch));
        return new KafkaStreams(topology, properties);
    }

    /**
     * Runs the count benchmark on the given topic, or only loads the topic in load phase.
     *
     * @param str source topic
     * @throws Exception if loading the topic fails
     */
    public void count(String str) throws Exception {
        final boolean loadedOnly = maybeSetupPhase(str, "simple-benchmark-produce-count", false);
        if (!loadedOnly) {
            final CountDownLatch finished = new CountDownLatch(1);
            final Properties config = setStreamProperties("simple-benchmark-count");
            final KafkaStreams streams = createCountStreams(config, str, finished);
            runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", finished);
        }
    }

    /**
     * Runs the KStream-KTable left-join benchmark, or loads both input topics in load phase.
     *
     * @param str  topic read as a stream
     * @param str2 topic read as a table
     * @throws Exception if loading a topic fails
     */
    public void kStreamKTableJoin(String str, String str2) throws Exception {
        final boolean loadedOnly = maybeSetupPhase(str, "simple-benchmark-produce-kstream", false);
        if (loadedOnly) {
            // Load phase: fill the second topic as well, then skip the measurement.
            maybeSetupPhase(str2, "simple-benchmark-produce-ktable", false);
            return;
        }
        final CountDownLatch finished = new CountDownLatch(1);
        final Properties config = setStreamProperties("simple-benchmark-kstream-ktable-join");
        final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(config, str, str2, finished);
        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", finished);
    }

    /**
     * Runs the KStream-KStream left-join benchmark, or loads both input topics in load phase.
     *
     * @param str  first source topic
     * @param str2 second source topic
     * @throws Exception if loading a topic fails
     */
    public void kStreamKStreamJoin(String str, String str2) throws Exception {
        final boolean loadedOnly = maybeSetupPhase(str, "simple-benchmark-produce-kstream-topic1", false);
        if (loadedOnly) {
            // Load phase: fill the second topic as well, then skip the measurement.
            maybeSetupPhase(str2, "simple-benchmark-produce-kstream-topic2", false);
            return;
        }
        final CountDownLatch finished = new CountDownLatch(1);
        final Properties config = setStreamProperties("simple-benchmark-kstream-kstream-join");
        final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(config, str, str2, finished);
        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", finished);
    }

    /**
     * Runs the KTable-KTable left-join benchmark, or loads both input topics in load phase.
     *
     * @param str  first source topic
     * @param str2 second source topic
     * @throws Exception if loading a topic fails
     */
    public void kTableKTableJoin(String str, String str2) throws Exception {
        final boolean loadedOnly = maybeSetupPhase(str, "simple-benchmark-produce-ktable-topic1", false);
        if (loadedOnly) {
            // Load phase: fill the second topic as well, then skip the measurement.
            maybeSetupPhase(str2, "simple-benchmark-produce-ktable-topic2", false);
            return;
        }
        final CountDownLatch finished = new CountDownLatch(1);
        final Properties config = setStreamProperties("simple-benchmark-ktable-ktable-join");
        final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(config, str, str2, finished);
        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", finished);
    }

    /**
     * Prints one result line: the given prefix followed by
     * {@code records/latencyMs/recordsPerSec/megabytesPerSec}.
     *
     * @param str prefix naming the benchmark
     * @param j   elapsed time in milliseconds
     */
    private void printResults(String str, long j) {
        final StringBuilder line = new StringBuilder();
        line.append(str);
        line.append(processedRecords).append("/");
        line.append(j).append("/");
        line.append(recordsPerSec(j, processedRecords)).append("/");
        line.append(megabytesPerSec(j, processedBytes));
        System.out.println(line.toString());
    }

    /**
     * Starts the streams instance, blocks until the latch reaches zero, prints the result
     * line for the elapsed time and closes the instance.
     *
     * @param kafkaStreams   instance to run
     * @param str            prefix for the result line
     * @param countDownLatch latch released when all records have been processed
     */
    private void runGenericBenchmark(KafkaStreams kafkaStreams, String str, CountDownLatch countDownLatch) {
        kafkaStreams.start();
        final long startedAt = System.currentTimeMillis();

        for (;;) {
            if (countDownLatch.getCount() <= 0) {
                break;
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException ignored) {
                // Deliberately ignored: keep waiting until the latch is released.
            }
        }

        final long elapsedMs = System.currentTimeMillis() - startedAt;
        printResults(str, elapsedMs);
        kafkaStreams.close();
    }

    /**
     * Starts the streams instance from a helper thread, waits for the latch, then closes the
     * instance and joins the helper thread.
     *
     * @param kafkaStreams   instance to run
     * @param countDownLatch latch released when all records have been processed
     * @return milliseconds between starting the helper thread and the latch being released
     *         (closing and joining are not included)
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    private long startStreamsThread(final KafkaStreams kafkaStreams, CountDownLatch countDownLatch) throws Exception {
        final Thread starter = new Thread() {
            @Override
            public void run() {
                kafkaStreams.start();
            }
        };
        starter.start();

        final long startedAt = System.currentTimeMillis();
        boolean released = countDownLatch.getCount() <= 0;
        while (!released) {
            try {
                countDownLatch.await();
            } catch (InterruptedException ignored) {
                // Clear the interrupt status and keep waiting for the latch.
                Thread.interrupted();
            }
            released = countDownLatch.getCount() <= 0;
        }
        final long finishedAt = System.currentTimeMillis();

        kafkaStreams.close();
        try {
            starter.join();
        } catch (Exception ignored) {
            // Best effort: a failed join must not invalidate the measurement.
        }
        return finishedAt - startedAt;
    }

    /**
     * Measures a source-only Streams topology, unless running in load phase.
     *
     * @param str source topic
     * @throws Exception if loading the topic or running the benchmark fails
     */
    public void processStream(String str) throws Exception {
        if (!maybeSetupPhase(str, "simple-benchmark-process-stream-load", true)) {
            final CountDownLatch finished = new CountDownLatch(1);
            final KafkaStreams streams = createKafkaStreams(str, finished);
            final long elapsedMs = startStreamsThread(streams, finished);
            printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", elapsedMs);
        }
    }

    /**
     * Measures a source-plus-sink Streams topology, unless running in load phase.
     *
     * @param str source topic
     * @throws Exception if loading the topic or running the benchmark fails
     */
    public void processStreamWithSink(String str) throws Exception {
        if (!maybeSetupPhase(str, "simple-benchmark-process-stream-with-sink-load", true)) {
            final CountDownLatch finished = new CountDownLatch(1);
            final KafkaStreams streams = createKafkaStreamsWithSink(str, finished);
            final long elapsedMs = startStreamsThread(streams, finished);
            printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", elapsedMs);
        }
    }

    /**
     * Measures a Streams topology that writes every record to an uncached persistent store,
     * unless running in load phase.
     *
     * @param str source topic
     * @throws Exception if loading the topic or running the benchmark fails
     */
    public void processStreamWithStateStore(String str) throws Exception {
        if (!maybeSetupPhase(str, "simple-benchmark-process-stream-with-state-store-load", true)) {
            final CountDownLatch finished = new CountDownLatch(1);
            final KafkaStreams streams = createKafkaStreamsWithStateStore(str, finished, false);
            final long elapsedMs = startStreamsThread(streams, finished);
            printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", elapsedMs);
        }
    }

    /**
     * Measures a Streams topology that writes every record to a caching persistent store,
     * unless running in load phase.
     *
     * @param str source topic
     * @throws Exception if loading the topic or running the benchmark fails
     */
    public void processStreamWithCachedStateStore(String str) throws Exception {
        if (!maybeSetupPhase(str, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
            final CountDownLatch finished = new CountDownLatch(1);
            final KafkaStreams streams = createKafkaStreamsWithStateStore(str, finished, true);
            final long elapsedMs = startStreamsThread(streams, finished);
            printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", elapsedMs);
        }
    }

    /**
     * Runs the producer benchmark: writes {@code numRecords} sequentially keyed records to
     * the topic and prints the result. Does nothing in load phase.
     *
     * @param str destination topic
     * @throws Exception if producing fails
     */
    public void produce(String str) throws Exception {
        final boolean loading = loadPhase.booleanValue();
        if (!loading) {
            produce(str, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
        }
    }

    /**
     * Writes records with a random payload to a topic and optionally prints throughput.
     *
     * <p>Compared with the decompiled version, the producer is typed instead of raw, it is
     * closed in a {@code finally} block so it is not leaked when {@code send} throws, and a
     * single {@link Random} is used for both the payload and the keys.
     *
     * @param str  destination topic
     * @param i    payload size in bytes
     * @param str2 producer client id
     * @param i2   number of records to send
     * @param z    {@code true} for sequential keys starting at 0, {@code false} for random keys
     * @param i3   exclusive upper bound for random keys; must be {@code >= i2} for sequential keys
     * @param z2   whether to print the result line
     * @throws Exception if sequential keys are requested with {@code i3 < i2}
     */
    private void produce(String str, int i, String str2, int i2, boolean z, int i3, boolean z2) throws Exception {
        processedRecords = 0;
        processedBytes = 0L;
        if (z && i3 < i2) {
            throw new Exception("UpperRange must be >= numRecords");
        }
        if (!z) {
            System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
        }

        final Properties producerConfig = setProduceConsumeProperties(str2);
        final Random random = new Random();
        // Random payload; the same array is reused for every record.
        final byte[] value = new byte[i];
        random.nextBytes(value);

        final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerConfig);
        final long startedAt = System.currentTimeMillis();
        try {
            int key = z ? 0 : random.nextInt(i3);
            for (int sent = 0; sent < i2; sent++) {
                producer.send(new ProducerRecord<Integer, byte[]>(str, Integer.valueOf(key), value));
                key = z ? key + 1 : random.nextInt(i3);
                processedRecords++;
                processedBytes += value.length + 32;
            }
        } finally {
            // close() is inside the measured interval, as in the original.
            producer.close();
        }
        final long finishedAt = System.currentTimeMillis();

        if (z2) {
            printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", finishedAt - startedAt);
        }
    }

    /**
     * Runs the consumer benchmark: reads the topic from the beginning until
     * {@code numRecords} records have been consumed, then prints the result. In load phase
     * it only (possibly) loads the topic.
     *
     * <p>Compared with the decompiled version, the consumer and records are typed instead of
     * raw, the consumer is closed in a {@code finally} block so it is not leaked when
     * {@code poll} throws, and the never-read "largest key seen" bookkeeping is removed.
     *
     * @param str topic to read
     * @throws Exception if loading the topic fails
     */
    public void consume(String str) throws Exception {
        if (maybeSetupPhase(str, "simple-benchmark-consumer-load", true)) {
            return;
        }

        final KafkaConsumer<Integer, byte[]> consumer =
            new KafkaConsumer<Integer, byte[]>(setProduceConsumeProperties("simple-benchmark-consumer"));
        final long elapsedMs;
        try {
            final List<TopicPartition> partitions = getAllPartitions(consumer, str);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            final long startedAt = System.currentTimeMillis();
            // Poll at least once; an empty poll simply falls through to the loop condition.
            do {
                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
                for (ConsumerRecord<Integer, byte[]> record : records) {
                    processedRecords++;
                    processedBytes += record.value().length + 32;
                    if (processedRecords == numRecords) {
                        // Ignore whatever else this batch contains.
                        break;
                    }
                }
            } while (processedRecords != numRecords);
            // The elapsed time is taken before close(), as in the original.
            elapsedMs = System.currentTimeMillis() - startedAt;
        } finally {
            consumer.close();
        }
        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", elapsedMs);
    }

    /**
     * Builds the source-only topology: every record from the topic is counted by a
     * processor that releases the latch once {@code numRecords} records were seen.
     *
     * @param str            source topic
     * @param countDownLatch latch released on the last record
     * @return the (not yet started) streams instance with the exception handler installed
     */
    private KafkaStreams createKafkaStreams(String str, final CountDownLatch countDownLatch) {
        final Properties config = setStreamProperties("simple-benchmark-streams");
        final KStreamBuilder topology = new KStreamBuilder();

        final ProcessorSupplier<Integer, byte[]> countingSupplier = new ProcessorSupplier<Integer, byte[]>() {
            @Override
            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                        // Intentionally empty: the context is not needed.
                    }

                    @Override
                    public void process(Integer num, byte[] bArr) {
                        processedRecords++;
                        processedBytes += bArr.length + 32;
                        if (processedRecords == numRecords) {
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void punctuate(long j) {
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        };

        topology.stream(INTEGER_SERDE, BYTE_SERDE, new String[]{str})
            .process(countingSupplier, new String[0]);
        return createKafkaStreamsWithExceptionHandler(topology, config);
    }

    /**
     * Builds the source-plus-sink topology: every record is forwarded to the sink topic
     * and also counted by a processor that releases the latch on the last record.
     *
     * @param str            source topic
     * @param countDownLatch latch released on the last record
     * @return the (not yet started) streams instance with the exception handler installed
     */
    private KafkaStreams createKafkaStreamsWithSink(String str, final CountDownLatch countDownLatch) {
        final Properties config = setStreamProperties("simple-benchmark-streams-with-sink");
        final KStreamBuilder topology = new KStreamBuilder();

        final ProcessorSupplier<Integer, byte[]> countingSupplier = new ProcessorSupplier<Integer, byte[]>() {
            @Override
            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                        // Intentionally empty: the context is not needed.
                    }

                    @Override
                    public void process(Integer num, byte[] bArr) {
                        processedRecords++;
                        processedBytes += bArr.length + 32;
                        if (processedRecords == numRecords) {
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void punctuate(long j) {
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        };

        final KStream<Integer, byte[]> source = topology.stream(INTEGER_SERDE, BYTE_SERDE, new String[]{str});
        // Register the sink before the counting processor, as in the original.
        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
        source.process(countingSupplier, new String[0]);
        return createKafkaStreamsWithExceptionHandler(topology, config);
    }

    /**
     * Builds the KStream-KTable left-join topology; joined records go to a
     * {@code CountDownAction}.
     *
     * @param properties     streams configuration
     * @param str            topic read as a stream
     * @param str2           topic read as a table (its store is named {@code str2 + "-store"})
     * @param countDownLatch latch released by the action on the last record
     * @return the (not yet started) streams instance with the exception handler installed
     */
    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties properties, String str, String str2, CountDownLatch countDownLatch) {
        final KStreamBuilder topology = new KStreamBuilder();
        // Raw types on purpose: the shared joiner and action are raw throughout this file.
        final KStream streamSide = topology.stream(new String[]{str});
        final String tableStore = str2 + "-store";
        streamSide
            .leftJoin(topology.table(str2, tableStore), VALUE_JOINER)
            .foreach(new CountDownAction(countDownLatch));
        return createKafkaStreamsWithExceptionHandler(topology, properties);
    }

    /**
     * Builds the KTable-KTable left-join topology; join results go to a
     * {@code CountDownAction}.
     *
     * @param properties     streams configuration
     * @param str            first table topic (store {@code str + "-store"})
     * @param str2           second table topic (store {@code str2 + "-store"})
     * @param countDownLatch latch released by the action on the last record
     * @return the (not yet started) streams instance with the exception handler installed
     */
    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties properties, String str, String str2, CountDownLatch countDownLatch) {
        final KStreamBuilder topology = new KStreamBuilder();
        final String leftStore = str + "-store";
        final String rightStore = str2 + "-store";
        // The left table is registered before the right one, as in the original.
        topology.table(str, leftStore)
            .leftJoin(topology.table(str2, rightStore), VALUE_JOINER)
            .foreach(new CountDownAction(countDownLatch));
        return createKafkaStreamsWithExceptionHandler(topology, properties);
    }

    /**
     * Builds the KStream-KStream left-join topology with a join window created by
     * {@code JoinWindows.of(10000L)}; joined records go to a {@code CountDownAction}.
     *
     * @param properties     streams configuration
     * @param str            left source topic
     * @param str2           right source topic
     * @param countDownLatch latch released by the action on the last record
     * @return the (not yet started) streams instance with the exception handler installed
     */
    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties properties, String str, String str2, CountDownLatch countDownLatch) {
        final KStreamBuilder topology = new KStreamBuilder();
        // Raw types on purpose: the shared joiner and action are raw throughout this file.
        final KStream leftSide = topology.stream(new String[]{str});
        final KStream rightSide = topology.stream(new String[]{str2});
        leftSide
            .leftJoin(rightSide, VALUE_JOINER, JoinWindows.of(10000L))
            .foreach(new CountDownAction(countDownLatch));
        return createKafkaStreamsWithExceptionHandler(topology, properties);
    }

    /**
     * Builds the source-plus-store topology: every record is written to the persistent
     * key-value store "store" and counted; the latch is released on the last record.
     *
     * <p>The decompiled version assigned {@code getStateStore("store")} to a
     * {@code KeyValueStore} field without a cast, which does not compile; the cast is restored.
     *
     * @param str            source topic
     * @param countDownLatch latch released on the last record
     * @param z              whether the store is built with caching enabled
     * @return the (not yet started) streams instance with the exception handler installed
     */
    private KafkaStreams createKafkaStreamsWithStateStore(String str, final CountDownLatch countDownLatch, boolean z) {
        // The application id differs between the cached and uncached variants.
        Properties streamProperties = setStreamProperties("simple-benchmark-streams-with-store" + z);
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        if (z) {
            kStreamBuilder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build(), new String[0]);
        } else {
            kStreamBuilder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build(), new String[0]);
        }
        kStreamBuilder.stream(INTEGER_SERDE, BYTE_SERDE, new String[]{str}).process(new ProcessorSupplier<Integer, byte[]>() {
            @Override
            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>() {
                    KeyValueStore<Integer, byte[]> store;

                    @Override
                    @SuppressWarnings("unchecked") // the store was registered above with Integer keys and byte[] values
                    public void init(ProcessorContext processorContext) {
                        this.store = (KeyValueStore<Integer, byte[]>) processorContext.getStateStore("store");
                    }

                    @Override
                    public void process(Integer num, byte[] bArr) {
                        this.store.put(num, bArr);
                        processedRecords++;
                        processedBytes += bArr.length + 32;
                        if (processedRecords == numRecords) {
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void punctuate(long j) {
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        }, new String[]{"store"});
        return createKafkaStreamsWithExceptionHandler(kStreamBuilder, streamProperties);
    }

    /**
     * Creates a streams instance whose uncaught-exception handler reports the failure on
     * standard output and then closes the instance with a 30-second timeout.
     *
     * @param kStreamBuilder topology to run
     * @param properties     streams configuration
     * @return the (not yet started) streams instance
     */
    private KafkaStreams createKafkaStreamsWithExceptionHandler(KStreamBuilder kStreamBuilder, Properties properties) {
        final KafkaStreams streams = new KafkaStreams(kStreamBuilder, properties);
        final Thread.UncaughtExceptionHandler closeOnFailure = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable th) {
                final String report = "FATAL: An unexpected exception is encountered on thread " + thread + ": " + th;
                System.out.println(report);
                streams.close(30L, TimeUnit.SECONDS);
            }
        };
        streams.setUncaughtExceptionHandler(closeOnFailure);
        return streams;
    }

    /**
     * Converts a byte count and a duration into megabytes (1024 * 1024 bytes) per second.
     *
     * @param j  elapsed time in milliseconds
     * @param j2 number of bytes processed
     * @return throughput in MB/s
     */
    private double megabytesPerSec(long j, long j2) {
        final double kilobytes = j2 / 1024.0d;
        final double megabytes = kilobytes / 1024.0d;
        final double seconds = j / 1000.0d;
        return megabytes / seconds;
    }

    /**
     * Converts a record count and a duration into records per second.
     *
     * @param j elapsed time in milliseconds
     * @param i number of records processed
     * @return throughput in records/s
     */
    private double recordsPerSec(long j, int i) {
        final double seconds = j / 1000.0d;
        return i / seconds;
    }

    /**
     * Collects a {@link TopicPartition} for every partition the consumer reports for the
     * given topics, in topic order.
     *
     * @param kafkaConsumer consumer used to look up partition metadata
     * @param strArr        topic names
     * @return a new mutable list of all partitions of all given topics
     */
    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> kafkaConsumer, String... strArr) {
        final List<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (int topicIndex = 0; topicIndex < strArr.length; topicIndex++) {
            final List<PartitionInfo> infos = kafkaConsumer.partitionsFor(strArr[topicIndex]);
            for (PartitionInfo info : infos) {
                final String topic = info.topic();
                final int partition = info.partition();
                partitions.add(new TopicPartition(topic, partition));
            }
        }
        return partitions;
    }

    /**
     * Compiler-style accessor kept for callers in nested classes: post-increments
     * {@code processedRecords}.
     *
     * @return the value of {@code processedRecords} before the increment
     */
    static /* synthetic */ int access$008() {
        return processedRecords++;
    }

    /**
     * Compiler-style accessor kept for callers in nested classes: adds {@code j} to
     * {@code processedBytes}.
     *
     * @param j number of bytes to add
     * @return the value of {@code processedBytes} after the addition
     */
    static /* synthetic */ long access$114(long j) {
        return processedBytes += j;
    }
}
