package org.apache.kafka.streams.perf;

import java.io.IOException;
import java.lang.Thread;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

/* loaded from: input_file:org/apache/kafka/streams/perf/SimpleBenchmark.class */
public class SimpleBenchmark {
    /** Client id passed to the producer that loads the source topics. */
    private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer";
    /** First input topic; written by the load tests and read by the benchmarks. */
    private static final String SOURCE_TOPIC_ONE = "simpleBenchmarkSourceTopic1";
    /** Second input topic; the right-hand side of the join benchmarks. */
    private static final String SOURCE_TOPIC_TWO = "simpleBenchmarkSourceTopic2";
    /** Output topic for the benchmarks that write their input back out. */
    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
    /** Topic names handed to {@code YahooBenchmark}. */
    private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
    private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
    /**
     * Joiner shared by all join benchmarks: returns the left value if it is non-null, otherwise the
     * right value if it is non-null, otherwise a fresh 100-byte array. It never returns null.
     */
    private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.1
        public byte[] apply(byte[] bArr, byte[] bArr2) {
            return bArr != null ? bArr : bArr2 != null ? bArr2 : new byte[100];
        }
    };
    /** Serdes for the byte-array values and integer keys used by every benchmark topology. */
    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
    /** Running byte total: each counted record adds its value length plus a fixed 32. */
    long processedBytes = 0;
    /** Running count of records seen by the current benchmark; compared against {@link #numRecords}. */
    int processedRecords = 0;
    /** Poll timeout in milliseconds (same value as the consumer polls and the "poll.ms" setting). */
    private static final long POLL_MS = 500;
    /** Value stored under "commit.interval.ms" for the streams benchmarks. */
    private static final long COMMIT_INTERVAL_MS = 30000;
    /** Value stored under "max.poll.records" for consumers and streams. */
    private static final int MAX_POLL_RECORDS = 1000;
    /** Number of distinct keys the load producer draws from. */
    private static final int KEY_SPACE_SIZE = 10000;
    /** Join window, in milliseconds, for the stream-stream join benchmark. */
    private static final long STREAM_STREAM_JOIN_WINDOW = 10000;
    /** Window size, in milliseconds, for the windowed count and the window-store benchmark. */
    private static final long AGGREGATE_WINDOW_SIZE = 1000;
    /** Hop size in milliseconds for the windowed count (same value as its advanceBy argument). */
    private static final long AGGREGATE_WINDOW_ADVANCE = 500;
    /** Socket send/receive buffer size in bytes (1 MiB). */
    private static final int SOCKET_SIZE_BYTES = 1048576;
    /** Upper bound in milliseconds on a streams benchmark run (same value as the wait-loop limit). */
    private static final int MAX_WAIT_MS = 180000;
    /** Lower-cased test name selected on the command line; dispatched on by {@code run()}. */
    final String testName;
    /** Number of records to produce, or to process before a benchmark is considered finished. */
    final int numRecords;
    /** Properties loaded from the properties file; streams settings are added to it in place. */
    final Properties props;
    /** Size in bytes of each produced record value. */
    private final int valueSize;
    /** Skew handed to the key generator; 0 selects uniformly distributed keys. */
    private final double keySkew;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/perf/SimpleBenchmark$CountDownAction.class */
    /**
     * Per-record callback that updates the enclosing benchmark's counters and releases the latch
     * once the processed record count equals the requested number of records.
     */
    public class CountDownAction implements ForeachAction<Integer, byte[]> {
        private final CountDownLatch latch;

        CountDownAction(CountDownLatch countDownLatch) {
            latch = countDownLatch;
        }

        /**
         * Counts one record and adds its value length plus a fixed 32 to the byte total.
         *
         * @param num  record key (unused)
         * @param bArr record value; must not be null
         */
        public void apply(Integer num, byte[] bArr) {
            final SimpleBenchmark benchmark = SimpleBenchmark.this;
            benchmark.processedRecords += 1;
            benchmark.processedBytes += bArr.length + 32;
            final boolean targetReached = benchmark.processedRecords == benchmark.numRecords;
            if (targetReached) {
                latch.countDown();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/perf/SimpleBenchmark$ZipfGenerator.class */
    /**
     * Generates integer keys in {@code [0, size)} whose frequencies follow a Zipf distribution,
     * using rejection sampling. Key {@code k} corresponds to rank {@code k + 1}, so key 0 is the
     * most frequent key and its probability is proportional to {@code 1 / 1^skew}.
     *
     * <p>A skew of exactly 0 short-circuits to a plain uniform draw.
     */
    public class ZipfGenerator {
        private final int size;
        private final double skew;
        private final Random rand = new Random(System.currentTimeMillis());
        /** Normalisation constant: sum of {@code 1 / rank^skew} over ranks {@code 1..size}. */
        private final double bottom;

        /**
         * @param i number of distinct keys; must be positive
         * @param d Zipf exponent; 0 means uniform
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        ZipfGenerator(int i, double d) {
            if (i <= 0) {
                throw new IllegalArgumentException("size must be positive, got " + i);
            }
            this.size = i;
            this.skew = d;
            double sum = 0.0d;
            // Ranks are 1-based and inclusive of `size`, matching the ranks drawn in next().
            for (int rank = 1; rank <= i; rank++) {
                sum += 1.0d / Math.pow(rank, d);
            }
            this.bottom = sum;
        }

        /**
         * Draws the next key.
         *
         * @return a key in {@code [0, size)}
         */
        int next() {
            if (this.skew == 0.0d) {
                return this.rand.nextInt(this.size);
            }
            while (true) {
                // Draw a 1-based rank: rank 0 would make 1/pow(0, skew) infinite and therefore
                // always be accepted, which badly over-represents that key.
                final int rank = this.rand.nextInt(this.size) + 1;
                final double frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
                if (this.rand.nextDouble() < frequency) {
                    return rank - 1;
                }
            }
        }
    }

    /**
     * Creates a benchmark run.
     *
     * @param properties base client/streams properties (kept by reference, not copied)
     * @param str        lower-cased name of the test to run
     * @param i          number of records to produce or process
     * @param d          key skew for the load producer
     * @param i2         value size in bytes for produced records
     */
    private SimpleBenchmark(Properties properties, String str, int i, double d, int i2) {
        props = properties;
        testName = str;
        numRecords = i;
        keySkew = d;
        valueSize = i2;
    }

    /**
     * Dispatches to the benchmark selected by {@link #testName}.
     *
     * <p>"load-one" fills the first source topic; "load-two" fills both source topics. Every other
     * name runs the matching consumer, producer or streams benchmark.
     *
     * @throws RuntimeException if the test name is not recognised
     */
    private void run() {
        switch (this.testName) {
            case "load-one":
                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, this.numRecords, this.keySkew, this.valueSize);
                break;
            case "load-two":
                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, this.numRecords, this.keySkew, this.valueSize);
                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, this.numRecords, this.keySkew, this.valueSize);
                break;
            case "consume":
                consume(SOURCE_TOPIC_ONE);
                break;
            case "consumeproduce":
                consumeAndProduce(SOURCE_TOPIC_ONE);
                break;
            case "streamcount":
                countStreamsNonWindowed(SOURCE_TOPIC_ONE);
                break;
            case "streamcountwindowed":
                countStreamsWindowed(SOURCE_TOPIC_ONE);
                break;
            case "streamprocess":
                processStream(SOURCE_TOPIC_ONE);
                break;
            case "streamprocesswithsink":
                processStreamWithSink(SOURCE_TOPIC_ONE);
                break;
            case "streamprocesswithstatestore":
                processStreamWithStateStore(SOURCE_TOPIC_ONE);
                break;
            case "streamprocesswithwindowstore":
                processStreamWithWindowStore(SOURCE_TOPIC_ONE);
                break;
            case "streamtablejoin":
                streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                break;
            case "streamstreamjoin":
                streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                break;
            case "tabletablejoin":
                tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                break;
            case "yahoo":
                yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
                break;
            default:
                throw new RuntimeException("Unknown test name " + this.testName);
        }
    }

    /**
     * Command-line entry point.
     *
     * @param strArr {@code propFileName testName numRecords keySkew valueSize}; the test name is
     *               lower-cased before use
     * @throws IOException if the properties file cannot be loaded
     */
    public static void main(String[] strArr) throws IOException {
        final boolean tooFewArguments = strArr.length < 5;
        if (tooFewArguments) {
            System.err.println("Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize");
            System.exit(1);
        }

        // Parse every argument before touching the file system, so malformed numbers fail first.
        final String propFileName = strArr[0];
        final String testName = strArr[1].toLowerCase(Locale.ROOT);
        final int numRecords = Integer.parseInt(strArr[2]);
        final double keySkew = Double.parseDouble(strArr[3]);
        final int valueSize = Integer.parseInt(strArr[4]);

        final Properties streamsProperties = Utils.loadProps(propFileName);
        if (null == streamsProperties.getProperty("bootstrap.servers")) {
            System.err.println("No bootstrap kafka servers specified in bootstrap.servers");
            System.exit(1);
        }

        System.out.println("StreamsTest instance started");
        System.out.println("testName=" + testName);
        System.out.println("streamsProperties=" + streamsProperties);
        System.out.println("numRecords=" + numRecords);
        System.out.println("keySkew=" + keySkew);
        System.out.println("valueSize=" + valueSize);

        final SimpleBenchmark benchmark = new SimpleBenchmark(streamsProperties, testName, numRecords, keySkew, valueSize);
        benchmark.run();
    }

    /**
     * Writes the Kafka Streams settings shared by all streams benchmarks into {@link #props},
     * overwriting any existing values for the same keys.
     *
     * @param str value to store under "application.id"
     */
    public void setStreamProperties(String str) {
        final Properties target = this.props;
        // Identity of the streams application and its clients.
        target.put("application.id", str);
        target.put("client.id", "simple-benchmark");
        // Polling and commit cadence.
        target.put("poll.ms", POLL_MS);
        target.put("commit.interval.ms", COMMIT_INTERVAL_MS);
        // Default serdes: integer keys, byte-array values.
        target.put("default.key.serde", INTEGER_SERDE.getClass());
        target.put("default.value.serde", BYTE_SERDE.getClass());
        // Consumer-side settings.
        target.put("auto.offset.reset", "earliest");
        target.put("receive.buffer.bytes", SOCKET_SIZE_BYTES);
        target.put("max.poll.records", MAX_POLL_RECORDS);
        // Producer-side batching.
        target.put("linger.ms", 5000);
        target.put("batch.size", 131072);
    }

    /**
     * Builds a fresh property set usable for both a plain producer and a plain consumer.
     *
     * @param str value to store under "client.id"
     * @return new properties carrying the bootstrap servers from {@link #props} plus the
     *         serializer, deserializer, batching and buffer settings used by the benchmarks
     */
    private Properties setProduceConsumeProperties(String str) {
        final Properties clientProps = new Properties();
        clientProps.put("bootstrap.servers", this.props.getProperty("bootstrap.servers"));
        clientProps.put("client.id", str);

        // Producer settings.
        clientProps.put("linger.ms", 5000);
        clientProps.put("batch.size", 131072);
        clientProps.put("send.buffer.bytes", SOCKET_SIZE_BYTES);
        clientProps.put("key.serializer", IntegerSerializer.class);
        clientProps.put("value.serializer", ByteArraySerializer.class);

        // Consumer settings.
        clientProps.put("key.deserializer", IntegerDeserializer.class);
        clientProps.put("value.deserializer", ByteArrayDeserializer.class);
        clientProps.put("enable.auto.commit", "false");
        clientProps.put("receive.buffer.bytes", SOCKET_SIZE_BYTES);
        clientProps.put("max.poll.records", MAX_POLL_RECORDS);
        return clientProps;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Zeroes the processed-record and processed-byte counters. */
    public void resetStats() {
        processedBytes = 0L;
        processedRecords = 0;
    }

    /**
     * Sends {@code i} records to a topic. All records share one randomly filled value; keys are
     * drawn from a {@code ZipfGenerator} over {@code KEY_SPACE_SIZE} keys.
     *
     * <p>The producer is closed before this method returns, including when sending throws.
     *
     * @param str  producer client id
     * @param str2 destination topic
     * @param i    number of records to send
     * @param d    key skew passed to the key generator
     * @param i2   size in bytes of each record value
     */
    private void produce(String str, String str2, int i, double d, int i2) {
        final Properties producerProps = setProduceConsumeProperties(str);
        final ZipfGenerator keyGenerator = new ZipfGenerator(KEY_SPACE_SIZE, d);

        // One shared payload: only the keys vary between records.
        final byte[] value = new byte[i2];
        new Random(System.currentTimeMillis()).nextBytes(value);

        try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
            for (int sent = 0; sent < i; sent++) {
                producer.send(new ProducerRecord<>(str2, keyGenerator.next(), value));
            }
        }
    }

    /**
     * Reads records from the beginning of every partition of a topic and forwards each one to
     * {@code SINK_TOPIC}, until {@link #numRecords} records have been processed, then prints the
     * throughput.
     *
     * <p>The measured time includes closing the producer but not closing the consumer. The loop
     * keeps polling until enough records have arrived, so it does not terminate if the topic holds
     * fewer than {@link #numRecords} records.
     *
     * @param str topic to read from
     */
    private void consumeAndProduce(String str) {
        final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
        final Properties producerProps = setProduceConsumeProperties("simple-benchmark-producer");
        final long startTime = System.currentTimeMillis();

        try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
                final List<TopicPartition> partitions = getAllPartitions(consumer, str);
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);

                // `<` rather than `!=` so a non-positive record target cannot loop forever.
                while (this.processedRecords < this.numRecords) {
                    final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
                    for (final ConsumerRecord<Integer, byte[]> record : records) {
                        producer.send(new ProducerRecord<>(SINK_TOPIC, record.key(), record.value()));
                        this.processedRecords++;
                        // Same accounting as CountDownAction: value length plus a fixed 32.
                        this.processedBytes += record.value().length + 32;
                        if (this.processedRecords >= this.numRecords) {
                            break;
                        }
                    }
                }
            }
            printResults("ConsumerProducer Performance [records/latency/rec-sec/MB-sec read]: ", System.currentTimeMillis() - startTime);
        }
    }

    /**
     * Reads records from the beginning of every partition of a topic until {@link #numRecords}
     * records have been processed, then prints the throughput.
     *
     * <p>The measured time includes closing the consumer. The loop keeps polling until enough
     * records have arrived, so it does not terminate if the topic holds fewer than
     * {@link #numRecords} records.
     *
     * @param str topic to read from
     */
    private void consume(String str) {
        final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
        final long startTime = System.currentTimeMillis();

        try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            final List<TopicPartition> partitions = getAllPartitions(consumer, str);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // `<` rather than `!=` so a non-positive record target cannot loop forever.
            while (this.processedRecords < this.numRecords) {
                final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
                for (final ConsumerRecord<Integer, byte[]> record : records) {
                    this.processedRecords++;
                    // Same accounting as CountDownAction: value length plus a fixed 32.
                    this.processedBytes += record.value().length + 32;
                    if (this.processedRecords >= this.numRecords) {
                        break;
                    }
                }
            }
        }
        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", System.currentTimeMillis() - startTime);
    }

    /**
     * Streams benchmark with a source only: every record read from the topic is counted.
     *
     * @param str topic to read from
     */
    private void processStream(String str) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-streams-source");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(str, Consumed.with(INTEGER_SERDE, BYTE_SERDE))
            .peek(new CountDownAction(latch));

        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, this.props);
        runGenericBenchmark(streams, "Streams Source Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    /**
     * Streams benchmark with a source and a sink: every record is counted and then written to
     * {@code SINK_TOPIC}.
     *
     * @param str topic to read from
     */
    private void processStreamWithSink(String str) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-streams-source-sink");

        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: without them the chained call is typed KStream<Object, Object>
        // and CountDownAction (a ForeachAction<Integer, byte[]>) is not accepted by peek().
        builder.<Integer, byte[]>stream(str)
            .peek(new CountDownAction(latch))
            .to(SINK_TOPIC);

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams SourceSink Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    /**
     * Streams benchmark against a persistent, caching key-value store: every record is counted,
     * then its key is looked up and the record is written to the store named "store".
     *
     * @param str topic to read from
     */
    private void processStreamWithStateStore(String str) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-streams-with-store");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE)
                .withCachingEnabled());

        // Explicit type arguments: without them the chained call is typed KStream<Object, Object>
        // and neither CountDownAction nor the typed processor supplier is accepted.
        builder.<Integer, byte[]>stream(str)
            .peek(new CountDownAction(latch))
            .process(new ProcessorSupplier<Integer, byte[]>() {
                public Processor<Integer, byte[]> get() {
                    return new AbstractProcessor<Integer, byte[]>() {
                        private KeyValueStore<Integer, byte[]> store;

                        @SuppressWarnings("unchecked") // the store was registered with these serdes above
                        public void init(ProcessorContext processorContext) {
                            super.init(processorContext);
                            this.store = (KeyValueStore<Integer, byte[]>) processorContext.getStateStore("store");
                        }

                        public void process(Integer num, byte[] bArr) {
                            // One read followed by one write per record; the read result is unused.
                            this.store.get(num);
                            this.store.put(num, bArr);
                        }
                    };
                }
            }, "store");

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    /**
     * Streams benchmark against a persistent, caching window store: every record is counted, then
     * a range fetch over keys {@code key - 10 .. key + 10} and the last
     * {@code AGGREGATE_WINDOW_SIZE} milliseconds is drained, and the record is written to the
     * store named "store".
     *
     * @param str topic to read from
     */
    private void processStreamWithWindowStore(String str) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-streams-with-store");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(
            Stores.windowStoreBuilder(
                Stores.persistentWindowStore("store", Duration.ofMillis(3000L), Duration.ofMillis(AGGREGATE_WINDOW_SIZE), false),
                INTEGER_SERDE,
                BYTE_SERDE)
                .withCachingEnabled());

        // Explicit type arguments: without them the chained call is typed KStream<Object, Object>
        // and neither CountDownAction nor the typed processor supplier is accepted.
        builder.<Integer, byte[]>stream(str)
            .peek(new CountDownAction(latch))
            .process(new ProcessorSupplier<Integer, byte[]>() {
                public Processor<Integer, byte[]> get() {
                    return new AbstractProcessor<Integer, byte[]>() {
                        private WindowStore<Integer, byte[]> store;

                        @SuppressWarnings("unchecked") // the store was registered with these serdes above
                        public void init(ProcessorContext processorContext) {
                            super.init(processorContext);
                            this.store = (WindowStore<Integer, byte[]>) processorContext.getStateStore("store");
                        }

                        public void process(Integer num, byte[] bArr) {
                            final long timestamp = context().timestamp();
                            final Integer fromKey = num - 10;
                            final Integer toKey = num + 10;
                            // try-with-resources so the iterator is released even if iteration throws.
                            try (KeyValueIterator<?, byte[]> matches = this.store.fetch(
                                    fromKey,
                                    toKey,
                                    Instant.ofEpochMilli(timestamp - SimpleBenchmark.AGGREGATE_WINDOW_SIZE),
                                    Instant.ofEpochMilli(timestamp))) {
                                // Drain the iterator; only the cost of the range scan matters.
                                while (matches.hasNext()) {
                                    matches.next();
                                }
                            }
                            this.store.put(num, bArr);
                        }
                    };
                }
            }, "store");

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    /**
     * Streams benchmark for a non-windowed count: every record is counted by the benchmark and
     * then grouped by key and counted by the topology.
     *
     * @param str topic to read from
     */
    private void countStreamsNonWindowed(String str) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-nonwindowed-count");

        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: without them the chained call is typed KStream<Object, Object>
        // and CountDownAction (a ForeachAction<Integer, byte[]>) is not accepted by peek().
        builder.<Integer, byte[]>stream(str)
            .peek(new CountDownAction(latch))
            .groupByKey()
            .count();

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    /**
     * Streams benchmark for a windowed count: every record is counted by the benchmark and then
     * grouped by key and counted in windows of {@code AGGREGATE_WINDOW_SIZE} milliseconds that
     * advance by {@code AGGREGATE_WINDOW_ADVANCE} milliseconds.
     *
     * @param str topic to read from
     */
    private void countStreamsWindowed(String str) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-windowed-count");

        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: without them the chained call is typed KStream<Object, Object>
        // and CountDownAction (a ForeachAction<Integer, byte[]>) is not accepted by peek().
        builder.<Integer, byte[]>stream(str)
            .peek(new CountDownAction(latch))
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(AGGREGATE_WINDOW_SIZE)).advanceBy(Duration.ofMillis(AGGREGATE_WINDOW_ADVANCE)))
            .count();

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams Count Windowed Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    /**
     * Streams benchmark for a KStream-KTable left join; every join result is counted.
     *
     * @param str  topic read as the stream (left side)
     * @param str2 topic read as the table (right side)
     */
    private void streamTableJoin(String str, String str2) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-stream-table-join");

        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: without them both inputs are typed <Object, Object> and
        // neither VALUE_JOINER nor CountDownAction is accepted.
        builder.<Integer, byte[]>stream(str)
            .leftJoin(builder.<Integer, byte[]>table(str2), VALUE_JOINER)
            .foreach(new CountDownAction(latch));

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    /**
     * Streams benchmark for a KStream-KStream left join over a window of
     * {@code STREAM_STREAM_JOIN_WINDOW} milliseconds; every join result is counted.
     *
     * @param str  topic read as the left stream
     * @param str2 topic read as the right stream
     */
    private void streamStreamJoin(String str, String str2) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-stream-stream-join");

        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: without them both inputs are typed <Object, Object> and
        // neither VALUE_JOINER nor CountDownAction is accepted.
        builder.<Integer, byte[]>stream(str)
            .leftJoin(builder.<Integer, byte[]>stream(str2), VALUE_JOINER, JoinWindows.of(Duration.ofMillis(STREAM_STREAM_JOIN_WINDOW)))
            .foreach(new CountDownAction(latch));

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
    }

    /**
     * Streams benchmark for a KTable-KTable left join; every update of the joined table is
     * counted.
     *
     * @param str  topic read as the left table
     * @param str2 topic read as the right table
     */
    private void tableTableJoin(String str, String str2) {
        final CountDownLatch latch = new CountDownLatch(1);
        setStreamProperties("simple-benchmark-table-table-join");

        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: without them both inputs are typed <Object, Object> and
        // neither VALUE_JOINER nor CountDownAction is accepted.
        builder.<Integer, byte[]>table(str)
            .leftJoin(builder.<Integer, byte[]>table(str2), VALUE_JOINER)
            .toStream()
            .foreach(new CountDownAction(latch));

        runGenericBenchmark(createKafkaStreamsWithExceptionHandler(builder, this.props), "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Prints one result line to standard output in the form
     * {@code <label><records>/<elapsed ms>/<records per second>/<megabytes per second>}.
     *
     * @param str label printed in front of the figures
     * @param j   elapsed time in milliseconds
     */
    public void printResults(String str, long j) {
        final double recordRate = recordsPerSec(j, this.processedRecords);
        final double megabyteRate = megabytesPerSec(j, this.processedBytes);
        final StringBuilder line = new StringBuilder();
        line.append(str)
            .append(this.processedRecords)
            .append("/")
            .append(j)
            .append("/")
            .append(recordRate)
            .append("/")
            .append(megabyteRate);
        System.out.println(line.toString());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts a streams application, waits until the latch is released or {@code MAX_WAIT_MS}
     * milliseconds have elapsed, closes the application and prints the results.
     *
     * <p>The wait is not cancellable: an interrupt does not end the wait early, but the calling
     * thread's interrupt status is restored before this method returns so callers can still
     * observe it.
     *
     * @param kafkaStreams   application to run; it is started and closed by this method
     * @param str            label passed to {@link #printResults(String, long)}
     * @param countDownLatch latch that the topology releases once enough records were processed
     */
    public void runGenericBenchmark(KafkaStreams kafkaStreams, String str, CountDownLatch countDownLatch) {
        kafkaStreams.start();

        final long startTime = System.currentTimeMillis();
        long endTime = startTime;
        boolean interrupted = false;
        try {
            while (countDownLatch.getCount() > 0 && endTime - startTime < MAX_WAIT_MS) {
                try {
                    // Wait in one-second slices so the overall time limit is re-checked regularly.
                    countDownLatch.await(1000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt instead of discarding it.
                    interrupted = true;
                }
                endTime = System.currentTimeMillis();
            }
            kafkaStreams.close();
            printResults(str, endTime - startTime);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds a {@code KafkaStreams} instance from the given topology builder and registers an
     * uncaught-exception handler that reports the failure on standard output and then closes the
     * instance, waiting up to 30 seconds.
     *
     * @param streamsBuilder builder holding the topology
     * @param properties     streams configuration
     * @return the configured instance, not yet started
     */
    private KafkaStreams createKafkaStreamsWithExceptionHandler(StreamsBuilder streamsBuilder, Properties properties) {
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
        final Thread.UncaughtExceptionHandler reportAndClose = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread failedThread, Throwable cause) {
                System.out.println("FATAL: An unexpected exception is encountered on thread " + failedThread + ": " + cause);
                streams.close(Duration.ofSeconds(30L));
            }
        };
        streams.setUncaughtExceptionHandler(reportAndClose);
        return streams;
    }

    /**
     * Converts a byte count and an elapsed time into a throughput figure.
     *
     * @param j  elapsed time in milliseconds
     * @param j2 number of bytes processed
     * @return mebibytes (1024 * 1024 bytes) per second
     */
    private double megabytesPerSec(long j, long j2) {
        final double kilobytes = j2 / 1024.0d;
        final double megabytes = kilobytes / 1024.0d;
        final double seconds = j / 1000.0d;
        return megabytes / seconds;
    }

    /**
     * Converts a record count and an elapsed time into a throughput figure.
     *
     * @param j elapsed time in milliseconds
     * @param i number of records processed
     * @return records per second
     */
    private double recordsPerSec(long j, int i) {
        final double seconds = j / 1000.0d;
        return i / seconds;
    }

    /**
     * Collects every partition of the given topics, in the order the consumer reports them.
     *
     * @param kafkaConsumer consumer used to look up partition metadata
     * @param strArr        topic names
     * @return a new mutable list with one {@code TopicPartition} per reported partition
     */
    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> kafkaConsumer, String... strArr) {
        final List<TopicPartition> partitions = new ArrayList<>();
        for (final String topic : strArr) {
            for (final PartitionInfo info : kafkaConsumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitions;
    }

    /**
     * Runs the Yahoo benchmark, handing it this instance and the two topic names.
     *
     * @param str  first topic name passed to {@code YahooBenchmark}
     * @param str2 second topic name passed to {@code YahooBenchmark}
     */
    private void yahooBenchmark(String str, String str2) {
        final YahooBenchmark benchmark = new YahooBenchmark(this, str, str2);
        benchmark.run();
    }
}
