package org.apache.kafka.streams.tests;

import java.lang.Thread;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

/* loaded from: input_file:org/apache/kafka/streams/tests/EosTestClient.class */
/**
 * Test client that runs a Kafka Streams topology with {@code processing.guarantee=exactly_once}.
 *
 * <p>{@link #start()} keeps one {@link KafkaStreams} instance alive: whenever the uncaught-exception
 * handler fires, the instance is closed and a fresh one is created on the next loop iteration.
 * A JVM shutdown hook closes the current instance and prints {@code EOS-TEST-CLIENT-CLOSED}.
 *
 * <p>The marker lines written to stdout ({@code EOS-TEST-CLIENT-EXCEPTION},
 * {@code EOS-TEST-CLIENT-CLOSED}, {@code StateChange: ...}) are presumably parsed by whatever
 * drives this client, so they must stay byte-identical.
 *
 * <p>State is shared between the thread running {@link #start()}, the thread invoking the
 * uncaught-exception handler and the shutdown-hook thread; the shared fields are therefore
 * {@code volatile}.
 */
public class EosTestClient extends SmokeTestUtil {
    static final String APP_ID = "EosTest";

    /** Upper bound, in seconds, for closing the instance and for awaiting the NOT_RUNNING callback. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 300L;
    /** Pause between two checks of the NOT_RUNNING flag while waiting during shutdown. */
    private static final long CALLBACK_POLL_INTERVAL_MS = 500L;
    /** Pause between two iterations of the supervising loop in {@link #start()}. */
    private static final long SUPERVISOR_POLL_INTERVAL_MS = 1000L;

    private final Properties properties;
    private final boolean withRepartitioning;
    private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);

    // Written by the supervising loop, read by the shutdown hook.
    private volatile KafkaStreams streams;
    // Written by the uncaught-exception handler thread, read by the loop and the shutdown hook.
    private volatile boolean uncaughtException;
    private volatile boolean isRunning = true;

    /**
     * @param properties         base configuration; {@link #start()} adds its own settings to this
     *                           same object (it is not copied)
     * @param withRepartitioning whether the topology additionally routes the input through the
     *                           {@code repartition} topic and computes {@code max} and {@code cnt}
     */
    public EosTestClient(Properties properties, boolean withRepartitioning) {
        this.properties = properties;
        this.withRepartitioning = withRepartitioning;
    }

    /**
     * Registers the shutdown hook and then supervises the Streams instance until the hook clears
     * the running flag. This method blocks the calling thread.
     */
    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));

        while (isRunning) {
            KafkaStreams current = streams;
            if (current == null) {
                uncaughtException = false;
                current = createKafkaStreams(properties);
                current.setUncaughtExceptionHandler(this::reportUncaughtException);
                current.setStateListener(this::reportStateChange);
                // Publish only after the callbacks are installed so the shutdown hook never
                // closes an instance whose NOT_RUNNING transition would go unobserved.
                streams = current;
                current.start();
            }
            if (uncaughtException) {
                // NOTE(review): the constant's name does not state its unit; it is passed to
                // Duration.ofSeconds here exactly as in the original code. Confirm that seconds
                // is the intended unit.
                current.close(Duration.ofSeconds(IntegrationTestUtils.DEFAULT_TIMEOUT));
                streams = null;
            }
            sleep(SUPERVISOR_POLL_INTERVAL_MS);
        }
    }

    /**
     * Shutdown-hook body: stops the supervising loop, closes the current instance (if any), waits
     * for its NOT_RUNNING callback and prints the closed marker unless an uncaught exception is
     * pending.
     */
    private void shutdown() {
        isRunning = false;

        // The loop sets the field to null between closing a failed instance and creating its
        // replacement, and it is also null before the first instance exists.
        final KafkaStreams current = streams;
        if (current != null) {
            current.close(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS));
            // Wait so that the state-change printout is on stdout before the closed marker.
            waitForStateTransitionCallback();
        }

        if (!uncaughtException) {
            System.out.println(System.currentTimeMillis());
            System.out.println("EOS-TEST-CLIENT-CLOSED");
            System.out.flush();
        }
    }

    /** Prints the exception marker and stack trace, then flags the instance for replacement. */
    private void reportUncaughtException(Thread thread, Throwable error) {
        System.out.println(System.currentTimeMillis());
        System.out.println("EOS-TEST-CLIENT-EXCEPTION");
        error.printStackTrace();
        System.out.flush();
        uncaughtException = true;
    }

    /** Prints every state transition and records when NOT_RUNNING has been reached. */
    private void reportStateChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        System.out.println(System.currentTimeMillis());
        System.out.println("StateChange: " + oldState + " -> " + newState);
        System.out.flush();
        if (newState == KafkaStreams.State.NOT_RUNNING) {
            notRunningCallbackReceived.set(true);
        }
    }

    /**
     * Adds this client's settings to {@code properties} and builds the topology.
     *
     * <p>The topology reads {@code data}, echoes it to {@code echo}, and writes the per-key
     * minimum to {@code min} and the per-key sum to {@code sum}. With repartitioning enabled it
     * also sends the input through {@code repartition} and writes the per-key maximum to
     * {@code max} and the per-key count to {@code cnt}.
     *
     * @param properties configuration object to extend in place and hand to {@link KafkaStreams}
     * @return a new, not yet started instance
     */
    private KafkaStreams createKafkaStreams(Properties properties) {
        properties.put("application.id", APP_ID);
        properties.put("num.stream.threads", 1);
        properties.put("num.standby.replicas", 2);
        properties.put("replication.factor", 3);
        properties.put("processing.guarantee", "exactly_once");
        properties.put("cache.max.bytes.buffering", 0);
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Integer().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> data = builder.stream("data");
        data.to("echo");
        data.process(SmokeTestUtil.printProcessorSupplier("data"));

        final KGroupedStream<String, Integer> groupedData = data.groupByKey();

        // min
        groupedData
            .aggregate(
                () -> Integer.MAX_VALUE,
                (key, value, aggregate) -> value < aggregate ? value : aggregate,
                Materialized.with(null, intSerde))
            .toStream()
            .to("min", Produced.with(stringSerde, intSerde));

        // sum
        groupedData
            .aggregate(
                () -> 0L,
                (key, value, aggregate) -> value + aggregate,
                Materialized.with(null, longSerde))
            .toStream()
            .to("sum", Produced.with(stringSerde, longSerde));

        if (withRepartitioning) {
            final KStream<String, Integer> repartitionedData = data.through("repartition");
            repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));

            final KGroupedStream<String, Integer> groupedRepartitionedData = repartitionedData.groupByKey();

            // max
            groupedRepartitionedData
                .aggregate(
                    () -> Integer.MIN_VALUE,
                    (key, value, aggregate) -> value > aggregate ? value : aggregate,
                    Materialized.with(null, intSerde))
                .toStream()
                .to("max", Produced.with(stringSerde, intSerde));

            // count
            groupedRepartitionedData
                .count()
                .toStream()
                .to("cnt", Produced.with(stringSerde, longSerde));
        }

        return new KafkaStreams(builder.build(), properties);
    }

    /**
     * Blocks until the NOT_RUNNING state callback has been observed or five minutes have elapsed,
     * whichever comes first; on timeout a diagnostic is written to stderr.
     *
     * <p>An interrupt does not shorten the wait, but the thread's interrupt status is restored
     * before returning so callers can still observe it.
     */
    public void waitForStateTransitionCallback() {
        // nanoTime is monotonic, so the deadline is unaffected by wall-clock adjustments.
        final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(SHUTDOWN_TIMEOUT_SECONDS);
        boolean interrupted = false;
        try {
            while (!notRunningCallbackReceived.get() && System.nanoTime() - deadlineNanos < 0) {
                try {
                    Thread.sleep(CALLBACK_POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Keep waiting; the flag is re-asserted in the finally block below.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (!notRunningCallbackReceived.get()) {
            System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
            System.err.flush();
        }
    }
}
