package org.apache.samza.tools.benchmark;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.ParseException;
import org.apache.samza.Partition;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.NoOpMetricsRegistry;

/* loaded from: input_file:org/apache/samza/tools/benchmark/SystemConsumerBench.class */
/**
 * Command-line benchmark that reads events from a stream through a {@link SystemConsumer}
 * and prints the observed consumption rate.
 *
 * <p>The system name, stream name, partition range, event count, config and system factory
 * are all inherited fields of {@link AbstractSamzaBench}; this class only drives the
 * consume-and-measure loop.
 */
public class SystemConsumerBench extends AbstractSamzaBench {

    /** Timeout argument handed to every {@code poll} call (presumably milliseconds -- confirm against SystemConsumer). */
    private static final long POLL_TIMEOUT = 2000L;

    /**
     * Entry point: builds the benchmark from the command-line arguments and runs it.
     *
     * @param strArr raw command-line arguments, forwarded to the constructor
     * @throws Exception if argument parsing or the benchmark run fails
     */
    public static void main(String[] strArr) throws Exception {
        new SystemConsumerBench(strArr).start();
    }

    /**
     * Creates the benchmark under the name {@code "system-consumer-bench"}.
     *
     * @param strArr raw command-line arguments, forwarded to the superclass
     * @throws ParseException if the superclass rejects the arguments
     */
    public SystemConsumerBench(String[] strArr) throws ParseException {
        super("system-consumer-bench", strArr);
    }

    /**
     * Registers every partition in the configured range at its oldest offset, polls until at
     * least {@code totalEvents} messages have been received, prints the start time, end time and
     * event rate, stops the consumer and exits the JVM with status 0.
     *
     * @throws IOException if the superclass start-up fails
     * @throws InterruptedException if polling is interrupted
     * @throws IllegalStateException if no metadata is available for the stream or for one of
     *     the requested partitions
     */
    @Override // org.apache.samza.tools.benchmark.AbstractSamzaBench
    public void start() throws IOException, InterruptedException {
        super.start();
        SystemStreamMetadata streamMetadata = (SystemStreamMetadata) this.factory.getAdmin(this.systemName, this.config)
                .getSystemStreamMetadata(Collections.singleton(this.physicalStreamName))
                .get(this.physicalStreamName);
        if (streamMetadata == null) {
            throw new IllegalStateException(
                    "No metadata found for stream " + this.physicalStreamName + " in system " + this.systemName);
        }

        NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
        Set<SystemStreamPartition> partitions =
                createSSPs(this.systemName, this.physicalStreamName, this.startPartition, this.endPartition);
        SystemConsumer consumer = this.factory.getConsumer(this.systemName, this.config, metricsRegistry);
        for (SystemStreamPartition ssp : partitions) {
            SystemStreamMetadata.SystemStreamPartitionMetadata partitionMetadata =
                    (SystemStreamMetadata.SystemStreamPartitionMetadata) streamMetadata
                            .getSystemStreamPartitionMetadata()
                            .get(ssp.getPartition());
            if (partitionMetadata == null) {
                throw new IllegalStateException("No metadata found for partition " + ssp);
            }
            // Always start from the oldest offset so each run reads from the beginning of the partition.
            consumer.register(ssp, partitionMetadata.getOldestOffset());
        }

        consumer.start();
        try {
            System.out.println("starting consumption at " + Instant.now());
            Instant startTime = Instant.now();

            // long counter: an int would overflow once multiplied by 1000 for the rate below.
            long consumed = 0L;
            while (consumed < this.totalEvents) {
                consumed += consumer.poll(partitions, POLL_TIMEOUT).values().stream()
                        .mapToInt(batch -> batch.size())
                        .sum();
            }

            Instant endTime = Instant.now();
            System.out.println("Ending consumption at " + endTime);
            // Clamp to 1 ms so a run that finishes within the same millisecond cannot divide by zero.
            long elapsedMillis = Math.max(1L, Duration.between(startTime, endTime).toMillis());
            System.out.println(String.format("Event Rate is %s Messages/Sec ",
                    Long.valueOf(consumed * 1000L / elapsedMillis)));
        } finally {
            // Stop the consumer even when polling fails.
            consumer.stop();
        }
        System.exit(0);
    }

    /**
     * Builds one {@link SystemStreamPartition} per partition id in the half-open range
     * {@code [i, i2)}.
     *
     * @param str system name
     * @param str2 stream name
     * @param i first partition id (inclusive)
     * @param i2 end of the partition range (exclusive)
     * @return the set of partitions; empty when {@code i >= i2}
     */
    Set<SystemStreamPartition> createSSPs(String str, String str2, int i, int i2) {
        return IntStream.range(i, i2)
                .mapToObj(partitionId -> new SystemStreamPartition(str, str2, new Partition(partitionId)))
                .collect(Collectors.toSet());
    }
}
