package org.apache.samza.tools.benchmark;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.ParseException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;

/* loaded from: input_file:org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.class */
public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {

    /* loaded from: input_file:org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench$MessageConsumer.class */
    private class MessageConsumer implements MapFunction<Object, Object> {
        AtomicInteger eventsConsumed;
        volatile Instant startTime;

        private MessageConsumer() {
            this.eventsConsumed = new AtomicInteger(0);
        }

        public Object apply(Object obj) {
            this.eventsConsumed.incrementAndGet();
            if (this.eventsConsumed.get() == 1) {
                this.startTime = Instant.now();
            }
            return obj;
        }

        public int getEventsConsumed() {
            return this.eventsConsumed.get();
        }
    }

    public SystemConsumerWithSamzaBench(String[] strArr) throws ParseException {
        super("system-consumer-with-samza-bench", strArr);
    }

    public static void main(String[] strArr) throws Exception {
        new SystemConsumerBench(strArr).start();
    }

    @Override // org.apache.samza.tools.benchmark.AbstractSamzaBench
    public void addMoreSystemConfigs(Properties properties) {
        properties.put("app.runner.class", LocalApplicationRunner.class.getName());
        List list = (List) IntStream.range(this.startPartition, this.endPartition).boxed().collect(Collectors.toList());
        properties.put("app.name", "SamzaBench");
        properties.put("processor.id", "1");
        properties.put("job.coordinator.factory", PassthroughJobCoordinatorFactory.class.getName());
        properties.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS, this.streamId), Joiner.on(",").join(list));
        properties.put("task.name.grouper.factory", ConfigBasedSspGrouperFactory.class.getName());
    }

    @Override // org.apache.samza.tools.benchmark.AbstractSamzaBench
    public void start() throws IOException, InterruptedException {
        super.start();
        MessageConsumer messageConsumer = new MessageConsumer();
        ApplicationRunner applicationRunner = ApplicationRunners.getApplicationRunner(streamApplicationDescriptor -> {
            streamApplicationDescriptor.getInputStream(new GenericSystemDescriptor(this.systemName, (String) new SystemConfig(this.config).getSystemFactory(this.systemName).get()).getInputDescriptor(this.streamId, new NoOpSerde())).map(messageConsumer);
        }, new MapConfig());
        applicationRunner.run();
        while (messageConsumer.getEventsConsumed() < this.totalEvents) {
            Thread.sleep(10L);
        }
        Instant now = Instant.now();
        applicationRunner.kill();
        System.out.println("\n*******************");
        System.out.println(String.format("Started at %s Ending at %s ", messageConsumer.startTime, now));
        System.out.println(String.format("Event Rate is %s Messages/Sec ", Long.valueOf((messageConsumer.getEventsConsumed() * 1000) / Duration.between(messageConsumer.startTime, Instant.now()).toMillis())));
        System.out.println("Event Rate is " + ((messageConsumer.getEventsConsumed() * 1000) / Duration.between(messageConsumer.startTime, now).toMillis()));
        System.out.println("*******************\n");
        System.exit(0);
    }
}
