/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class StreamsNamedRepartitionTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
        }
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
        System.out.println("props=" + streamsProperties);
        String inputTopic = (String)Objects.requireNonNull(streamsProperties.remove("input.topic"));
        String aggregationTopic = (String)Objects.requireNonNull(streamsProperties.remove("aggregation.topic"));
        boolean addOperators = Boolean.valueOf(Objects.requireNonNull((String)streamsProperties.remove("add.operations")));
        Initializer initializer = () -> 0;
        Aggregator aggregator = (k, v, agg) -> agg + Integer.parseInt(v);
        Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);
        StreamsBuilder builder = new StreamsBuilder();
        KStream sourceStream = builder.stream(inputTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s, value=%s", k, v)));
        KStream mappedStream = sourceStream.selectKey((k, v) -> (String)keyFunction.apply((String)v));
        KStream maybeUpdatedStream = addOperators ? mappedStream.filter((k, v) -> true).mapValues(v -> Integer.toString(Integer.parseInt(v) + 1)) : mappedStream;
        maybeUpdatedStream.groupByKey(Grouped.with((String)"grouped-stream", (Serde)Serdes.String(), (Serde)Serdes.String())).aggregate(initializer, aggregator, Materialized.as((String)"count-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())).toStream().peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v))).to(aggregationTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        Properties config = new Properties();
        config.setProperty("application.id", "StreamsNamedRepartitionTest");
        config.setProperty("cache.max.bytes.buffering", "0");
        config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        config.setProperty("default.value.serde", Serdes.String().getClass().getName());
        config.putAll((Map<?, ?>)streamsProperties);
        Topology topology = builder.build(config);
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                if (addOperators) {
                    System.out.println("UPDATED Topology");
                } else {
                    System.out.println("REBALANCING -> RUNNING");
                }
                System.out.flush();
            }
        });
        streams.start();
        Exit.addShutdownHook((String)"streams-shutdown-hook", () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close(Duration.ofMillis(5000L));
            System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
            System.out.flush();
        });
    }
}

