package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsOptimizedTest.class */
/**
 * Command-line driver that builds a Kafka Streams topology containing count, aggregate,
 * reduce and join stages over a re-keyed input stream, starts it, and prints how many
 * repartition sink nodes appear in the topology description each time the application
 * transitions from {@code REBALANCING} to {@code RUNNING}.
 *
 * <p>Usage: {@code StreamsOptimizedTest <properties-file>}. The properties file must define
 * {@code input.topic}, {@code aggregation.topic}, {@code reduce.topic} and {@code join.topic};
 * every remaining entry is passed through to the Kafka Streams configuration.
 */
public class StreamsOptimizedTest {

    /** Matches the sink nodes of repartition topics in the text form of a topology description. */
    private static final Pattern REPARTITION_TOPIC_PATTERN = Pattern.compile("Sink: .*-repartition");

    /**
     * Entry point.
     *
     * @param strArr command-line arguments; {@code strArr[0]} is the path of the properties file
     * @throws Exception if the properties file cannot be loaded
     * @throws NullPointerException if one of the required topic properties is missing
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 1) {
            System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: ");
            // Stop here instead of falling through to strArr[0], which would fail with an
            // ArrayIndexOutOfBoundsException.
            Exit.exit(1);
            return;
        }
        final Properties fileProps = Utils.loadProps(strArr[0]);
        System.out.println("StreamsTest instance started StreamsOptimizedTest");
        System.out.println("props=" + fileProps);

        // The topic names are removed from the file properties so that only genuine
        // client/streams settings are merged into the Streams configuration below.
        final String inputTopic = removeRequiredProperty(fileProps, "input.topic");
        final String aggregationTopic = removeRequiredProperty(fileProps, "aggregation.topic");
        final String reduceTopic = removeRequiredProperty(fileProps, "reduce.topic");
        final String joinTopic = removeRequiredProperty(fileProps, "join.topic");

        final Initializer<Integer> initializer = () -> 0;
        // Running total of the lengths of all values seen for a key.
        final Aggregator<String, String, Integer> aggregator =
            (key, value, aggregate) -> aggregate + value.length();
        // Values are decimal integers encoded as strings; the reduction is their sum.
        final Reducer<String> reducer =
            (left, right) -> Integer.toString(Integer.parseInt(left) + Integer.parseInt(right));
        // New record key: the numeric value modulo 9, as a string.
        final Function<String, String> keyFunction = value -> Integer.toString(Integer.parseInt(value) % 9);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> sourceStream =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> mappedStream =
            sourceStream.selectKey((key, value) -> keyFunction.apply(value));

        // NOTE(review): groupByKey() is intentionally invoked separately for each of the three
        // operations below rather than shared - presumably so the topology contains redundant
        // repartition steps for the optimizer to merge. Confirm before refactoring.
        final KStream<String, Long> countStream = mappedStream
            .groupByKey()
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream();

        mappedStream
            .groupByKey()
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .peek((key, value) -> System.out.println(String.format("AGGREGATED key=%s value=%s", key, value)))
            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));

        mappedStream
            .groupByKey()
            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .peek((key, value) -> System.out.println(String.format("REDUCED key=%s value=%s", key, value)))
            .to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));

        mappedStream
            .join(
                countStream,
                (value, count) -> value + ":" + count.toString(),
                JoinWindows.of(Duration.ofMillis(500L)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
            .peek((key, value) -> System.out.println(String.format("JOINED key=%s value=%s", key, value)))
            .to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));

        final Properties config = new Properties();
        config.setProperty("application.id", "StreamsOptimizedTest");
        config.setProperty("cache.max.bytes.buffering", "0");
        config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        config.setProperty("default.value.serde", Serdes.String().getClass().getName());
        config.setProperty(StreamsConfig.adminClientPrefix("retries"), "100");
        // Applied last so that entries from the properties file override the defaults above.
        config.putAll(fileProps);

        final Topology topology = builder.build(config);
        final KafkaStreams streams = new KafkaStreams(topology, config);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                final int repartitionTopicCount =
                    getCountOfRepartitionTopicsFound(topology.describe().toString(), REPARTITION_TOPIC_PATTERN);
                System.out.println(
                    String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
                System.out.flush();
            }
        });
        streams.cleanUp();
        streams.start();
        Exit.addShutdownHook("streams-shutdown-hook", () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close(Duration.ofMillis(5000L));
            System.out.println("OPTIMIZE_TEST Streams Stopped");
            System.out.flush();
        });
    }

    /**
     * Removes {@code key} from {@code props} and returns its value.
     *
     * @param props properties to read from; the entry is removed as a side effect
     * @param key name of the required property
     * @return the value that was stored under {@code key}
     * @throws NullPointerException if {@code props} has no entry for {@code key}
     */
    private static String removeRequiredProperty(Properties props, String key) {
        return (String) Objects.requireNonNull(props.remove(key), "missing required property: " + key);
    }

    /**
     * Counts the matches of {@code pattern} in {@code str}, printing each match to standard output.
     *
     * @param str text to scan (here, the string form of a topology description)
     * @param pattern pattern identifying a repartition topic
     * @return number of matches found
     */
    private static int getCountOfRepartitionTopicsFound(String str, Pattern pattern) {
        final Matcher matcher = pattern.matcher(str);
        int found = 0;
        while (matcher.find()) {
            System.out.println(String.format("REPARTITION TOPIC found -> %s", matcher.group()));
            found++;
        }
        return found;
    }
}
