/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class StreamsUpgradeToCooperativeRebalanceTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (kafka-url, properties-file) but none provided");
        }
        System.out.println("Args are " + Arrays.toString(args));
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        Properties config = new Properties();
        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.3)");
        System.out.println("props=" + streamsProperties);
        config.put("application.id", "cooperative-rebalance-upgrade");
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.String().getClass());
        config.put("commit.interval.ms", (Object)1000L);
        config.putAll((Map<?, ?>)streamsProperties);
        String sourceTopic = streamsProperties.getProperty("source.topic", "source");
        String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
        String threadDelimiter = streamsProperties.getProperty("thread.delimiter", "&");
        String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(sourceTopic).peek((ForeachAction)new ForeachAction<String, String>(){
            int recordCounter = 0;

            public void apply(String key, String value) {
                if (this.recordCounter++ % reportInterval == 0) {
                    System.out.println(String.format("Processed %d records so far", this.recordCounter));
                    System.out.flush();
                }
            }
        }).to(sinkTopic);
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                System.out.println("STREAMS in a RUNNING State");
                Set allThreadMetadata = streams.localThreadsMetadata();
                StringBuilder taskReportBuilder = new StringBuilder();
                for (ThreadMetadata threadMetadata : allThreadMetadata) {
                    StreamsUpgradeToCooperativeRebalanceTest.buildTaskAssignmentReport(taskReportBuilder, threadMetadata.activeTasks(), "ACTIVE-TASKS:");
                    if (!threadMetadata.standbyTasks().isEmpty()) {
                        taskReportBuilder.append(taskDelimiter);
                        StreamsUpgradeToCooperativeRebalanceTest.buildTaskAssignmentReport(taskReportBuilder, threadMetadata.standbyTasks(), "STANDBY-TASKS:");
                    }
                    taskReportBuilder.append(threadDelimiter);
                }
                taskReportBuilder.setLength(taskReportBuilder.length() - 1);
                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
            }
            if (newState == KafkaStreams.State.REBALANCING) {
                System.out.println("Starting a REBALANCE");
            }
        });
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            System.out.println("COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED");
            System.out.flush();
        }));
    }

    private static void buildTaskAssignmentReport(StringBuilder taskReportBuilder, Set<TaskMetadata> taskMetadata, String taskType) {
        taskReportBuilder.append(taskType);
        for (TaskMetadata task : taskMetadata) {
            Set topicPartitions = task.topicPartitions();
            for (TopicPartition topicPartition : topicPartitions) {
                taskReportBuilder.append(topicPartition.toString()).append(",");
            }
        }
        taskReportBuilder.setLength(taskReportBuilder.length() - 1);
    }
}

