package org.apache.kafka.streams.tests;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.LegacySubscriptionInfoSerde;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeTest.class */
public class StreamsUpgradeTest {

    /* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeTest$FutureAssignmentInfo.class */
    private static class FutureAssignmentInfo extends AssignmentInfo {
        private final boolean bumpUsedVersion;
        private final boolean bumpSupportedVersion;
        final ByteBuffer originalUserMetadata;

        private FutureAssignmentInfo(boolean z, boolean z2, ByteBuffer byteBuffer) {
            super(7, 7);
            this.bumpUsedVersion = z;
            this.bumpSupportedVersion = z2;
            this.originalUserMetadata = byteBuffer;
        }

        public ByteBuffer encode() {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.originalUserMetadata.rewind();
            try {
                DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
                Throwable th = null;
                try {
                    try {
                        if (this.bumpUsedVersion) {
                            this.originalUserMetadata.getInt();
                            dataOutputStream.writeInt(8);
                        } else {
                            dataOutputStream.writeInt(this.originalUserMetadata.getInt());
                        }
                        if (this.bumpSupportedVersion) {
                            this.originalUserMetadata.getInt();
                            dataOutputStream.writeInt(8);
                        }
                        while (true) {
                            try {
                                dataOutputStream.write(this.originalUserMetadata.get());
                            } catch (BufferUnderflowException e) {
                                dataOutputStream.flush();
                                dataOutputStream.close();
                                ByteBuffer wrap = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
                                if (dataOutputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            dataOutputStream.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        dataOutputStream.close();
                                    }
                                }
                                return wrap;
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e2) {
                throw new TaskAssignmentException("Failed to encode AssignmentInfo", e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeTest$FutureKafkaClientSupplier.class */
    public static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier {
        private FutureKafkaClientSupplier() {
        }

        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> map) {
            map.put("partition.assignment.strategy", FutureStreamsPartitionAssignor.class.getName());
            return new KafkaConsumer(map, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeTest$FutureStreamsPartitionAssignor.class */
    public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
        private final Logger log = LoggerFactory.getLogger(FutureStreamsPartitionAssignor.class);
        private AtomicInteger usedSubscriptionMetadataVersionPeek;
        private AtomicLong nextScheduledRebalanceMs;

        public FutureStreamsPartitionAssignor() {
            this.usedSubscriptionMetadataVersion = 8;
        }

        public void configure(Map<String, ?> map) {
            Object obj = map.get("test.future.metadata");
            if (obj instanceof AtomicInteger) {
                this.usedSubscriptionMetadataVersionPeek = (AtomicInteger) obj;
            } else {
                this.usedSubscriptionMetadataVersionPeek = new AtomicInteger();
            }
            map.remove("test.future.metadata");
            this.nextScheduledRebalanceMs = new AssignorConfiguration(map).nextScheduledRebalanceMs();
            super.configure(map);
        }

        public ByteBuffer subscriptionUserData(Set<String> set) {
            TaskManager taskManager = taskManager();
            handleRebalanceStart(set);
            return this.usedSubscriptionMetadataVersion <= 7 ? new SubscriptionInfo(this.usedSubscriptionMetadataVersion, 8, taskManager.processId(), userEndPoint(), taskManager.getTaskOffsetSums()).encode() : new FutureSubscriptionInfo(this.usedSubscriptionMetadataVersion, taskManager.processId(), SubscriptionInfo.getActiveTasksFromTaskOffsetSumMap(taskManager.getTaskOffsetSums()), SubscriptionInfo.getStandbyTasksFromTaskOffsetSumMap(taskManager.getTaskOffsetSums()), userEndPoint()).encode();
        }

        public void onAssignment(ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata consumerGroupMetadata) {
            try {
                super.onAssignment(assignment, consumerGroupMetadata);
                this.usedSubscriptionMetadataVersionPeek.set(this.usedSubscriptionMetadataVersion);
            } catch (TaskAssignmentException e) {
                ByteBuffer userData = assignment.userData();
                userData.rewind();
                try {
                    DataInputStream dataInputStream = new DataInputStream(new ByteBufferInputStream(userData));
                    Throwable th = null;
                    try {
                        try {
                            int readInt = dataInputStream.readInt();
                            if (dataInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        dataInputStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    dataInputStream.close();
                                }
                            }
                            if (readInt > 8) {
                                throw new IllegalStateException("Unknown metadata version: " + readInt + "; latest supported version: 71");
                            }
                            AssignmentInfo decode = AssignmentInfo.decode(assignment.userData().putInt(0, 7));
                            if (maybeUpdateSubscriptionVersion(readInt, decode.commonlySupportedVersion())) {
                                this.log.info("Requested to schedule immediate rebalance due to version probing.");
                                this.nextScheduledRebalanceMs.set(0L);
                                this.usedSubscriptionMetadataVersionPeek.set(this.usedSubscriptionMetadataVersion);
                            }
                            ArrayList arrayList = new ArrayList(assignment.partitions());
                            arrayList.sort(PARTITION_COMPARATOR);
                            taskManager().handleAssignment(getActiveTasks(arrayList, decode), decode.standbyTasks());
                            this.usedSubscriptionMetadataVersionPeek.set(this.usedSubscriptionMetadataVersion);
                        } finally {
                        }
                    } finally {
                    }
                } catch (IOException e2) {
                    throw new TaskAssignmentException("Failed to decode AssignmentInfo", e2);
                }
            }
        }

        public ConsumerPartitionAssignor.GroupAssignment assign(Cluster cluster, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
            boolean z;
            Map groupSubscription2 = groupSubscription.groupSubscription();
            HashSet hashSet = new HashSet();
            Iterator it = groupSubscription2.entrySet().iterator();
            while (it.hasNext()) {
                hashSet.add(Integer.valueOf(SubscriptionInfo.decode(((ConsumerPartitionAssignor.Subscription) ((Map.Entry) it.next()).getValue()).userData()).latestSupportedVersion()));
            }
            Map map = null;
            HashMap hashMap = new HashMap();
            Iterator it2 = groupSubscription2.values().iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                }
                if (SubscriptionInfo.decode(((ConsumerPartitionAssignor.Subscription) it2.next()).userData()).version() < 8) {
                    map = super.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(groupSubscription2)).groupAssignment();
                    break;
                }
            }
            boolean z2 = false;
            if (map != null) {
                z = hashSet.size() == 1 && ((Integer) hashSet.iterator().next()).intValue() == 8;
            } else {
                for (Map.Entry entry : groupSubscription2.entrySet()) {
                    ConsumerPartitionAssignor.Subscription subscription = (ConsumerPartitionAssignor.Subscription) entry.getValue();
                    SubscriptionInfo decode = SubscriptionInfo.decode(subscription.userData().putInt(0, 7).putInt(4, 7));
                    hashMap.put(entry.getKey(), new ConsumerPartitionAssignor.Subscription(subscription.topics(), new SubscriptionInfo(7, 7, decode.processId(), decode.userEndPoint(), taskManager().getTaskOffsetSums()).encode(), subscription.ownedPartitions()));
                }
                map = super.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(hashMap)).groupAssignment();
                z2 = true;
                z = true;
            }
            HashMap hashMap2 = new HashMap();
            for (Map.Entry entry2 : map.entrySet()) {
                ConsumerPartitionAssignor.Assignment assignment = (ConsumerPartitionAssignor.Assignment) entry2.getValue();
                hashMap2.put(entry2.getKey(), new ConsumerPartitionAssignor.Assignment(assignment.partitions(), new FutureAssignmentInfo(z2, z, assignment.userData()).encode()));
            }
            return new ConsumerPartitionAssignor.GroupAssignment(hashMap2);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeTest$FutureSubscriptionInfo.class */
    private static class FutureSubscriptionInfo {
        private final int version;
        private final UUID processId;
        private final Set<TaskId> activeTasks;
        private final Set<TaskId> standbyTasks;
        private final String userEndPoint;

        FutureSubscriptionInfo(int i, UUID uuid, Set<TaskId> set, Set<TaskId> set2, String str) {
            this.version = i;
            this.processId = uuid;
            this.activeTasks = set;
            this.standbyTasks = set2;
            this.userEndPoint = str;
            if (i <= 7) {
                throw new IllegalArgumentException("this class can't be used with version " + i);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ByteBuffer encode() {
            byte[] prepareUserEndPoint = LegacySubscriptionInfoSerde.prepareUserEndPoint(this.userEndPoint);
            ByteBuffer allocate = ByteBuffer.allocate(28 + (this.activeTasks.size() * 8) + 4 + (this.standbyTasks.size() * 8) + 4 + prepareUserEndPoint.length);
            allocate.putInt(this.version);
            allocate.putInt(this.version);
            LegacySubscriptionInfoSerde.encodeClientUUID(allocate, this.processId);
            LegacySubscriptionInfoSerde.encodeTasks(allocate, this.activeTasks);
            LegacySubscriptionInfoSerde.encodeTasks(allocate, this.standbyTasks);
            LegacySubscriptionInfoSerde.encodeUserEndPoint(allocate, prepareUserEndPoint);
            allocate.rewind();
            return allocate;
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: ");
        }
        Properties loadProps = Utils.loadProps(strArr.length > 0 ? strArr[0] : null);
        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
        System.out.println("props=" + loadProps);
        KafkaStreams buildStreams = buildStreams(loadProps);
        buildStreams.start();
        Exit.addShutdownHook("streams-shutdown-hook", () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            buildStreams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        });
    }

    public static KafkaStreams buildStreams(Properties properties) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("data");
        stream.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        stream.to("echo");
        Properties properties2 = new Properties();
        properties2.setProperty("application.id", "StreamsUpgradeTest");
        properties2.put("commit.interval.ms", 1000);
        KafkaClientSupplier futureKafkaClientSupplier = properties.containsKey("test.future.metadata") ? new FutureKafkaClientSupplier() : new DefaultKafkaClientSupplier();
        properties2.putAll(properties);
        return new KafkaStreams(streamsBuilder.build(), properties2, futureKafkaClientSupplier);
    }
}
