/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class StreamsUpgradeToCooperativeRebalanceTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: " + (args.length > 0 ? args[0] : ""));
        }
        String zookeeper = args[0];
        String propFileName = args[1];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        Properties config = new Properties();
        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.0)");
        System.out.println("zookeeper=" + zookeeper);
        System.out.println("props=" + config);
        config.put("application.id", "cooperative-rebalance-upgrade");
        config.put("key.serde", Serdes.String().getClass());
        config.put("value.serde", Serdes.String().getClass());
        config.setProperty("zookeeper.connect", zookeeper);
        config.put("commit.interval.ms", (Object)1000L);
        config.putAll((Map<?, ?>)streamsProperties);
        String sourceTopic = config.getProperty("source.topic", "source");
        String sinkTopic = config.getProperty("sink.topic", "sink");
        final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
        final String upgradePhase = config.getProperty("upgrade.phase", "");
        KStreamBuilder builder = new KStreamBuilder();
        KStream upgradeStream = builder.stream(new String[]{sourceTopic});
        upgradeStream.foreach((ForeachAction)new ForeachAction<String, String>(){
            int recordCounter = 0;

            public void apply(String key, String value) {
                if (this.recordCounter++ % reportInterval == 0) {
                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, this.recordCounter));
                    System.out.flush();
                }
            }
        });
        upgradeStream.to(sinkTopic);
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
            System.out.flush();
        }));
    }
}

