package org.apache.kafka.streams.tests;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.class */
public class StreamsUpgradeToCooperativeRebalanceTest {
    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 1) {
            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (properties-file) but none provided");
        }
        System.out.println("Args are " + Arrays.toString(strArr));
        Properties loadProps = Utils.loadProps(strArr[0]);
        Properties properties = new Properties();
        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v1.0)");
        System.out.println("props=" + loadProps);
        properties.put("application.id", "cooperative-rebalance-upgrade");
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.String().getClass());
        properties.put("commit.interval.ms", 1000L);
        properties.putAll(loadProps);
        String property = loadProps.getProperty("source.topic", "source");
        String property2 = loadProps.getProperty("sink.topic", "sink");
        String property3 = loadProps.getProperty("task.delimiter", "#");
        final int parseInt = Integer.parseInt(loadProps.getProperty("report.interval", "100"));
        final String property4 = loadProps.getProperty("upgrade.phase", "");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(property).peek(new ForeachAction<String, String>() { // from class: org.apache.kafka.streams.tests.StreamsUpgradeToCooperativeRebalanceTest.1
            int recordCounter = 0;

            public void apply(String str, String str2) {
                int i = this.recordCounter;
                this.recordCounter = i + 1;
                if (i % parseInt == 0) {
                    System.out.println(String.format("%sProcessed %d records so far", property4, Integer.valueOf(this.recordCounter)));
                    System.out.flush();
                }
            }
        }).to(property2);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.setStateListener((state, state2) -> {
            if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                System.out.println(String.format("%sSTREAMS in a RUNNING State", property4));
                Set<ThreadMetadata> localThreadsMetadata = kafkaStreams.localThreadsMetadata();
                StringBuilder sb = new StringBuilder();
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                for (ThreadMetadata threadMetadata : localThreadsMetadata) {
                    getTasks(threadMetadata.activeTasks(), arrayList);
                    if (!threadMetadata.standbyTasks().isEmpty()) {
                        getTasks(threadMetadata.standbyTasks(), arrayList2);
                    }
                }
                addTasksToBuilder(arrayList, sb);
                sb.append(property3);
                if (!arrayList2.isEmpty()) {
                    addTasksToBuilder(arrayList2, sb);
                }
                System.out.println("TASK-ASSIGNMENTS:" + ((Object) sb));
            }
            if (state == KafkaStreams.State.REBALANCING) {
                System.out.println(String.format("%sStarting a REBALANCE", property4));
            }
        });
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", property4));
            System.out.flush();
        }));
    }

    private static void addTasksToBuilder(List<String> list, StringBuilder sb) {
        if (list.isEmpty()) {
            return;
        }
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            sb.append(it.next()).append(",");
        }
        sb.setLength(sb.length() - 1);
    }

    private static void getTasks(Set<TaskMetadata> set, List<String> list) {
        Iterator<TaskMetadata> it = set.iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().topicPartitions().iterator();
            while (it2.hasNext()) {
                list.add(((TopicPartition) it2.next()).toString());
            }
        }
    }
}
