package org.apache.kafka.streams.tests;

import java.io.File;
import java.lang.Thread;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.tests.SmokeTestUtil;

/* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestClient.class */
public class SmokeTestClient extends SmokeTestUtil {
    private final String kafka;
    private final File stateDir;
    private KafkaStreams streams;
    private Thread thread;
    private boolean uncaughtException = false;

    public SmokeTestClient(File file, String str) {
        this.stateDir = file;
        this.kafka = str;
    }

    public void start() {
        this.streams = createKafkaStreams(this.stateDir, this.kafka);
        this.streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.1
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
                SmokeTestClient.this.uncaughtException = true;
                th.printStackTrace();
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.2
            @Override // java.lang.Runnable
            public void run() {
                SmokeTestClient.this.close();
            }
        }));
        this.thread = new Thread() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                SmokeTestClient.this.streams.start();
            }
        };
        this.thread.start();
    }

    public void close() {
        this.streams.close(5L, TimeUnit.SECONDS);
        if (!this.uncaughtException) {
            System.out.println("SMOKE-TEST-CLIENT-CLOSED");
        }
        try {
            this.thread.join();
        } catch (Exception e) {
            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
        }
    }

    private static KafkaStreams createKafkaStreams(File file, String str) {
        Properties properties = new Properties();
        properties.put("application.id", "SmokeTest");
        properties.put("state.dir", file.toString());
        properties.put("bootstrap.servers", str);
        properties.put("num.stream.threads", 3);
        properties.put("num.standby.replicas", 2);
        properties.put("buffered.records.per.partition", 100);
        properties.put("commit.interval.ms", 1000);
        properties.put("replication.factor", 3);
        properties.put("auto.offset.reset", "earliest");
        properties.put("retries", Integer.MAX_VALUE);
        properties.put("acks", "all");
        properties.put(StreamsConfig.producerPrefix("request.timeout.ms"), 60000);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Consumed with = Consumed.with(stringSerde, intSerde);
        KStream stream = streamsBuilder.stream("data", with);
        stream.to(stringSerde, intSerde, "echo");
        KStream filter = stream.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.4
            public boolean test(String str2, Integer num) {
                return num == null || num.intValue() != Integer.MAX_VALUE;
            }
        });
        filter.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        KGroupedStream groupByKey = filter.groupByKey(Serialized.with(stringSerde, intSerde));
        groupByKey.aggregate(new Initializer<Integer>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.5
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public Integer m57apply() {
                return Integer.MAX_VALUE;
            }
        }, new Aggregator<String, Integer, Integer>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.6
            public Integer apply(String str2, Integer num, Integer num2) {
                return num.intValue() < num2.intValue() ? num : num2;
            }
        }, TimeWindows.of(TimeUnit.DAYS.toMillis(1L)), intSerde, "uwin-min").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, intSerde, "min");
        KTable table = streamsBuilder.table("min", with);
        table.toStream().process(SmokeTestUtil.printProcessorSupplier("min"), new String[0]);
        groupByKey.aggregate(new Initializer<Integer>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.7
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public Integer m58apply() {
                return Integer.MIN_VALUE;
            }
        }, new Aggregator<String, Integer, Integer>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.8
            public Integer apply(String str2, Integer num, Integer num2) {
                return num.intValue() > num2.intValue() ? num : num2;
            }
        }, TimeWindows.of(TimeUnit.DAYS.toMillis(2L)), intSerde, "uwin-max").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, intSerde, "max");
        KTable table2 = streamsBuilder.table("max", with);
        table2.toStream().process(SmokeTestUtil.printProcessorSupplier("max"), new String[0]);
        groupByKey.aggregate(new Initializer<Long>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.9
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public Long m59apply() {
                return 0L;
            }
        }, new Aggregator<String, Integer, Long>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.10
            public Long apply(String str2, Integer num, Long l) {
                return Long.valueOf(num.intValue() + l.longValue());
            }
        }, TimeWindows.of(TimeUnit.DAYS.toMillis(2L)), longSerde, "win-sum").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, longSerde, "sum");
        Consumed with2 = Consumed.with(stringSerde, longSerde);
        KTable table3 = streamsBuilder.table("sum", with2);
        table3.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"), new String[0]);
        groupByKey.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2L)), "uwin-cnt").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, longSerde, "cnt");
        KTable table4 = streamsBuilder.table("cnt", with2);
        table4.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"), new String[0]);
        table2.join(table, new ValueJoiner<Integer, Integer, Integer>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.11
            public Integer apply(Integer num, Integer num2) {
                return Integer.valueOf(num.intValue() - num2.intValue());
            }
        }).to(stringSerde, intSerde, "dif");
        table3.join(table4, new ValueJoiner<Long, Long, Double>() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.12
            public Double apply(Long l, Long l2) {
                return Double.valueOf(l.longValue() / l2.longValue());
            }
        }).to(stringSerde, doubleSerde, "avg");
        SmokeTestUtil.Agg agg = new SmokeTestUtil.Agg();
        table4.groupBy(agg.selector(), Serialized.with(stringSerde, longSerde)).aggregate(agg.init(), agg.adder(), agg.remover(), Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt")).withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())).to(stringSerde, longSerde, "tagg");
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.kafka.streams.tests.SmokeTestClient.13
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                System.out.println("FATAL: An unexpected exception is encountered on thread " + thread + ": " + th);
                kafkaStreams.close(30L, TimeUnit.SECONDS);
            }
        });
        return kafkaStreams;
    }
}
