package org.apache.kafka.coordinator.group;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/GroupMetadataManager.class */
public class GroupMetadataManager {
    private final LogContext logContext;
    private final Logger log;
    private final SnapshotRegistry snapshotRegistry;
    private final Time time;
    private final CoordinatorTimer<Void, CoordinatorRecord> timer;
    private final GroupCoordinatorMetricsShard metrics;
    private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors;
    private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor;
    private final TimelineHashMap<String, Group> groups;
    private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;
    private final int consumerGroupMaxSize;
    private final int consumerGroupHeartbeatIntervalMs;
    private final int consumerGroupSessionTimeoutMs;
    private final int consumerGroupMetadataRefreshIntervalMs;
    private MetadataImage metadataImage;
    static final CoordinatorResult<Void, CoordinatorRecord> EMPTY_RESULT = new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null), false);
    private final int classicGroupMaxSize;
    private final int classicGroupInitialRebalanceDelayMs;
    private final int classicGroupNewMemberJoinTimeoutMs;
    private final int classicGroupMinSessionTimeoutMs;
    private final int classicGroupMaxSessionTimeoutMs;
    private final ShareGroupPartitionAssignor shareGroupAssignor;
    private final int shareGroupMaxSize;
    private final int shareGroupHeartbeatIntervalMs;
    private final int shareGroupSessionTimeoutMs;
    private final int shareGroupMetadataRefreshIntervalMs;
    private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.coordinator.group.GroupMetadataManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/GroupMetadataManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_TOPIC_OR_PARTITION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_ENOUGH_REPLICAS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.REQUEST_TIMED_OUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_LEADER_OR_FOLLOWER.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.KAFKA_STORAGE_ERROR.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.MESSAGE_TOO_LARGE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.RECORD_LIST_TOO_LARGE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.INVALID_FETCH_SIZE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            $SwitchMap$org$apache$kafka$coordinator$group$classic$ClassicGroupState = new int[ClassicGroupState.values().length];
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$classic$ClassicGroupState[ClassicGroupState.EMPTY.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$classic$ClassicGroupState[ClassicGroupState.DEAD.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$classic$ClassicGroupState[ClassicGroupState.PREPARING_REBALANCE.ordinal()] = 3;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$classic$ClassicGroupState[ClassicGroupState.COMPLETING_REBALANCE.ordinal()] = 4;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$classic$ClassicGroupState[ClassicGroupState.STABLE.ordinal()] = 5;
            } catch (NoSuchFieldError e13) {
            }
            $SwitchMap$org$apache$kafka$coordinator$group$Group$GroupType = new int[Group.GroupType.values().length];
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$Group$GroupType[Group.GroupType.CONSUMER.ordinal()] = 1;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$Group$GroupType[Group.GroupType.CLASSIC.ordinal()] = 2;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$Group$GroupType[Group.GroupType.SHARE.ordinal()] = 3;
            } catch (NoSuchFieldError e16) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/GroupMetadataManager$Builder.class */
    public static class Builder {
        private int classicGroupMinSessionTimeoutMs;
        private int classicGroupMaxSessionTimeoutMs;
        private ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
        private GroupCoordinatorMetricsShard metrics;
        private LogContext logContext = null;
        private SnapshotRegistry snapshotRegistry = null;
        private Time time = null;
        private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
        private List<ConsumerGroupPartitionAssignor> consumerGroupAssignors = null;
        private int consumerGroupMaxSize = Integer.MAX_VALUE;
        private int consumerGroupHeartbeatIntervalMs = 5000;
        private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
        private MetadataImage metadataImage = null;
        private int consumerGroupSessionTimeoutMs = 45000;
        private int classicGroupMaxSize = Integer.MAX_VALUE;
        private int classicGroupInitialRebalanceDelayMs = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT;
        private int classicGroupNewMemberJoinTimeoutMs = GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS;
        private ShareGroupPartitionAssignor shareGroupAssignor = null;
        private int shareGroupMaxSize = Integer.MAX_VALUE;
        private int shareGroupHeartbeatIntervalMs = 5000;
        private int shareGroupSessionTimeoutMs = 45000;
        private int shareGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withTimer(CoordinatorTimer<Void, CoordinatorRecord> coordinatorTimer) {
            this.timer = coordinatorTimer;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withConsumerGroupAssignors(List<ConsumerGroupPartitionAssignor> list) {
            this.consumerGroupAssignors = list;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withConsumerGroupMaxSize(int i) {
            this.consumerGroupMaxSize = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withConsumerGroupSessionTimeout(int i) {
            this.consumerGroupSessionTimeoutMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withConsumerGroupHeartbeatInterval(int i) {
            this.consumerGroupHeartbeatIntervalMs = i;
            return this;
        }

        Builder withConsumerGroupMetadataRefreshIntervalMs(int i) {
            this.consumerGroupMetadataRefreshIntervalMs = i;
            return this;
        }

        Builder withMetadataImage(MetadataImage metadataImage) {
            this.metadataImage = metadataImage;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withClassicGroupMaxSize(int i) {
            this.classicGroupMaxSize = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withClassicGroupInitialRebalanceDelayMs(int i) {
            this.classicGroupInitialRebalanceDelayMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withClassicGroupNewMemberJoinTimeoutMs(int i) {
            this.classicGroupNewMemberJoinTimeoutMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withClassicGroupMinSessionTimeoutMs(int i) {
            this.classicGroupMinSessionTimeoutMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withClassicGroupMaxSessionTimeoutMs(int i) {
            this.classicGroupMaxSessionTimeoutMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy) {
            this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard groupCoordinatorMetricsShard) {
            this.metrics = groupCoordinatorMetricsShard;
            return this;
        }

        Builder withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupPartitionAssignor) {
            this.shareGroupAssignor = shareGroupPartitionAssignor;
            return this;
        }

        public Builder withShareGroupMaxSize(int i) {
            this.shareGroupMaxSize = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withShareGroupSessionTimeout(int i) {
            this.shareGroupSessionTimeoutMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withShareGroupHeartbeatInterval(int i) {
            this.shareGroupHeartbeatIntervalMs = i;
            return this;
        }

        Builder withShareGroupMetadataRefreshIntervalMs(int i) {
            this.shareGroupMetadataRefreshIntervalMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public GroupMetadataManager build() {
            if (this.logContext == null) {
                this.logContext = new LogContext();
            }
            if (this.snapshotRegistry == null) {
                this.snapshotRegistry = new SnapshotRegistry(this.logContext);
            }
            if (this.metadataImage == null) {
                this.metadataImage = MetadataImage.EMPTY;
            }
            if (this.time == null) {
                this.time = Time.SYSTEM;
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.consumerGroupAssignors == null || this.consumerGroupAssignors.isEmpty()) {
                throw new IllegalArgumentException("Assignors must be set before building.");
            }
            if (this.shareGroupAssignor == null) {
                this.shareGroupAssignor = new SimpleAssignor();
            }
            if (this.metrics == null) {
                throw new IllegalArgumentException("GroupCoordinatorMetricsShard must be set.");
            }
            return new GroupMetadataManager(this.snapshotRegistry, this.logContext, this.time, this.timer, this.metrics, this.consumerGroupAssignors, this.metadataImage, this.consumerGroupMaxSize, this.consumerGroupSessionTimeoutMs, this.consumerGroupHeartbeatIntervalMs, this.consumerGroupMetadataRefreshIntervalMs, this.classicGroupMaxSize, this.classicGroupInitialRebalanceDelayMs, this.classicGroupNewMemberJoinTimeoutMs, this.classicGroupMinSessionTimeoutMs, this.classicGroupMaxSessionTimeoutMs, this.consumerGroupMigrationPolicy, this.shareGroupAssignor, this.shareGroupMaxSize, this.shareGroupSessionTimeoutMs, this.shareGroupHeartbeatIntervalMs, this.shareGroupMetadataRefreshIntervalMs, null);
        }
    }

    private GroupMetadataManager(SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, CoordinatorTimer<Void, CoordinatorRecord> coordinatorTimer, GroupCoordinatorMetricsShard groupCoordinatorMetricsShard, List<ConsumerGroupPartitionAssignor> list, MetadataImage metadataImage, int i, int i2, int i3, int i4, int i5, int i6, int i7, int i8, int i9, ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy, ShareGroupPartitionAssignor shareGroupPartitionAssignor, int i10, int i11, int i12, int i13) {
        this.logContext = logContext;
        this.log = logContext.logger(GroupMetadataManager.class);
        this.snapshotRegistry = snapshotRegistry;
        this.time = time;
        this.timer = coordinatorTimer;
        this.metrics = groupCoordinatorMetricsShard;
        this.metadataImage = metadataImage;
        this.consumerGroupAssignors = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, Function.identity()));
        this.defaultConsumerGroupAssignor = list.get(0);
        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
        this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.consumerGroupMaxSize = i;
        this.consumerGroupSessionTimeoutMs = i2;
        this.consumerGroupHeartbeatIntervalMs = i3;
        this.consumerGroupMetadataRefreshIntervalMs = i4;
        this.classicGroupMaxSize = i5;
        this.classicGroupInitialRebalanceDelayMs = i6;
        this.classicGroupNewMemberJoinTimeoutMs = i7;
        this.classicGroupMinSessionTimeoutMs = i8;
        this.classicGroupMaxSessionTimeoutMs = i9;
        this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
        this.shareGroupAssignor = shareGroupPartitionAssignor;
        this.shareGroupMaxSize = i10;
        this.shareGroupSessionTimeoutMs = i11;
        this.shareGroupHeartbeatIntervalMs = i12;
        this.shareGroupMetadataRefreshIntervalMs = i13;
    }

    public MetadataImage image() {
        return this.metadataImage;
    }

    public Group group(String str) throws GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str, Long.MAX_VALUE);
        if (group == null) {
            throw new GroupIdNotFoundException(String.format("Group %s not found.", str));
        }
        return group;
    }

    public Group group(String str, long j) throws GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str, j);
        if (group == null) {
            throw new GroupIdNotFoundException(String.format("Group %s not found.", str));
        }
        return group;
    }

    public List<ListGroupsResponseData.ListedGroup> listGroups(Set<String> set, Set<String> set2, long j) {
        Set set3 = (Set) set.stream().map((v0) -> {
            return v0.toLowerCase();
        }).map((v0) -> {
            return v0.trim();
        }).collect(Collectors.toSet());
        Set set4 = (Set) set2.stream().map(Group.GroupType::parse).collect(Collectors.toSet());
        return (List) this.groups.values(j).stream().filter(group -> {
            return (set.isEmpty() || group.isInStates(set3, j)) && (set4.isEmpty() || set4.contains(group.type()));
        }).map(group2 -> {
            return group2.asListedGroup(j);
        }).collect(Collectors.toList());
    }

    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(List<String> list, long j) {
        ArrayList arrayList = new ArrayList();
        list.forEach(str -> {
            try {
                arrayList.add(consumerGroup(str, j).asDescribedGroup(j, this.defaultConsumerGroupAssignor.name(), this.metadataImage.topics()));
            } catch (GroupIdNotFoundException e) {
                arrayList.add(new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId(str).setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()));
            }
        });
        return arrayList;
    }

    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> list, long j) {
        ArrayList arrayList = new ArrayList();
        list.forEach(str -> {
            try {
                ClassicGroup classicGroup = classicGroup(str, j);
                if (!classicGroup.isInState(ClassicGroupState.STABLE)) {
                    arrayList.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(str).setGroupState(classicGroup.stateAsString()).setProtocolType(classicGroup.protocolType().orElse(ClassicGroup.NO_LEADER)).setMembers((List) classicGroup.allMembers().stream().map(classicGroupMember -> {
                        return classicGroupMember.describeNoMetadata();
                    }).collect(Collectors.toList())));
                } else {
                    if (!classicGroup.protocolName().isPresent()) {
                        throw new IllegalStateException("Invalid null group protocol for stable group");
                    }
                    arrayList.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(str).setGroupState(classicGroup.stateAsString()).setProtocolType(classicGroup.protocolType().orElse(ClassicGroup.NO_LEADER)).setProtocolData(classicGroup.protocolName().get()).setMembers((List) classicGroup.allMembers().stream().map(classicGroupMember2 -> {
                        return classicGroupMember2.describe(classicGroup.protocolName().get());
                    }).collect(Collectors.toList())));
                }
            } catch (GroupIdNotFoundException e) {
                arrayList.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(str).setGroupState(ClassicGroupState.DEAD.toString()));
            }
        });
        return arrayList;
    }

    ConsumerGroup getOrMaybeCreateConsumerGroup(String str, boolean z, List<CoordinatorRecord> list) throws GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str);
        if (group == null && !z) {
            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", str));
        }
        if (group == null || (z && maybeDeleteEmptyClassicGroup(group, list))) {
            return new ConsumerGroup(this.snapshotRegistry, str, this.metrics);
        }
        if (group.type() == Group.GroupType.CONSUMER) {
            return (ConsumerGroup) group;
        }
        if (z && validateOnlineUpgrade((ClassicGroup) group)) {
            return convertToConsumerGroup((ClassicGroup) group, list);
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", str));
    }

    public ConsumerGroup consumerGroup(String str, long j) throws GroupIdNotFoundException {
        Group group = group(str, j);
        if (group.type() == Group.GroupType.CONSUMER) {
            return (ConsumerGroup) group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", str));
    }

    ConsumerGroup consumerGroup(String str) throws GroupIdNotFoundException {
        return consumerGroup(str, Long.MAX_VALUE);
    }

    ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(String str, boolean z) throws GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str);
        if (group == null && !z) {
            throw new IllegalStateException(String.format("Consumer group %s not found.", str));
        }
        if (group != null) {
            if (group.type() == Group.GroupType.CONSUMER) {
                return (ConsumerGroup) group;
            }
            throw new IllegalStateException(String.format("Group %s is not a consumer group.", str));
        }
        ConsumerGroup consumerGroup = new ConsumerGroup(this.snapshotRegistry, str, this.metrics);
        this.groups.put(str, consumerGroup);
        this.metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
        return consumerGroup;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClassicGroup getOrMaybeCreateClassicGroup(String str, boolean z) throws UnknownMemberIdException, GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str);
        if (group == null && !z) {
            throw new UnknownMemberIdException(String.format("Classic group %s not found.", str));
        }
        if (group != null) {
            if (group.type() == Group.GroupType.CLASSIC) {
                return (ClassicGroup) group;
            }
            throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", str));
        }
        ClassicGroup classicGroup = new ClassicGroup(this.logContext, str, ClassicGroupState.EMPTY, this.time, this.metrics);
        this.groups.put(str, classicGroup);
        this.metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
        return classicGroup;
    }

    public ClassicGroup classicGroup(String str, long j) throws GroupIdNotFoundException {
        Group group = group(str, j);
        if (group.type() == Group.GroupType.CLASSIC) {
            return (ClassicGroup) group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", str));
    }

    private ShareGroup getOrMaybeCreateShareGroup(String str, boolean z) throws GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str);
        if (group == null && !z) {
            throw new GroupIdNotFoundException(String.format("Share group %s not found.", str));
        }
        if (group == null) {
            return new ShareGroup(this.snapshotRegistry, str);
        }
        if (group.type() != Group.GroupType.SHARE) {
            throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", str));
        }
        return (ShareGroup) group;
    }

    ShareGroup getOrMaybeCreatePersistedShareGroup(String str, boolean z) throws GroupIdNotFoundException {
        Group group = (Group) this.groups.get(str);
        if (group == null && !z) {
            throw new IllegalStateException(String.format("Share group %s not found.", str));
        }
        if (group == null) {
            ShareGroup shareGroup = new ShareGroup(this.snapshotRegistry, str);
            this.groups.put(str, shareGroup);
            return shareGroup;
        }
        if (group.type() != Group.GroupType.SHARE) {
            throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", str));
        }
        return (ShareGroup) group;
    }

    public ShareGroup shareGroup(String str, long j) throws GroupIdNotFoundException {
        Group group = group(str, j);
        if (group.type() == Group.GroupType.SHARE) {
            return (ShareGroup) group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", str));
    }

    ShareGroup shareGroup(String str) throws GroupIdNotFoundException {
        return shareGroup(str, Long.MAX_VALUE);
    }

    ModernGroup<?> getOrMaybeCreatePersistedGroup(String str, boolean z, Group.GroupType groupType) throws GroupIdNotFoundException {
        if (groupType == Group.GroupType.CONSUMER) {
            return getOrMaybeCreatePersistedConsumerGroup(str, z);
        }
        if (groupType == Group.GroupType.SHARE) {
            return getOrMaybeCreatePersistedShareGroup(str, z);
        }
        throw new IllegalArgumentException("Invalid group type: " + groupType);
    }

    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String str) {
        if (!consumerGroup.allMembersUseClassicProtocolExcept(str)) {
            return false;
        }
        if (consumerGroup.numMembers() <= 1) {
            this.log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", consumerGroup.groupId());
            return false;
        }
        if (!this.consumerGroupMigrationPolicy.isDowngradeEnabled()) {
            this.log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", consumerGroup.groupId());
            return false;
        }
        if (consumerGroup.numMembers() - 1 <= this.classicGroupMaxSize) {
            return true;
        }
        this.log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", consumerGroup.groupId());
        return false;
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> convertToClassicGroup(ConsumerGroup consumerGroup, String str, T t) {
        ArrayList arrayList = new ArrayList();
        consumerGroup.createGroupTombstoneRecords(arrayList);
        try {
            ClassicGroup fromConsumerGroup = ClassicGroup.fromConsumerGroup(consumerGroup, str, this.logContext, this.time, this.metrics, this.metadataImage);
            fromConsumerGroup.createClassicGroupRecords(this.metadataImage.features().metadataVersion(), arrayList);
            removeGroup(consumerGroup.groupId());
            this.groups.put(consumerGroup.groupId(), fromConsumerGroup);
            this.metrics.onClassicGroupStateTransition(null, fromConsumerGroup.currentState());
            fromConsumerGroup.allMembers().forEach(classicGroupMember -> {
                rescheduleClassicGroupMemberHeartbeat(fromConsumerGroup, classicGroupMember);
            });
            prepareRebalance(fromConsumerGroup, String.format("Downgrade group %s from consumer to classic.", fromConsumerGroup.groupId()));
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.exceptionally((Function) th -> {
                this.metrics.onClassicGroupStateTransition(fromConsumerGroup.currentState(), null);
                return null;
            });
            return new CoordinatorResult<>(arrayList, t, completableFuture, false);
        } catch (SchemaException e) {
            this.log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse the Consumer Protocol consumer.", e);
            throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", consumerGroup.groupId(), e.getMessage()));
        }
    }

    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
        if (!this.consumerGroupMigrationPolicy.isUpgradeEnabled()) {
            this.log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.", classicGroup.groupId());
            return false;
        }
        if (!classicGroup.usesConsumerGroupProtocol()) {
            this.log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.", classicGroup.groupId());
            return false;
        }
        if (classicGroup.numMembers() <= this.consumerGroupMaxSize) {
            return true;
        }
        this.log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.", classicGroup.groupId());
        return false;
    }

    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<CoordinatorRecord> list) {
        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
        classicGroup.createGroupTombstoneRecords(list);
        try {
            ConsumerGroup fromClassicGroup = ConsumerGroup.fromClassicGroup(this.snapshotRegistry, this.metrics, classicGroup, this.metadataImage.topics());
            fromClassicGroup.createConsumerGroupRecords(list);
            fromClassicGroup.members().forEach((str, consumerGroupMember) -> {
                scheduleConsumerGroupSessionTimeout(fromClassicGroup.groupId(), str, consumerGroupMember.classicProtocolSessionTimeout().get().intValue());
            });
            return fromClassicGroup;
        } catch (SchemaException e) {
            this.log.warn("Cannot upgrade the classic group " + classicGroup.groupId() + " to consumer group because the embedded consumer protocol is malformed: " + e.getMessage() + ".", e);
            throw new GroupIdNotFoundException("Cannot upgrade the classic group " + classicGroup.groupId() + " to consumer group because the embedded consumer protocol is malformed.");
        }
    }

    private void removeGroup(String str) {
        Group group = (Group) this.groups.remove(str);
        if (group != null) {
            switch (group.type()) {
                case CONSUMER:
                    this.metrics.onConsumerGroupStateTransition(((ConsumerGroup) group).state(), null);
                    return;
                case CLASSIC:
                    this.metrics.onClassicGroupStateTransition(((ClassicGroup) group).currentState(), null);
                    return;
                case SHARE:
                    return;
                default:
                    this.log.warn("Removed group {} with an unknown group type {}.", str, group.type());
                    return;
            }
        }
    }

    private void throwIfEmptyString(String str, String str2) throws InvalidRequestException {
        if (str != null && str.trim().isEmpty()) {
            throw new InvalidRequestException(str2);
        }
    }

    private void throwIfNotNull(Object obj, String str) throws InvalidRequestException {
        if (obj != null) {
            throw new InvalidRequestException(str);
        }
    }

    private void throwIfNull(Object obj, String str) throws InvalidRequestException {
        if (obj == null) {
            throw new InvalidRequestException(str);
        }
    }

    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData) throws InvalidRequestException, UnsupportedAssignorException {
        throwIfEmptyString(consumerGroupHeartbeatRequestData.groupId(), "GroupId can't be empty.");
        throwIfEmptyString(consumerGroupHeartbeatRequestData.instanceId(), "InstanceId can't be empty.");
        throwIfEmptyString(consumerGroupHeartbeatRequestData.rackId(), "RackId can't be empty.");
        if (consumerGroupHeartbeatRequestData.memberEpoch() > 0 || consumerGroupHeartbeatRequestData.memberEpoch() == -1) {
            throwIfEmptyString(consumerGroupHeartbeatRequestData.memberId(), "MemberId can't be empty.");
        } else if (consumerGroupHeartbeatRequestData.memberEpoch() == 0) {
            if (consumerGroupHeartbeatRequestData.rebalanceTimeoutMs() == -1) {
                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
            }
            if (consumerGroupHeartbeatRequestData.topicPartitions() == null || !consumerGroupHeartbeatRequestData.topicPartitions().isEmpty()) {
                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
            }
            if (consumerGroupHeartbeatRequestData.subscribedTopicNames() == null || consumerGroupHeartbeatRequestData.subscribedTopicNames().isEmpty()) {
                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
            }
        } else {
            if (consumerGroupHeartbeatRequestData.memberEpoch() != -2) {
                throw new InvalidRequestException("MemberEpoch is invalid.");
            }
            throwIfEmptyString(consumerGroupHeartbeatRequestData.memberId(), "MemberId can't be empty.");
            throwIfNull(consumerGroupHeartbeatRequestData.instanceId(), "InstanceId can't be null.");
        }
        if (consumerGroupHeartbeatRequestData.serverAssignor() != null && !this.consumerGroupAssignors.containsKey(consumerGroupHeartbeatRequestData.serverAssignor())) {
            throw new UnsupportedAssignorException("ServerAssignor " + consumerGroupHeartbeatRequestData.serverAssignor() + " is not supported. Supported assignors: " + String.join(", ", this.consumerGroupAssignors.keySet()) + ".");
        }
    }

    private void throwIfShareGroupHeartbeatRequestIsInvalid(ShareGroupHeartbeatRequestData shareGroupHeartbeatRequestData) throws InvalidRequestException, UnsupportedAssignorException {
        throwIfEmptyString(shareGroupHeartbeatRequestData.groupId(), "GroupId can't be empty.");
        throwIfEmptyString(shareGroupHeartbeatRequestData.rackId(), "RackId can't be empty.");
        if (shareGroupHeartbeatRequestData.memberEpoch() > 0 || shareGroupHeartbeatRequestData.memberEpoch() == -1) {
            throwIfEmptyString(shareGroupHeartbeatRequestData.memberId(), "MemberId can't be empty.");
        } else {
            if (shareGroupHeartbeatRequestData.memberEpoch() != 0) {
                throw new InvalidRequestException("MemberEpoch is invalid.");
            }
            if (shareGroupHeartbeatRequestData.subscribedTopicNames() == null || shareGroupHeartbeatRequestData.subscribedTopicNames().isEmpty()) {
                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
            }
        }
    }

    private boolean isSubset(List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list, Map<Uuid, Set<Integer>> map) {
        if (list == null) {
            return false;
        }
        for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : list) {
            Set<Integer> set = map.get(topicPartitions.topicId());
            if (set == null) {
                return false;
            }
            Iterator it = topicPartitions.partitions().iterator();
            while (it.hasNext()) {
                if (!set.contains((Integer) it.next())) {
                    return false;
                }
            }
        }
        return true;
    }

    private void throwIfConsumerGroupIsFull(ConsumerGroup consumerGroup, String str) throws GroupMaxSizeReachedException {
        if (consumerGroup.numMembers() >= this.consumerGroupMaxSize) {
            if (str.isEmpty() || !consumerGroup.hasMember(str)) {
                throw new GroupMaxSizeReachedException("The consumer group has reached its maximum capacity of " + this.consumerGroupMaxSize + " members.");
            }
        }
    }

    private void throwIfShareGroupIsFull(ShareGroup shareGroup, String str) throws GroupMaxSizeReachedException {
        if (shareGroup.numMembers() >= this.shareGroupMaxSize) {
            if (str.isEmpty() || !shareGroup.hasMember(str)) {
                throw new GroupMaxSizeReachedException("The share group has reached its maximum capacity of " + this.shareGroupMaxSize + " members.");
            }
        }
    }

    private void throwIfConsumerGroupMemberEpochIsInvalid(ConsumerGroupMember consumerGroupMember, int i, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list) {
        if (i > consumerGroupMember.memberEpoch()) {
            throw new FencedMemberEpochException("The consumer group member has a greater member epoch (" + i + ") than the one known by the group coordinator (" + consumerGroupMember.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
        if (i < consumerGroupMember.memberEpoch()) {
            if (i != consumerGroupMember.previousMemberEpoch() || !isSubset(list, consumerGroupMember.assignedPartitions())) {
                throw new FencedMemberEpochException("The consumer group member has a smaller member epoch (" + i + ") than the one known by the group coordinator (" + consumerGroupMember.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
            }
        }
    }

    private void throwIfShareGroupMemberEpochIsInvalid(ShareGroupMember shareGroupMember, int i) {
        if (i > shareGroupMember.memberEpoch()) {
            throw new FencedMemberEpochException("The share group member has a greater member epoch (" + i + ") than the one known by the group coordinator (" + shareGroupMember.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
    }

    private void throwIfInstanceIdIsUnreleased(ConsumerGroupMember consumerGroupMember, String str, String str2, String str3) {
        if (consumerGroupMember.memberEpoch() != -2) {
            this.log.info("[GroupId {}] Static member {} with instance id {} cannot join the group because the instance id is is owned by member {}.", new Object[]{str, str2, str3, consumerGroupMember.memberId()});
            throw Errors.UNRELEASED_INSTANCE_ID.exception("Static member " + str2 + " with instance id " + str3 + " cannot join the group because the instance id is owned by " + consumerGroupMember.memberId() + " member.");
        }
    }

    private void throwIfInstanceIdIsFenced(ConsumerGroupMember consumerGroupMember, String str, String str2, String str3) {
        if (consumerGroupMember.memberId().equals(str2)) {
            return;
        }
        this.log.info("[GroupId {}] Static member {} with instance id {} is fenced by existing member {}.", new Object[]{str, str2, str3, consumerGroupMember.memberId()});
        throw Errors.FENCED_INSTANCE_ID.exception("Static member " + str2 + " with instance id " + str3 + " was fenced by member " + consumerGroupMember.memberId() + ".");
    }

    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember consumerGroupMember, String str) {
        if (consumerGroupMember == null) {
            throw Errors.UNKNOWN_MEMBER_ID.exception("Instance id " + str + " is unknown.");
        }
    }

    private void throwIfClassicProtocolIsNotSupported(ConsumerGroup consumerGroup, String str, String str2, JoinGroupRequestData.JoinGroupRequestProtocolCollection joinGroupRequestProtocolCollection) {
        if (!consumerGroup.supportsClassicProtocols(str2, ClassicGroupMember.plainProtocolSet(joinGroupRequestProtocolCollection))) {
            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + str + "'s protocols are not supported.");
        }
    }

    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember consumerGroupMember) {
        if (!consumerGroupMember.useClassicProtocol()) {
            throw new UnknownMemberIdException(String.format("Member %s does not use the classic protocol.", consumerGroupMember.memberId()));
        }
    }

    private void throwIfGenerationIdUnmatched(String str, int i, int i2) {
        if (i != i2) {
            throw Errors.ILLEGAL_GENERATION.exception(String.format("The request generation id %s is not equal to the member epoch %d of member %s.", Integer.valueOf(i2), Integer.valueOf(i), str));
        }
    }

    private void throwIfClassicProtocolUnmatched(ConsumerGroupMember consumerGroupMember, String str, String str2) {
        String name = consumerGroupMember.supportedClassicProtocols().get().iterator().next().name();
        if (str != null && !"consumer".equals(str)) {
            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(String.format("The protocol type %s from member %s request is not equal to the group protocol type %s.", str, consumerGroupMember.memberId(), "consumer"));
        }
        if (str2 != null && !name.equals(str2)) {
            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(String.format("The protocol name %s from member %s request is not equal to the protocol name %s returned in the join response.", str2, consumerGroupMember.memberId(), name));
        }
    }

    private void throwIfRebalanceInProgress(ConsumerGroup consumerGroup, ConsumerGroupMember consumerGroupMember) {
        if (consumerGroup.groupEpoch() <= consumerGroupMember.memberEpoch() || consumerGroupMember.state().equals(MemberState.UNREVOKED_PARTITIONS)) {
            return;
        }
        scheduleConsumerGroupJoinTimeoutIfAbsent(consumerGroup.groupId(), consumerGroupMember.memberId(), consumerGroupMember.rebalanceTimeoutMs());
        throw Errors.REBALANCE_IN_PROGRESS.exception(String.format("A new rebalance is triggered in group %s and member %s should rejoin to catch up.", consumerGroup.groupId(), consumerGroupMember.memberId()));
    }

    private static ConsumerProtocolSubscription deserializeSubscription(JoinGroupRequestData.JoinGroupRequestProtocolCollection joinGroupRequestProtocolCollection) {
        try {
            return ConsumerProtocol.deserializeConsumerProtocolSubscription(ByteBuffer.wrap(((JoinGroupRequestData.JoinGroupRequestProtocol) joinGroupRequestProtocolCollection.iterator().next()).metadata()));
        } catch (SchemaException e) {
            throw new IllegalStateException("Malformed embedded consumer protocol in subscription deserialization.");
        }
    }

    private ConsumerGroupHeartbeatResponseData.Assignment createConsumerGroupResponseAssignment(ConsumerGroupMember consumerGroupMember) {
        return new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(fromAssignmentMap(consumerGroupMember.assignedPartitions()));
    }

    private ShareGroupHeartbeatResponseData.Assignment createShareGroupResponseAssignment(ShareGroupMember shareGroupMember) {
        return new ShareGroupHeartbeatResponseData.Assignment().setTopicPartitions(fromShareGroupAssignmentMap(shareGroupMember.assignedPartitions()));
    }

    private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> fromAssignmentMap(Map<Uuid, Set<Integer>> map) {
        return (List) map.entrySet().stream().map(entry -> {
            return new ConsumerGroupHeartbeatResponseData.TopicPartitions().setTopicId((Uuid) entry.getKey()).setPartitions(new ArrayList((Collection) entry.getValue()));
        }).collect(Collectors.toList());
    }

    private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssignmentMap(Map<Uuid, Set<Integer>> map) {
        return (List) map.entrySet().stream().map(entry -> {
            return new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId((Uuid) entry.getKey()).setPartitions(new ArrayList((Collection) entry.getValue()));
        }).collect(Collectors.toList());
    }

    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(String str, String str2, int i, String str3, String str4, int i2, String str5, String str6, List<String> list, String str7, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list2) throws ApiException {
        int assignmentEpoch;
        Assignment targetAssignment;
        long milliseconds = this.time.milliseconds();
        List<CoordinatorRecord> arrayList = new ArrayList<>();
        boolean z = i == 0;
        ConsumerGroup orMaybeCreateConsumerGroup = getOrMaybeCreateConsumerGroup(str, z, arrayList);
        throwIfConsumerGroupIsFull(orMaybeCreateConsumerGroup, str2);
        if (str2.isEmpty()) {
            str2 = Uuid.randomUuid().toString();
        }
        ConsumerGroupMember orMaybeSubscribeDynamicConsumerGroupMember = str3 == null ? getOrMaybeSubscribeDynamicConsumerGroupMember(orMaybeCreateConsumerGroup, str2, i, list2, z, false) : getOrMaybeSubscribeStaticConsumerGroupMember(orMaybeCreateConsumerGroup, str2, i, str3, list2, z, false, arrayList);
        ConsumerGroupMember build = new ConsumerGroupMember.Builder(orMaybeSubscribeDynamicConsumerGroupMember).maybeUpdateInstanceId(Optional.ofNullable(str3)).maybeUpdateRackId(Optional.ofNullable(str4)).maybeUpdateRebalanceTimeoutMs(Utils.ofSentinel(i2)).maybeUpdateServerAssignorName(Optional.ofNullable(str7)).maybeUpdateSubscribedTopicNames(Optional.ofNullable(list)).setClientId(str5).setClientHost(str6).setClassicMemberMetadata(null).build();
        boolean hasMemberSubscriptionChanged = hasMemberSubscriptionChanged(str, orMaybeSubscribeDynamicConsumerGroupMember, build, arrayList);
        int groupEpoch = orMaybeCreateConsumerGroup.groupEpoch();
        Map<String, TopicMetadata> subscriptionMetadata = orMaybeCreateConsumerGroup.subscriptionMetadata();
        orMaybeCreateConsumerGroup.subscribedTopicNames();
        SubscriptionType subscriptionType = orMaybeCreateConsumerGroup.subscriptionType();
        if (hasMemberSubscriptionChanged || orMaybeCreateConsumerGroup.hasMetadataExpired(milliseconds)) {
            Map<String, Integer> computeSubscribedTopicNames = orMaybeCreateConsumerGroup.computeSubscribedTopicNames(orMaybeSubscribeDynamicConsumerGroupMember, build);
            subscriptionMetadata = orMaybeCreateConsumerGroup.computeSubscriptionMetadata(computeSubscribedTopicNames, this.metadataImage.topics(), this.metadataImage.cluster());
            int numMembers = orMaybeCreateConsumerGroup.numMembers();
            if (!orMaybeCreateConsumerGroup.hasMember(build.memberId()) && !orMaybeCreateConsumerGroup.hasStaticMember(build.instanceId())) {
                numMembers++;
            }
            subscriptionType = ConsumerGroup.subscriptionType(computeSubscribedTopicNames, numMembers);
            if (!subscriptionMetadata.equals(orMaybeCreateConsumerGroup.subscriptionMetadata())) {
                this.log.info("[GroupId {}] Computed new subscription metadata: {}.", str, subscriptionMetadata);
                hasMemberSubscriptionChanged = true;
                arrayList.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(str, subscriptionMetadata));
            }
            if (hasMemberSubscriptionChanged) {
                groupEpoch++;
                arrayList.add(CoordinatorRecordHelpers.newGroupEpochRecord(str, groupEpoch));
                this.log.info("[GroupId {}] Bumped group epoch to {}.", str, Integer.valueOf(groupEpoch));
                this.metrics.record(GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
            }
            orMaybeCreateConsumerGroup.setMetadataRefreshDeadline(milliseconds + this.consumerGroupMetadataRefreshIntervalMs, groupEpoch);
        }
        if (groupEpoch > orMaybeCreateConsumerGroup.assignmentEpoch()) {
            targetAssignment = updateTargetAssignment(orMaybeCreateConsumerGroup, groupEpoch, orMaybeSubscribeDynamicConsumerGroupMember, build, subscriptionMetadata, subscriptionType, arrayList);
            assignmentEpoch = groupEpoch;
        } else {
            assignmentEpoch = orMaybeCreateConsumerGroup.assignmentEpoch();
            targetAssignment = orMaybeCreateConsumerGroup.targetAssignment(build.memberId(), build.instanceId());
        }
        orMaybeCreateConsumerGroup.getClass();
        ConsumerGroupMember maybeReconcile = maybeReconcile(str, build, (v1, v2) -> {
            return r3.currentPartitionEpoch(v1, v2);
        }, assignmentEpoch, targetAssignment, list2, arrayList);
        scheduleConsumerGroupSessionTimeout(str, str2);
        ConsumerGroupHeartbeatResponseData heartbeatIntervalMs = new ConsumerGroupHeartbeatResponseData().setMemberId(maybeReconcile.memberId()).setMemberEpoch(maybeReconcile.memberEpoch()).setHeartbeatIntervalMs(this.consumerGroupHeartbeatIntervalMs);
        if ((i == 0 || !(i2 == -1 || list == null || list2 == null)) || ConsumerGroupMember.hasAssignedPartitionsChanged(orMaybeSubscribeDynamicConsumerGroupMember, maybeReconcile)) {
            heartbeatIntervalMs.setAssignment(createConsumerGroupResponseAssignment(maybeReconcile));
        }
        return new CoordinatorResult<>(arrayList, heartbeatIntervalMs);
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGroup(ConsumerGroup consumerGroup, RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, CompletableFuture<JoinGroupResponseData> completableFuture) throws ApiException {
        int assignmentEpoch;
        Assignment targetAssignment;
        long milliseconds = this.time.milliseconds();
        ArrayList arrayList = new ArrayList();
        String groupId = joinGroupRequestData.groupId();
        String groupInstanceId = joinGroupRequestData.groupInstanceId();
        int sessionTimeoutMs = joinGroupRequestData.sessionTimeoutMs();
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = joinGroupRequestData.protocols();
        String memberId = joinGroupRequestData.memberId();
        boolean equals = memberId.equals(ClassicGroup.NO_LEADER);
        if (equals) {
            memberId = Uuid.randomUuid().toString();
        }
        throwIfConsumerGroupIsFull(consumerGroup, memberId);
        throwIfClassicProtocolIsNotSupported(consumerGroup, memberId, joinGroupRequestData.protocolType(), protocols);
        if (JoinGroupRequest.requiresKnownMemberId(joinGroupRequestData, requestContext.apiVersion())) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.MEMBER_ID_REQUIRED.code()));
            this.log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId);
            return EMPTY_RESULT;
        }
        ConsumerGroupMember orMaybeSubscribeDynamicConsumerGroupMember = groupInstanceId == null ? getOrMaybeSubscribeDynamicConsumerGroupMember(consumerGroup, memberId, -1, Collections.emptyList(), true, true) : getOrMaybeSubscribeStaticConsumerGroupMember(consumerGroup, memberId, -1, groupInstanceId, Collections.emptyList(), equals, true, arrayList);
        int groupEpoch = consumerGroup.groupEpoch();
        Map<String, TopicMetadata> subscriptionMetadata = consumerGroup.subscriptionMetadata();
        consumerGroup.subscribedTopicNames();
        SubscriptionType subscriptionType = consumerGroup.subscriptionType();
        ConsumerProtocolSubscription deserializeSubscription = deserializeSubscription(protocols);
        ConsumerGroupMember build = new ConsumerGroupMember.Builder(orMaybeSubscribeDynamicConsumerGroupMember).maybeUpdateInstanceId(Optional.ofNullable(groupInstanceId)).maybeUpdateRackId(Utils.toOptional(deserializeSubscription.rackId())).maybeUpdateRebalanceTimeoutMs(Utils.ofSentinel(joinGroupRequestData.rebalanceTimeoutMs())).maybeUpdateServerAssignorName(Optional.empty()).maybeUpdateSubscribedTopicNames(Optional.ofNullable(deserializeSubscription.topics())).setClientId(requestContext.clientId()).setClientHost(requestContext.clientAddress.toString()).setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata().setSessionTimeoutMs(sessionTimeoutMs).setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))).build();
        boolean hasMemberSubscriptionChanged = hasMemberSubscriptionChanged(groupId, orMaybeSubscribeDynamicConsumerGroupMember, build, arrayList);
        if (hasMemberSubscriptionChanged || consumerGroup.hasMetadataExpired(milliseconds)) {
            Map<String, Integer> computeSubscribedTopicNames = consumerGroup.computeSubscribedTopicNames(orMaybeSubscribeDynamicConsumerGroupMember, build);
            subscriptionMetadata = consumerGroup.computeSubscriptionMetadata(computeSubscribedTopicNames, this.metadataImage.topics(), this.metadataImage.cluster());
            int numMembers = consumerGroup.numMembers();
            if (!consumerGroup.hasMember(build.memberId()) && !consumerGroup.hasStaticMember(build.instanceId())) {
                numMembers++;
            }
            subscriptionType = ConsumerGroup.subscriptionType(computeSubscribedTopicNames, numMembers);
            if (!subscriptionMetadata.equals(consumerGroup.subscriptionMetadata())) {
                this.log.info("[GroupId {}] Computed new subscription metadata: {}.", groupId, subscriptionMetadata);
                hasMemberSubscriptionChanged = true;
                arrayList.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
            }
            if (hasMemberSubscriptionChanged) {
                groupEpoch++;
                arrayList.add(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, groupEpoch));
                this.log.info("[GroupId {}] Bumped group epoch to {}.", groupId, Integer.valueOf(groupEpoch));
                this.metrics.record(GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
            }
            consumerGroup.setMetadataRefreshDeadline(milliseconds + this.consumerGroupMetadataRefreshIntervalMs, groupEpoch);
        }
        if (groupEpoch > consumerGroup.assignmentEpoch()) {
            targetAssignment = updateTargetAssignment(consumerGroup, groupEpoch, orMaybeSubscribeDynamicConsumerGroupMember, build, subscriptionMetadata, subscriptionType, arrayList);
            assignmentEpoch = groupEpoch;
        } else {
            assignmentEpoch = consumerGroup.assignmentEpoch();
            targetAssignment = consumerGroup.targetAssignment(build.memberId(), build.instanceId());
        }
        consumerGroup.getClass();
        ConsumerGroupMember maybeReconcile = maybeReconcile(groupId, build, (v1, v2) -> {
            return r3.currentPartitionEpoch(v1, v2);
        }, assignmentEpoch, targetAssignment, Utils.toTopicPartitions(deserializeSubscription.ownedPartitions(), this.metadataImage.topics()), arrayList);
        JoinGroupResponseData protocolName = new JoinGroupResponseData().setMemberId(maybeReconcile.memberId()).setGenerationId(maybeReconcile.memberEpoch()).setProtocolType("consumer").setProtocolName(maybeReconcile.supportedClassicProtocols().get().iterator().next().name());
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture2.whenComplete((r11, th) -> {
            if (th == null) {
                cancelConsumerGroupJoinTimeout(groupId, protocolName.memberId());
                scheduleConsumerGroupSessionTimeout(groupId, protocolName.memberId(), sessionTimeoutMs);
                scheduleConsumerGroupSyncTimeout(groupId, protocolName.memberId(), joinGroupRequestData.rebalanceTimeoutMs());
                completableFuture.complete(protocolName);
            }
        });
        return new CoordinatorResult<>(arrayList, null, completableFuture2, true);
    }

    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(String str, String str2, int i, String str3, String str4, String str5, List<String> list) throws ApiException {
        int assignmentEpoch;
        Assignment targetAssignment;
        long milliseconds = this.time.milliseconds();
        List<CoordinatorRecord> arrayList = new ArrayList<>();
        List<CoordinatorRecord> arrayList2 = new ArrayList<>();
        boolean z = i == 0;
        ShareGroup orMaybeCreatePersistedShareGroup = getOrMaybeCreatePersistedShareGroup(str, z);
        throwIfShareGroupIsFull(orMaybeCreatePersistedShareGroup, str2);
        if (str2.isEmpty()) {
            str2 = Uuid.randomUuid().toString();
        }
        ShareGroupMember orMaybeSubscribeShareGroupMember = getOrMaybeSubscribeShareGroupMember(orMaybeCreatePersistedShareGroup, str2, i, z);
        ShareGroupMember build = new ShareGroupMember.Builder(orMaybeSubscribeShareGroupMember).maybeUpdateRackId(Optional.ofNullable(str3)).maybeUpdateSubscribedTopicNames(Optional.ofNullable(list)).setClientId(str4).setClientHost(str5).build();
        boolean hasMemberSubscriptionChanged = hasMemberSubscriptionChanged(str, orMaybeSubscribeShareGroupMember, build, arrayList);
        int groupEpoch = orMaybeCreatePersistedShareGroup.groupEpoch();
        Map<String, TopicMetadata> subscriptionMetadata = orMaybeCreatePersistedShareGroup.subscriptionMetadata();
        SubscriptionType subscriptionType = orMaybeCreatePersistedShareGroup.subscriptionType();
        if (hasMemberSubscriptionChanged || orMaybeCreatePersistedShareGroup.hasMetadataExpired(milliseconds)) {
            Map<String, Integer> computeSubscribedTopicNames = orMaybeCreatePersistedShareGroup.computeSubscribedTopicNames(orMaybeSubscribeShareGroupMember, build);
            subscriptionMetadata = orMaybeCreatePersistedShareGroup.computeSubscriptionMetadata(computeSubscribedTopicNames, this.metadataImage.topics(), this.metadataImage.cluster());
            int numMembers = orMaybeCreatePersistedShareGroup.numMembers();
            if (!orMaybeCreatePersistedShareGroup.hasMember(build.memberId())) {
                numMembers++;
            }
            subscriptionType = ModernGroup.subscriptionType(computeSubscribedTopicNames, numMembers);
            if (!subscriptionMetadata.equals(orMaybeCreatePersistedShareGroup.subscriptionMetadata())) {
                this.log.info("[GroupId {}] Computed new subscription metadata: {}.", str, subscriptionMetadata);
                hasMemberSubscriptionChanged = true;
                arrayList2.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(str, subscriptionMetadata));
            }
            if (hasMemberSubscriptionChanged) {
                groupEpoch++;
                arrayList.add(CoordinatorRecordHelpers.newGroupEpochRecord(str, groupEpoch, Group.GroupType.SHARE));
                this.log.info("[GroupId {}] Bumped group epoch to {}.", str, Integer.valueOf(groupEpoch));
            }
            orMaybeCreatePersistedShareGroup.setMetadataRefreshDeadline(milliseconds + this.shareGroupMetadataRefreshIntervalMs, groupEpoch);
        }
        if (groupEpoch > orMaybeCreatePersistedShareGroup.assignmentEpoch()) {
            targetAssignment = updateTargetAssignment(orMaybeCreatePersistedShareGroup, groupEpoch, build, subscriptionMetadata, subscriptionType, arrayList2);
            assignmentEpoch = groupEpoch;
        } else {
            assignmentEpoch = orMaybeCreatePersistedShareGroup.assignmentEpoch();
            targetAssignment = orMaybeCreatePersistedShareGroup.targetAssignment(build.memberId());
        }
        ShareGroupMember maybeReconcile = maybeReconcile(str, build, assignmentEpoch, targetAssignment, arrayList2);
        scheduleShareGroupSessionTimeout(str, str2);
        ShareGroupHeartbeatResponseData heartbeatIntervalMs = new ShareGroupHeartbeatResponseData().setMemberId(maybeReconcile.memberId()).setMemberEpoch(maybeReconcile.memberEpoch()).setHeartbeatIntervalMs(this.shareGroupHeartbeatIntervalMs);
        if (i == 0 || ConsumerGroupMember.hasAssignedPartitionsChanged(orMaybeSubscribeShareGroupMember, maybeReconcile)) {
            heartbeatIntervalMs.setAssignment(createShareGroupResponseAssignment(maybeReconcile));
        }
        arrayList2.forEach(this::replay);
        return new CoordinatorResult<>(arrayList, heartbeatIntervalMs);
    }

    private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(ConsumerGroup consumerGroup, String str, int i, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list, boolean z, boolean z2) {
        ConsumerGroupMember orMaybeCreateMember = consumerGroup.getOrMaybeCreateMember(str, z);
        if (!z2) {
            throwIfConsumerGroupMemberEpochIsInvalid(orMaybeCreateMember, i, list);
        }
        if (z) {
            Logger logger = this.log;
            Object[] objArr = new Object[3];
            objArr[0] = consumerGroup.groupId();
            objArr[1] = str;
            objArr[2] = z2 ? "classic" : "consumer";
            logger.info("[GroupId {}] Member {} joins the consumer group using the {} protocol.", objArr);
        }
        return orMaybeCreateMember;
    }

    private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(ConsumerGroup consumerGroup, String str, int i, String str2, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list, boolean z, boolean z2, List<CoordinatorRecord> list2) {
        ConsumerGroupMember staticMember = consumerGroup.staticMember(str2);
        if (!z) {
            throwIfStaticMemberIsUnknown(staticMember, str2);
            throwIfInstanceIdIsFenced(staticMember, consumerGroup.groupId(), str, str2);
            if (!z2) {
                throwIfConsumerGroupMemberEpochIsInvalid(staticMember, i, list);
            }
            return staticMember;
        }
        if (staticMember == null) {
            ConsumerGroupMember orMaybeCreateMember = consumerGroup.getOrMaybeCreateMember(str, true);
            Logger logger = this.log;
            Object[] objArr = new Object[4];
            objArr[0] = consumerGroup.groupId();
            objArr[1] = str;
            objArr[2] = str2;
            objArr[3] = z2 ? "classic" : "consumer";
            logger.info("[GroupId {}] Static member {} with instance id {} joins the consumer group using the {} protocol.", objArr);
            return orMaybeCreateMember;
        }
        if (!z2) {
            throwIfInstanceIdIsUnreleased(staticMember, consumerGroup.groupId(), str, str2);
        }
        ConsumerGroupMember build = new ConsumerGroupMember.Builder(staticMember, str).setMemberEpoch(0).setPreviousMemberEpoch(0).build();
        replaceMember(list2, consumerGroup, staticMember, build);
        Logger logger2 = this.log;
        Object[] objArr2 = new Object[5];
        objArr2[0] = consumerGroup.groupId();
        objArr2[1] = str2;
        objArr2[2] = z2 ? "classic" : "consumer";
        objArr2[3] = str;
        objArr2[4] = staticMember.memberId();
        logger2.info("[GroupId {}] Static member with instance id {} re-joins the consumer group using the {} protocol. Created a new member {} to replace the existing member {}.", objArr2);
        return build;
    }

    private ShareGroupMember getOrMaybeSubscribeShareGroupMember(ShareGroup shareGroup, String str, int i, boolean z) {
        ShareGroupMember orMaybeCreateMember = shareGroup.getOrMaybeCreateMember(str, z);
        throwIfShareGroupMemberEpochIsInvalid(orMaybeCreateMember, i);
        if (z) {
            this.log.info("[GroupId {}] Member {} joins the share group using the share protocol.", shareGroup.groupId(), str);
        }
        return orMaybeCreateMember;
    }

    private boolean hasMemberSubscriptionChanged(String str, ConsumerGroupMember consumerGroupMember, ConsumerGroupMember consumerGroupMember2, List<CoordinatorRecord> list) {
        String memberId = consumerGroupMember2.memberId();
        if (consumerGroupMember2.equals(consumerGroupMember)) {
            return false;
        }
        list.add(CoordinatorRecordHelpers.newMemberSubscriptionRecord(str, consumerGroupMember2));
        if (!consumerGroupMember2.subscribedTopicNames().equals(consumerGroupMember.subscribedTopicNames())) {
            this.log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", new Object[]{str, memberId, consumerGroupMember2.subscribedTopicNames()});
            return true;
        }
        if (consumerGroupMember2.subscribedTopicRegex().equals(consumerGroupMember.subscribedTopicRegex())) {
            return false;
        }
        this.log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", new Object[]{str, memberId, consumerGroupMember2.subscribedTopicRegex()});
        return true;
    }

    private boolean hasMemberSubscriptionChanged(String str, ShareGroupMember shareGroupMember, ShareGroupMember shareGroupMember2, List<CoordinatorRecord> list) {
        String memberId = shareGroupMember2.memberId();
        if (shareGroupMember2.equals(shareGroupMember)) {
            return false;
        }
        list.add(CoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(str, shareGroupMember2));
        if (shareGroupMember2.subscribedTopicNames().equals(shareGroupMember.subscribedTopicNames())) {
            return false;
        }
        this.log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", new Object[]{str, memberId, shareGroupMember2.subscribedTopicNames()});
        return true;
    }

    private ConsumerGroupMember maybeReconcile(String str, ConsumerGroupMember consumerGroupMember, BiFunction<Uuid, Integer, Integer> biFunction, int i, Assignment assignment, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list, List<CoordinatorRecord> list2) {
        if (consumerGroupMember.isReconciledTo(i)) {
            return consumerGroupMember;
        }
        ConsumerGroupMember build = new CurrentAssignmentBuilder(consumerGroupMember).withTargetAssignment(i, assignment).withCurrentPartitionEpoch(biFunction).withOwnedTopicPartitions(list).build();
        if (!build.equals(consumerGroupMember)) {
            list2.add(CoordinatorRecordHelpers.newCurrentAssignmentRecord(str, build));
            this.log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, assignedPartitions={} and revokedPartitions={}.", new Object[]{str, build.memberId(), Integer.valueOf(build.memberEpoch()), Integer.valueOf(build.previousMemberEpoch()), build.state(), Utils.assignmentToString(build.assignedPartitions()), Utils.assignmentToString(build.partitionsPendingRevocation())});
            if (!build.useClassicProtocol()) {
                if (build.state() == MemberState.UNREVOKED_PARTITIONS) {
                    scheduleConsumerGroupRebalanceTimeout(str, build.memberId(), build.memberEpoch(), build.rebalanceTimeoutMs());
                } else {
                    cancelConsumerGroupRebalanceTimeout(str, build.memberId());
                }
            }
        }
        return build;
    }

    private ShareGroupMember maybeReconcile(String str, ShareGroupMember shareGroupMember, int i, Assignment assignment, List<CoordinatorRecord> list) {
        if (shareGroupMember.isReconciledTo(i)) {
            return shareGroupMember;
        }
        ShareGroupMember build = new ShareGroupAssignmentBuilder(shareGroupMember).withTargetAssignment(i, assignment).build();
        if (!build.equals(shareGroupMember)) {
            list.add(CoordinatorRecordHelpers.newCurrentAssignmentRecord(str, build));
            this.log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, assignedPartitions={}.", new Object[]{str, build.memberId(), Integer.valueOf(build.memberEpoch()), Integer.valueOf(build.previousMemberEpoch()), build.state(), Utils.assignmentToString(build.assignedPartitions())});
        }
        return build;
    }

    private Assignment updateTargetAssignment(ConsumerGroup consumerGroup, int i, ConsumerGroupMember consumerGroupMember, ConsumerGroupMember consumerGroupMember2, Map<String, TopicMetadata> map, SubscriptionType subscriptionType, List<CoordinatorRecord> list) {
        String orElse = consumerGroup.computePreferredServerAssignor(consumerGroupMember, consumerGroupMember2).orElse(this.defaultConsumerGroupAssignor.name());
        try {
            TargetAssignmentBuilder addOrUpdateMember = new TargetAssignmentBuilder(consumerGroup.groupId(), i, this.consumerGroupAssignors.get(orElse)).withMembers(consumerGroup.members()).withStaticMembers(consumerGroup.staticMembers()).withSubscriptionMetadata(map).withSubscriptionType(subscriptionType).withTargetAssignment(consumerGroup.targetAssignment()).withInvertedTargetAssignment(consumerGroup.invertedTargetAssignment()).withTopicsImage(this.metadataImage.topics()).addOrUpdateMember(consumerGroupMember2.memberId(), consumerGroupMember2);
            String staticMemberId = consumerGroup.staticMemberId(consumerGroupMember2.instanceId());
            if (staticMemberId != null && !consumerGroupMember2.memberId().equals(staticMemberId)) {
                addOrUpdateMember.removeMember(staticMemberId);
            }
            long milliseconds = this.time.milliseconds();
            TargetAssignmentBuilder.TargetAssignmentResult build = addOrUpdateMember.build();
            this.log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.", new Object[]{consumerGroup.groupId(), Integer.valueOf(i), orElse, Long.valueOf(this.time.milliseconds() - milliseconds), build.targetAssignment()});
            list.addAll(build.records());
            MemberAssignment memberAssignment = build.targetAssignment().get(consumerGroupMember2.memberId());
            return memberAssignment != null ? new Assignment(memberAssignment.partitions()) : Assignment.EMPTY;
        } catch (PartitionAssignorException e) {
            String format = String.format("Failed to compute a new target assignment for epoch %d: %s", Integer.valueOf(i), e.getMessage());
            this.log.error("[GroupId {}] {}.", consumerGroup.groupId(), format);
            throw new UnknownServerException(format, e);
        }
    }

    private Assignment updateTargetAssignment(ShareGroup shareGroup, int i, ShareGroupMember shareGroupMember, Map<String, TopicMetadata> map, SubscriptionType subscriptionType, List<CoordinatorRecord> list) {
        try {
            TargetAssignmentBuilder addOrUpdateMember = new TargetAssignmentBuilder(shareGroup.groupId(), i, this.shareGroupAssignor).withMembers(shareGroup.members()).withSubscriptionMetadata(map).withSubscriptionType(subscriptionType).withTargetAssignment(shareGroup.targetAssignment()).withInvertedTargetAssignment(shareGroup.invertedTargetAssignment()).withTopicsImage(this.metadataImage.topics()).addOrUpdateMember(shareGroupMember.memberId(), shareGroupMember);
            long milliseconds = this.time.milliseconds();
            TargetAssignmentBuilder.TargetAssignmentResult build = addOrUpdateMember.build();
            this.log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.", new Object[]{shareGroup.groupId(), Integer.valueOf(i), this.shareGroupAssignor, Long.valueOf(this.time.milliseconds() - milliseconds), build.targetAssignment()});
            list.addAll(build.records());
            MemberAssignment memberAssignment = build.targetAssignment().get(shareGroupMember.memberId());
            return memberAssignment != null ? new Assignment(memberAssignment.partitions()) : Assignment.EMPTY;
        } catch (PartitionAssignorException e) {
            String format = String.format("Failed to compute a new target assignment for epoch %d: %s", Integer.valueOf(i), e.getMessage());
            this.log.error("[GroupId {}] {}.", shareGroup.groupId(), format);
            throw new UnknownServerException(format, e);
        }
    }

    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupLeave(String str, String str2, String str3, int i) throws ApiException {
        ConsumerGroup consumerGroup = consumerGroup(str);
        ConsumerGroupHeartbeatResponseData memberEpoch = new ConsumerGroupHeartbeatResponseData().setMemberId(str3).setMemberEpoch(i);
        if (str2 == null) {
            ConsumerGroupMember orMaybeCreateMember = consumerGroup.getOrMaybeCreateMember(str3, false);
            this.log.info("[GroupId {}] Member {} left the consumer group.", str, str3);
            return consumerGroupFenceMember(consumerGroup, orMaybeCreateMember, memberEpoch);
        }
        ConsumerGroupMember staticMember = consumerGroup.staticMember(str2);
        throwIfStaticMemberIsUnknown(staticMember, str2);
        throwIfInstanceIdIsFenced(staticMember, str, str3, str2);
        if (i == -2) {
            this.log.info("[GroupId {}] Static Member {} with instance id {} temporarily left the consumer group.", new Object[]{consumerGroup.groupId(), str3, str2});
            return consumerGroupStaticMemberGroupLeave(consumerGroup, staticMember);
        }
        this.log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group.", new Object[]{consumerGroup.groupId(), str3, str2});
        return consumerGroupFenceMember(consumerGroup, staticMember, memberEpoch);
    }

    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupStaticMemberGroupLeave(ConsumerGroup consumerGroup, ConsumerGroupMember consumerGroupMember) {
        return new CoordinatorResult<>(Collections.singletonList(CoordinatorRecordHelpers.newCurrentAssignmentRecord(consumerGroup.groupId(), new ConsumerGroupMember.Builder(consumerGroupMember).setMemberEpoch(-2).setPartitionsPendingRevocation(Collections.emptyMap()).build())), new ConsumerGroupHeartbeatResponseData().setMemberId(consumerGroupMember.memberId()).setMemberEpoch(-2));
    }

    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupLeave(String str, String str2, int i) throws ApiException {
        ShareGroup shareGroup = shareGroup(str);
        ShareGroupHeartbeatResponseData memberEpoch = new ShareGroupHeartbeatResponseData().setMemberId(str2).setMemberEpoch(i);
        ShareGroupMember orMaybeCreateMember = shareGroup.getOrMaybeCreateMember(str2, false);
        this.log.info("[GroupId {}] Member {} left the share group.", str, str2);
        return shareGroupFenceMember(shareGroup, orMaybeCreateMember, memberEpoch);
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember(ConsumerGroup consumerGroup, ConsumerGroupMember consumerGroupMember, T t) {
        if (validateOnlineDowngrade(consumerGroup, consumerGroupMember.memberId())) {
            return convertToClassicGroup(consumerGroup, consumerGroupMember.memberId(), t);
        }
        ArrayList arrayList = new ArrayList();
        removeMember(arrayList, consumerGroup.groupId(), consumerGroupMember.memberId());
        Map<String, TopicMetadata> computeSubscriptionMetadata = consumerGroup.computeSubscriptionMetadata(consumerGroup.computeSubscribedTopicNames(consumerGroupMember, null), this.metadataImage.topics(), this.metadataImage.cluster());
        if (!computeSubscriptionMetadata.equals(consumerGroup.subscriptionMetadata())) {
            this.log.info("[GroupId {}] Computed new subscription metadata: {}.", consumerGroup.groupId(), computeSubscriptionMetadata);
            arrayList.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(consumerGroup.groupId(), computeSubscriptionMetadata));
        }
        arrayList.add(CoordinatorRecordHelpers.newGroupEpochRecord(consumerGroup.groupId(), consumerGroup.groupEpoch() + 1));
        cancelTimers(consumerGroup.groupId(), consumerGroupMember.memberId());
        return new CoordinatorResult<>(arrayList, t);
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(ShareGroup shareGroup, ShareGroupMember shareGroupMember, T t) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(shareGroup.groupId(), shareGroupMember.memberId()));
        arrayList2.add(CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(shareGroup.groupId(), shareGroupMember.memberId()));
        arrayList.add(CoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(shareGroup.groupId(), shareGroupMember.memberId()));
        Map<String, TopicMetadata> computeSubscriptionMetadata = shareGroup.computeSubscriptionMetadata(shareGroup.computeSubscribedTopicNames(shareGroupMember, null), this.metadataImage.topics(), this.metadataImage.cluster());
        if (!computeSubscriptionMetadata.equals(shareGroup.subscriptionMetadata())) {
            this.log.info("[GroupId {}] Computed new subscription metadata: {}.", shareGroup.groupId(), computeSubscriptionMetadata);
            arrayList2.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(shareGroup.groupId(), computeSubscriptionMetadata));
        }
        arrayList.add(CoordinatorRecordHelpers.newGroupEpochRecord(shareGroup.groupId(), shareGroup.groupEpoch() + 1, Group.GroupType.SHARE));
        cancelGroupSessionTimeout(shareGroup.groupId(), shareGroupMember.memberId());
        arrayList2.forEach(this::replay);
        return new CoordinatorResult<>(arrayList, t);
    }

    private void replaceMember(List<CoordinatorRecord> list, ConsumerGroup consumerGroup, ConsumerGroupMember consumerGroupMember, ConsumerGroupMember consumerGroupMember2) {
        String groupId = consumerGroup.groupId();
        removeMember(list, groupId, consumerGroupMember.memberId());
        list.add(CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, consumerGroupMember2));
        list.add(CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, consumerGroupMember2.memberId(), consumerGroup.targetAssignment(consumerGroupMember.memberId()).partitions()));
        list.add(CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, consumerGroupMember2));
    }

    private void removeMember(List<CoordinatorRecord> list, String str, String str2) {
        list.add(CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(str, str2));
        list.add(CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(str, str2));
        list.add(CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(str, str2));
    }

    private void cancelTimers(String str, String str2) {
        cancelGroupSessionTimeout(str, str2);
        cancelConsumerGroupRebalanceTimeout(str, str2);
        cancelConsumerGroupJoinTimeout(str, str2);
        cancelConsumerGroupSyncTimeout(str, str2);
    }

    private void scheduleConsumerGroupSessionTimeout(String str, String str2) {
        scheduleConsumerGroupSessionTimeout(str, str2, this.consumerGroupSessionTimeoutMs);
    }

    private void scheduleShareGroupSessionTimeout(String str, String str2) {
        scheduleShareGroupSessionTimeout(str, str2, this.shareGroupSessionTimeoutMs);
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMemberOperation(String str, String str2, String str3) {
        try {
            ConsumerGroup consumerGroup = consumerGroup(str);
            ConsumerGroupMember orMaybeCreateMember = consumerGroup.getOrMaybeCreateMember(str2, false);
            this.log.info("[GroupId {}] Member {} fenced from the group because {}.", new Object[]{str, str2, str3});
            return consumerGroupFenceMember(consumerGroup, orMaybeCreateMember, null);
        } catch (GroupIdNotFoundException e) {
            this.log.debug("[GroupId {}] Could not fence {} because the group does not exist.", str, str2);
            return new CoordinatorResult<>(Collections.emptyList());
        } catch (UnknownMemberIdException e2) {
            this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", str, str2);
            return new CoordinatorResult<>(Collections.emptyList());
        }
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMemberOperation(String str, String str2, String str3) {
        try {
            ShareGroup shareGroup = shareGroup(str);
            ShareGroupMember orMaybeCreateMember = shareGroup.getOrMaybeCreateMember(str2, false);
            this.log.info("[GroupId {}] Member {} fenced from the group because {}.", new Object[]{str, str2, str3});
            return shareGroupFenceMember(shareGroup, orMaybeCreateMember, null);
        } catch (GroupIdNotFoundException e) {
            this.log.debug("[GroupId {}] Could not fence {} because the group does not exist.", str, str2);
            return new CoordinatorResult<>(Collections.emptyList());
        } catch (UnknownMemberIdException e2) {
            this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", str, str2);
            return new CoordinatorResult<>(Collections.emptyList());
        }
    }

    private void scheduleConsumerGroupSessionTimeout(String str, String str2, int i) {
        this.timer.schedule(groupSessionTimeoutKey(str, str2), i, TimeUnit.MILLISECONDS, true, () -> {
            return consumerGroupFenceMemberOperation(str, str2, "the member session expired.");
        });
    }

    private void scheduleShareGroupSessionTimeout(String str, String str2, int i) {
        this.timer.schedule(groupSessionTimeoutKey(str, str2), i, TimeUnit.MILLISECONDS, true, () -> {
            return shareGroupFenceMemberOperation(str, str2, "the member session expired.");
        });
    }

    private void cancelGroupSessionTimeout(String str, String str2) {
        this.timer.cancel(groupSessionTimeoutKey(str, str2));
    }

    private void scheduleConsumerGroupRebalanceTimeout(String str, String str2, int i, int i2) {
        this.timer.schedule(consumerGroupRebalanceTimeoutKey(str, str2), i2, TimeUnit.MILLISECONDS, true, () -> {
            try {
                ConsumerGroup consumerGroup = consumerGroup(str);
                ConsumerGroupMember orMaybeCreateMember = consumerGroup.getOrMaybeCreateMember(str2, false);
                if (orMaybeCreateMember.memberEpoch() == i) {
                    this.log.info("[GroupId {}] Member {} fenced from the group because it failed to transition from epoch {} within {}ms.", new Object[]{str, str2, Integer.valueOf(i), Integer.valueOf(i2)});
                    return consumerGroupFenceMember(consumerGroup, orMaybeCreateMember, null);
                }
                this.log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member left the epoch {}.", new Object[]{str, str2, Integer.valueOf(i)});
                return new CoordinatorResult(Collections.emptyList());
            } catch (GroupIdNotFoundException e) {
                this.log.debug("[GroupId {}] Could not fence {}} because the group does not exist.", str, str2);
                return new CoordinatorResult(Collections.emptyList());
            } catch (UnknownMemberIdException e2) {
                this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", str, str2);
                return new CoordinatorResult(Collections.emptyList());
            }
        });
    }

    private void cancelConsumerGroupRebalanceTimeout(String str, String str2) {
        this.timer.cancel(consumerGroupRebalanceTimeoutKey(str, str2));
    }

    private void scheduleConsumerGroupJoinTimeoutIfAbsent(String str, String str2, int i) {
        this.timer.scheduleIfAbsent(consumerGroupJoinKey(str, str2), i, TimeUnit.MILLISECONDS, true, () -> {
            return consumerGroupFenceMemberOperation(str, str2, "the classic member failed to join within the rebalance timeout.");
        });
    }

    private void cancelConsumerGroupJoinTimeout(String str, String str2) {
        this.timer.cancel(consumerGroupJoinKey(str, str2));
    }

    private void scheduleConsumerGroupSyncTimeout(String str, String str2, int i) {
        this.timer.schedule(consumerGroupSyncKey(str, str2), i, TimeUnit.MILLISECONDS, true, () -> {
            return consumerGroupFenceMemberOperation(str, str2, "the member failed to sync within timeout.");
        });
    }

    private void cancelConsumerGroupSyncTimeout(String str, String str2) {
        this.timer.cancel(consumerGroupSyncKey(str, str2));
    }

    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(RequestContext requestContext, ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData) throws ApiException {
        throwIfConsumerGroupHeartbeatRequestIsInvalid(consumerGroupHeartbeatRequestData);
        return (consumerGroupHeartbeatRequestData.memberEpoch() == -1 || consumerGroupHeartbeatRequestData.memberEpoch() == -2) ? consumerGroupLeave(consumerGroupHeartbeatRequestData.groupId(), consumerGroupHeartbeatRequestData.instanceId(), consumerGroupHeartbeatRequestData.memberId(), consumerGroupHeartbeatRequestData.memberEpoch()) : consumerGroupHeartbeat(consumerGroupHeartbeatRequestData.groupId(), consumerGroupHeartbeatRequestData.memberId(), consumerGroupHeartbeatRequestData.memberEpoch(), consumerGroupHeartbeatRequestData.instanceId(), consumerGroupHeartbeatRequestData.rackId(), consumerGroupHeartbeatRequestData.rebalanceTimeoutMs(), requestContext.clientId(), requestContext.clientAddress.toString(), consumerGroupHeartbeatRequestData.subscribedTopicNames(), consumerGroupHeartbeatRequestData.serverAssignor(), consumerGroupHeartbeatRequestData.topicPartitions());
    }

    public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(RequestContext requestContext, ShareGroupHeartbeatRequestData shareGroupHeartbeatRequestData) throws ApiException {
        throwIfShareGroupHeartbeatRequestIsInvalid(shareGroupHeartbeatRequestData);
        return shareGroupHeartbeatRequestData.memberEpoch() == -1 ? shareGroupLeave(shareGroupHeartbeatRequestData.groupId(), shareGroupHeartbeatRequestData.memberId(), shareGroupHeartbeatRequestData.memberEpoch()) : shareGroupHeartbeat(shareGroupHeartbeatRequestData.groupId(), shareGroupHeartbeatRequestData.memberId(), shareGroupHeartbeatRequestData.memberEpoch(), shareGroupHeartbeatRequestData.rackId(), requestContext.clientId(), requestContext.clientAddress.toString(), shareGroupHeartbeatRequestData.subscribedTopicNames());
    }

    private void replay(CoordinatorRecord coordinatorRecord) {
        ApiMessageAndVersion key = coordinatorRecord.key();
        ApiMessageAndVersion value = coordinatorRecord.value();
        switch (key.version()) {
            case 4:
                replay((ConsumerGroupPartitionMetadataKey) key.message(), (ConsumerGroupPartitionMetadataValue) Utils.messageOrNull(value), Group.GroupType.SHARE);
                return;
            case 5:
            default:
                throw new IllegalStateException("Received an unknown record type " + ((int) key.version()) + " in " + coordinatorRecord);
            case 6:
                replay((ConsumerGroupTargetAssignmentMetadataKey) key.message(), (ConsumerGroupTargetAssignmentMetadataValue) Utils.messageOrNull(value), Group.GroupType.SHARE);
                return;
            case 7:
                replay((ConsumerGroupTargetAssignmentMemberKey) key.message(), (ConsumerGroupTargetAssignmentMemberValue) Utils.messageOrNull(value), Group.GroupType.SHARE);
                return;
            case 8:
                replay((ConsumerGroupCurrentMemberAssignmentKey) key.message(), (ConsumerGroupCurrentMemberAssignmentValue) Utils.messageOrNull(value), Group.GroupType.SHARE);
                return;
        }
    }

    public void replay(ConsumerGroupMemberMetadataKey consumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataValue consumerGroupMemberMetadataValue) {
        String groupId = consumerGroupMemberMetadataKey.groupId();
        String memberId = consumerGroupMemberMetadataKey.memberId();
        ConsumerGroup orMaybeCreatePersistedConsumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, consumerGroupMemberMetadataValue != null);
        HashSet hashSet = new HashSet(orMaybeCreatePersistedConsumerGroup.subscribedTopicNames().keySet());
        if (consumerGroupMemberMetadataValue != null) {
            orMaybeCreatePersistedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(orMaybeCreatePersistedConsumerGroup.getOrMaybeCreateMember(memberId, true)).updateWith(consumerGroupMemberMetadataValue).build());
        } else {
            if (orMaybeCreatePersistedConsumerGroup.getOrMaybeCreateMember(memberId, false).memberEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone.");
            }
            if (orMaybeCreatePersistedConsumerGroup.targetAssignment().containsKey(memberId)) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
            }
            orMaybeCreatePersistedConsumerGroup.removeMember(memberId);
        }
        updateGroupsByTopics(groupId, hashSet, orMaybeCreatePersistedConsumerGroup.subscribedTopicNames().keySet());
    }

    public Set<String> groupsSubscribedToTopic(String str) {
        Set<String> set = (Set) this.groupsByTopics.get(str);
        return set != null ? set : Collections.emptySet();
    }

    private void subscribeGroupToTopic(String str, String str2) {
        ((TimelineHashSet) this.groupsByTopics.computeIfAbsent(str2, str3 -> {
            return new TimelineHashSet(this.snapshotRegistry, 1);
        })).add(str);
    }

    private void unsubscribeGroupFromTopic(String str, String str2) {
        this.groupsByTopics.computeIfPresent(str2, (str3, timelineHashSet) -> {
            timelineHashSet.remove(str);
            if (timelineHashSet.isEmpty()) {
                return null;
            }
            return timelineHashSet;
        });
    }

    private void updateGroupsByTopics(String str, Set<String> set, Set<String> set2) {
        if (set.isEmpty()) {
            set2.forEach(str2 -> {
                subscribeGroupToTopic(str, str2);
            });
        } else if (set2.isEmpty()) {
            set.forEach(str3 -> {
                unsubscribeGroupFromTopic(str, str3);
            });
        } else {
            set.forEach(str4 -> {
                if (set2.contains(str4)) {
                    return;
                }
                unsubscribeGroupFromTopic(str, str4);
            });
            set2.forEach(str5 -> {
                if (set.contains(str5)) {
                    return;
                }
                subscribeGroupToTopic(str, str5);
            });
        }
    }

    public void replay(ConsumerGroupMetadataKey consumerGroupMetadataKey, ConsumerGroupMetadataValue consumerGroupMetadataValue) {
        String groupId = consumerGroupMetadataKey.groupId();
        if (consumerGroupMetadataValue != null) {
            getOrMaybeCreatePersistedConsumerGroup(groupId, true).setGroupEpoch(consumerGroupMetadataValue.epoch());
            return;
        }
        ConsumerGroup orMaybeCreatePersistedConsumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
        if (!orMaybeCreatePersistedConsumerGroup.members().isEmpty()) {
            throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + orMaybeCreatePersistedConsumerGroup.members().size() + " members.");
        }
        if (!orMaybeCreatePersistedConsumerGroup.targetAssignment().isEmpty()) {
            throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + orMaybeCreatePersistedConsumerGroup.targetAssignment().size() + " members.");
        }
        if (orMaybeCreatePersistedConsumerGroup.assignmentEpoch() != -1) {
            throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
        }
        removeGroup(groupId);
    }

    public void replay(ConsumerGroupPartitionMetadataKey consumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue consumerGroupPartitionMetadataValue) {
        replay(consumerGroupPartitionMetadataKey, consumerGroupPartitionMetadataValue, Group.GroupType.CONSUMER);
    }

    public void replay(ConsumerGroupPartitionMetadataKey consumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue consumerGroupPartitionMetadataValue, Group.GroupType groupType) {
        ModernGroup<?> orMaybeCreatePersistedGroup = getOrMaybeCreatePersistedGroup(consumerGroupPartitionMetadataKey.groupId(), false, groupType);
        if (consumerGroupPartitionMetadataValue == null) {
            orMaybeCreatePersistedGroup.setSubscriptionMetadata(Collections.emptyMap());
            return;
        }
        HashMap hashMap = new HashMap();
        consumerGroupPartitionMetadataValue.topics().forEach(topicMetadata -> {
            hashMap.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata));
        });
        orMaybeCreatePersistedGroup.setSubscriptionMetadata(hashMap);
    }

    public void replay(ConsumerGroupTargetAssignmentMemberKey consumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberValue consumerGroupTargetAssignmentMemberValue) {
        replay(consumerGroupTargetAssignmentMemberKey, consumerGroupTargetAssignmentMemberValue, Group.GroupType.CONSUMER);
    }

    public void replay(ConsumerGroupTargetAssignmentMemberKey consumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberValue consumerGroupTargetAssignmentMemberValue, Group.GroupType groupType) {
        String groupId = consumerGroupTargetAssignmentMemberKey.groupId();
        String memberId = consumerGroupTargetAssignmentMemberKey.memberId();
        ModernGroup<?> orMaybeCreatePersistedGroup = getOrMaybeCreatePersistedGroup(groupId, false, groupType);
        if (consumerGroupTargetAssignmentMemberValue != null) {
            orMaybeCreatePersistedGroup.updateTargetAssignment(memberId, Assignment.fromRecord(consumerGroupTargetAssignmentMemberValue));
        } else {
            orMaybeCreatePersistedGroup.removeTargetAssignment(memberId);
        }
    }

    public void replay(ConsumerGroupTargetAssignmentMetadataKey consumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataValue consumerGroupTargetAssignmentMetadataValue) {
        replay(consumerGroupTargetAssignmentMetadataKey, consumerGroupTargetAssignmentMetadataValue, Group.GroupType.CONSUMER);
    }

    public void replay(ConsumerGroupTargetAssignmentMetadataKey consumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataValue consumerGroupTargetAssignmentMetadataValue, Group.GroupType groupType) {
        String groupId = consumerGroupTargetAssignmentMetadataKey.groupId();
        ModernGroup<?> orMaybeCreatePersistedGroup = getOrMaybeCreatePersistedGroup(groupId, false, groupType);
        if (consumerGroupTargetAssignmentMetadataValue != null) {
            orMaybeCreatePersistedGroup.setTargetAssignmentEpoch(consumerGroupTargetAssignmentMetadataValue.assignmentEpoch());
        } else {
            if (!orMaybeCreatePersistedGroup.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + " but the assignment still has " + orMaybeCreatePersistedGroup.targetAssignment().size() + " members.");
            }
            orMaybeCreatePersistedGroup.setTargetAssignmentEpoch(-1);
        }
    }

    public void replay(ConsumerGroupCurrentMemberAssignmentKey consumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentValue consumerGroupCurrentMemberAssignmentValue) {
        replay(consumerGroupCurrentMemberAssignmentKey, consumerGroupCurrentMemberAssignmentValue, Group.GroupType.CONSUMER);
    }

    public void replay(ConsumerGroupCurrentMemberAssignmentKey consumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentValue consumerGroupCurrentMemberAssignmentValue, Group.GroupType groupType) {
        String groupId = consumerGroupCurrentMemberAssignmentKey.groupId();
        String memberId = consumerGroupCurrentMemberAssignmentKey.memberId();
        if (groupType == Group.GroupType.CONSUMER) {
            ConsumerGroup orMaybeCreatePersistedConsumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
            ConsumerGroupMember orMaybeCreateMember = orMaybeCreatePersistedConsumerGroup.getOrMaybeCreateMember(memberId, false);
            if (consumerGroupCurrentMemberAssignmentValue != null) {
                orMaybeCreatePersistedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(orMaybeCreateMember).updateWith(consumerGroupCurrentMemberAssignmentValue).build());
                return;
            } else {
                orMaybeCreatePersistedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(orMaybeCreateMember).setMemberEpoch(-1).setPreviousMemberEpoch(-1).setAssignedPartitions(Collections.emptyMap()).setPartitionsPendingRevocation(Collections.emptyMap()).build());
                return;
            }
        }
        if (groupType != Group.GroupType.SHARE) {
            throw new IllegalStateException("Received a ConsumerGroupCurrentMemberAssignmentKey/Value record for an unknown group type: " + groupType);
        }
        ShareGroup orMaybeCreatePersistedShareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false);
        ShareGroupMember orMaybeCreateMember2 = orMaybeCreatePersistedShareGroup.getOrMaybeCreateMember(memberId, false);
        if (consumerGroupCurrentMemberAssignmentValue != null) {
            orMaybeCreatePersistedShareGroup.updateMember(new ShareGroupMember.Builder(orMaybeCreateMember2).updateWith(consumerGroupCurrentMemberAssignmentValue).build());
        } else {
            orMaybeCreatePersistedShareGroup.updateMember(new ShareGroupMember.Builder(orMaybeCreateMember2).setMemberEpoch(-1).setPreviousMemberEpoch(-1).setAssignedPartitions(Collections.emptyMap()).build());
        }
    }

    public void replay(ShareGroupMemberMetadataKey shareGroupMemberMetadataKey, ShareGroupMemberMetadataValue shareGroupMemberMetadataValue) {
        String groupId = shareGroupMemberMetadataKey.groupId();
        String memberId = shareGroupMemberMetadataKey.memberId();
        ShareGroup orMaybeCreatePersistedShareGroup = getOrMaybeCreatePersistedShareGroup(groupId, shareGroupMemberMetadataValue != null);
        HashSet hashSet = new HashSet(orMaybeCreatePersistedShareGroup.subscribedTopicNames().keySet());
        if (shareGroupMemberMetadataValue != null) {
            orMaybeCreatePersistedShareGroup.updateMember(new ShareGroupMember.Builder(orMaybeCreatePersistedShareGroup.getOrMaybeCreateMember(memberId, true)).updateWith(shareGroupMemberMetadataValue).build());
        } else {
            if (orMaybeCreatePersistedShareGroup.getOrMaybeCreateMember(memberId, false).memberEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " with invalid leave group epoch.");
            }
            if (orMaybeCreatePersistedShareGroup.targetAssignment().containsKey(memberId)) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but member exists in target assignment.");
            }
            orMaybeCreatePersistedShareGroup.removeMember(memberId);
        }
        updateGroupsByTopics(groupId, hashSet, orMaybeCreatePersistedShareGroup.subscribedTopicNames().keySet());
    }

    public void replay(ShareGroupMetadataKey shareGroupMetadataKey, ShareGroupMetadataValue shareGroupMetadataValue) {
        String groupId = shareGroupMetadataKey.groupId();
        if (shareGroupMetadataValue != null) {
            getOrMaybeCreatePersistedShareGroup(groupId, true).setGroupEpoch(shareGroupMetadataValue.epoch());
            return;
        }
        ShareGroup orMaybeCreatePersistedShareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false);
        if (!orMaybeCreatePersistedShareGroup.members().isEmpty()) {
            throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + orMaybeCreatePersistedShareGroup.members().size() + " members.");
        }
        if (!orMaybeCreatePersistedShareGroup.targetAssignment().isEmpty()) {
            throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + orMaybeCreatePersistedShareGroup.targetAssignment().size() + " members.");
        }
        if (orMaybeCreatePersistedShareGroup.assignmentEpoch() != -1) {
            throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but target assignment epoch in invalid.");
        }
        removeGroup(groupId);
    }

    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        this.metadataImage = metadataImage;
        Optional.ofNullable(metadataDelta.topicsDelta()).ifPresent(topicsDelta -> {
            HashSet hashSet = new HashSet();
            topicsDelta.changedTopics().forEach((uuid, topicDelta) -> {
                hashSet.addAll(groupsSubscribedToTopic(topicDelta.name()));
            });
            topicsDelta.deletedTopicIds().forEach(uuid2 -> {
                hashSet.addAll(groupsSubscribedToTopic(metadataDelta.image().topics().getTopic(uuid2).name()));
            });
            hashSet.forEach(str -> {
                Group group = (Group) this.groups.get(str);
                if (group != null) {
                    if (group.type() == Group.GroupType.CONSUMER || group.type() == Group.GroupType.SHARE) {
                        ((ModernGroup) group).requestMetadataRefresh();
                    }
                }
            });
        });
    }

    public void onLoaded() {
        this.groups.forEach((str, group) -> {
            switch (group.type()) {
                case CONSUMER:
                    ConsumerGroup consumerGroup = (ConsumerGroup) group;
                    this.log.info("Loaded consumer group {} with {} members.", str, Integer.valueOf(consumerGroup.members().size()));
                    consumerGroup.members().forEach((str, consumerGroupMember) -> {
                        this.log.debug("Loaded member {} in consumer group {}.", str, str);
                        scheduleConsumerGroupSessionTimeout(str, str);
                        if (consumerGroupMember.state() == MemberState.UNREVOKED_PARTITIONS) {
                            scheduleConsumerGroupRebalanceTimeout(str, consumerGroupMember.memberId(), consumerGroupMember.memberEpoch(), consumerGroupMember.rebalanceTimeoutMs());
                        }
                    });
                    return;
                case CLASSIC:
                    ClassicGroup classicGroup = (ClassicGroup) group;
                    this.log.info("Loaded classic group {} with {} members.", str, Integer.valueOf(classicGroup.allMembers().size()));
                    classicGroup.allMembers().forEach(classicGroupMember -> {
                        this.log.debug("Loaded member {} in classic group {}.", classicGroupMember.memberId(), str);
                        rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroupMember);
                    });
                    if (classicGroup.numMembers() > this.classicGroupMaxSize) {
                        prepareRebalance(classicGroup, "Freshly-loaded group " + str + " (size " + classicGroup.numMembers() + ") is over capacity " + this.classicGroupMaxSize + ". Rebalancing in order to give a chance for consumers to commit offsets");
                        return;
                    }
                    return;
                case SHARE:
                    return;
                default:
                    this.log.warn("Loaded group {} with an unknown group type {}.", str, group.type());
                    return;
            }
        });
    }

    public void onUnloaded() {
        this.groups.values().forEach(group -> {
            switch (group.type()) {
                case CONSUMER:
                    ConsumerGroup consumerGroup = (ConsumerGroup) group;
                    this.log.info("[GroupId={}] Unloaded group metadata for group epoch {}.", consumerGroup.groupId(), Integer.valueOf(consumerGroup.groupEpoch()));
                    return;
                case CLASSIC:
                    ClassicGroup classicGroup = (ClassicGroup) group;
                    this.log.info("[GroupId={}] Unloading group metadata for generation {}.", classicGroup.groupId(), Integer.valueOf(classicGroup.generationId()));
                    classicGroup.transitionTo(ClassicGroupState.DEAD);
                    switch (classicGroup.previousState()) {
                        case EMPTY:
                        case DEAD:
                        default:
                            return;
                        case PREPARING_REBALANCE:
                            classicGroup.allMembers().forEach(classicGroupMember -> {
                                classicGroup.completeJoinFuture(classicGroupMember, new JoinGroupResponseData().setMemberId(classicGroupMember.memberId()).setErrorCode(Errors.NOT_COORDINATOR.code()));
                            });
                            return;
                        case COMPLETING_REBALANCE:
                        case STABLE:
                            classicGroup.allMembers().forEach(classicGroupMember2 -> {
                                classicGroup.completeSyncFuture(classicGroupMember2, new SyncGroupResponseData().setErrorCode(Errors.NOT_COORDINATOR.code()));
                            });
                            return;
                    }
                case SHARE:
                    ShareGroup shareGroup = (ShareGroup) group;
                    this.log.info("[GroupId={}] Unloaded group metadata for group epoch {}.", shareGroup.groupId(), Integer.valueOf(shareGroup.groupEpoch()));
                    return;
                default:
                    this.log.warn("onUnloaded group with an unknown group type {}.", group.type());
                    return;
            }
        });
    }

    public static String groupSessionTimeoutKey(String str, String str2) {
        return "session-timeout-" + str + "-" + str2;
    }

    public static String consumerGroupRebalanceTimeoutKey(String str, String str2) {
        return "rebalance-timeout-" + str + "-" + str2;
    }

    public void replay(GroupMetadataKey groupMetadataKey, GroupMetadataValue groupMetadataValue) {
        String group = groupMetadataKey.group();
        if (groupMetadataValue == null) {
            removeGroup(group);
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (GroupMetadataValue.MemberMetadata memberMetadata : groupMetadataValue.members()) {
            int sessionTimeout = memberMetadata.rebalanceTimeout() == -1 ? memberMetadata.sessionTimeout() : memberMetadata.rebalanceTimeout();
            JoinGroupRequestData.JoinGroupRequestProtocolCollection joinGroupRequestProtocolCollection = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
            joinGroupRequestProtocolCollection.add(new JoinGroupRequestData.JoinGroupRequestProtocol().setName(groupMetadataValue.protocol()).setMetadata(memberMetadata.subscription()));
            arrayList.add(new ClassicGroupMember(memberMetadata.memberId(), Optional.ofNullable(memberMetadata.groupInstanceId()), memberMetadata.clientId(), memberMetadata.clientHost(), sessionTimeout, memberMetadata.sessionTimeout(), groupMetadataValue.protocolType(), joinGroupRequestProtocolCollection, memberMetadata.assignment()));
        }
        String protocolType = groupMetadataValue.protocolType();
        ClassicGroup classicGroup = new ClassicGroup(this.logContext, group, arrayList.isEmpty() ? ClassicGroupState.EMPTY : ClassicGroupState.STABLE, this.time, this.metrics, groupMetadataValue.generation(), (protocolType == null || protocolType.isEmpty()) ? Optional.empty() : Optional.of(protocolType), Optional.ofNullable(groupMetadataValue.protocol()), Optional.ofNullable(groupMetadataValue.leader()), groupMetadataValue.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(Long.valueOf(groupMetadataValue.currentStateTimestamp())));
        arrayList.forEach(classicGroupMember -> {
            classicGroup.add(classicGroupMember, null);
        });
        if (((Group) this.groups.put(group, classicGroup)) == null) {
            this.metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
        }
        classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
    }

    public CoordinatorResult<Void, CoordinatorRecord> classicGroupJoin(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, CompletableFuture<JoinGroupResponseData> completableFuture) {
        Group group = (Group) this.groups.get(joinGroupRequestData.groupId(), Long.MAX_VALUE);
        return (group == null || group.type() != Group.GroupType.CONSUMER || group.isEmpty()) ? classicGroupJoinToClassicGroup(requestContext, joinGroupRequestData, completableFuture) : classicGroupJoinToConsumerGroup((ConsumerGroup) group, requestContext, joinGroupRequestData, completableFuture);
    }

    CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToClassicGroup(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, CompletableFuture<JoinGroupResponseData> completableFuture) {
        CoordinatorResult<Void, CoordinatorRecord> coordinatorResult = EMPTY_RESULT;
        ArrayList arrayList = new ArrayList();
        String groupId = joinGroupRequestData.groupId();
        String memberId = joinGroupRequestData.memberId();
        boolean equals = memberId.equals(ClassicGroup.NO_LEADER);
        maybeDeleteEmptyConsumerGroup(groupId, arrayList);
        boolean z = !this.groups.containsKey(groupId);
        try {
            ClassicGroup orMaybeCreateClassicGroup = getOrMaybeCreateClassicGroup(groupId, equals);
            if (acceptJoiningMember(orMaybeCreateClassicGroup, memberId)) {
                coordinatorResult = equals ? classicGroupJoinNewMember(requestContext, joinGroupRequestData, orMaybeCreateClassicGroup, completableFuture) : classicGroupJoinExistingMember(requestContext, joinGroupRequestData, orMaybeCreateClassicGroup, completableFuture);
            } else {
                orMaybeCreateClassicGroup.remove(memberId);
                completableFuture.complete(new JoinGroupResponseData().setMemberId(ClassicGroup.NO_LEADER).setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code()));
            }
            if (!z || coordinatorResult != EMPTY_RESULT) {
                return coordinatorResult;
            }
            CompletableFuture completableFuture2 = new CompletableFuture();
            completableFuture2.whenComplete((r8, th) -> {
                if (th != null) {
                    this.log.warn("Failed to write empty metadata for group {}: {}", orMaybeCreateClassicGroup.groupId(), th.getMessage());
                    completableFuture.complete(new JoinGroupResponseData().setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(th)).code()));
                }
            });
            arrayList.add(CoordinatorRecordHelpers.newEmptyGroupMetadataRecord(orMaybeCreateClassicGroup, this.metadataImage.features().metadataVersion()));
            return new CoordinatorResult<>(arrayList, completableFuture2, false);
        } catch (Throwable th2) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.forException(th2).code()));
            return EMPTY_RESULT;
        }
    }

    private CoordinatorResult<Void, CoordinatorRecord> maybeCompleteJoinPhase(ClassicGroup classicGroup) {
        return (classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE) && classicGroup.hasAllMembersJoined() && classicGroup.previousState() != ClassicGroupState.EMPTY) ? completeClassicGroupJoin(classicGroup) : EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, CompletableFuture<JoinGroupResponseData> completableFuture) {
        if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(ClassicGroup.NO_LEADER).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        } else {
            if (classicGroup.supportsProtocols(joinGroupRequestData.protocolType(), joinGroupRequestData.protocols())) {
                Optional<String> ofNullable = Optional.ofNullable(joinGroupRequestData.groupInstanceId());
                String generateMemberId = classicGroup.generateMemberId(requestContext.clientId(), ofNullable);
                return ofNullable.isPresent() ? classicGroupJoinNewStaticMember(requestContext, joinGroupRequestData, classicGroup, generateMemberId, completableFuture) : classicGroupJoinNewDynamicMember(requestContext, joinGroupRequestData, classicGroup, generateMemberId, completableFuture);
            }
            completableFuture.complete(new JoinGroupResponseData().setMemberId(ClassicGroup.NO_LEADER).setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewStaticMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        String groupInstanceId = joinGroupRequestData.groupInstanceId();
        String staticMemberId = classicGroup.staticMemberId(groupInstanceId);
        if (staticMemberId != null) {
            this.log.info("Static member with groupInstanceId={} and unknown member id joins group {} in {} state. Replacing previously mapped member {} with this groupInstanceId.", new Object[]{groupInstanceId, classicGroup.groupId(), classicGroup.currentState(), staticMemberId});
            return updateStaticMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, classicGroup, staticMemberId, str, completableFuture);
        }
        this.log.info("Static member with groupInstanceId={} and unknown member id joins group {} in {} state. Created a new member id {} for this member and added to the group.", new Object[]{groupInstanceId, classicGroup.groupId(), classicGroup.currentState(), str});
        return addMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, classicGroup, str, completableFuture);
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewDynamicMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        if (!JoinGroupRequest.requiresKnownMemberId(requestContext.apiVersion())) {
            this.log.info("Dynamic member with unknown member id joins group {} in state {}. Created a new member id {} and added the member to the group.", new Object[]{classicGroup.groupId(), classicGroup.currentState(), str});
            return addMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, classicGroup, str, completableFuture);
        }
        this.log.info("Dynamic member with unknown member id joins group {} in {} state. Created a new member id {} and requesting the member to rejoin with this id.", new Object[]{classicGroup.groupId(), classicGroup.currentState(), str});
        classicGroup.addPendingMember(str);
        this.timer.schedule(classicGroupHeartbeatKey(classicGroup.groupId(), str), joinGroupRequestData.sessionTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> {
            return expireClassicGroupMemberHeartbeat(classicGroup.groupId(), str);
        });
        completableFuture.complete(new JoinGroupResponseData().setMemberId(str).setErrorCode(Errors.MEMBER_ID_REQUIRED.code()));
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinExistingMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, CompletableFuture<JoinGroupResponseData> completableFuture) {
        String memberId = joinGroupRequestData.memberId();
        String groupInstanceId = joinGroupRequestData.groupInstanceId();
        if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        } else if (!classicGroup.supportsProtocols(joinGroupRequestData.protocolType(), joinGroupRequestData.protocols())) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
        } else {
            if (classicGroup.isPendingMember(memberId)) {
                if (groupInstanceId != null) {
                    throw new IllegalStateException("Received unexpected JoinGroup with groupInstanceId=" + groupInstanceId + " for pending member with memberId=" + memberId);
                }
                this.log.debug("Pending dynamic member with id {} joins group {} in {} state. Adding to the group now.", new Object[]{memberId, classicGroup.groupId(), classicGroup.currentState()});
                return addMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, classicGroup, memberId, completableFuture);
            }
            try {
                classicGroup.validateMember(memberId, groupInstanceId, "join-group");
                ClassicGroupMember member = classicGroup.member(memberId);
                if (classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
                    return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, classicGroup, member, "Member " + member.memberId() + " is joining group during " + classicGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                }
                if (classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
                    if (!member.matches(joinGroupRequestData.protocols())) {
                        return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, classicGroup, member, "Updating metadata for member " + memberId + " during " + classicGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                    }
                    completableFuture.complete(new JoinGroupResponseData().setMembers(classicGroup.isLeader(memberId) ? classicGroup.currentClassicGroupMembers() : Collections.emptyList()).setMemberId(memberId).setGenerationId(classicGroup.generationId()).setProtocolName(classicGroup.protocolName().orElse(null)).setProtocolType(classicGroup.protocolType().orElse(null)).setLeader(classicGroup.leaderOrNull()).setSkipAssignment(false));
                } else if (!classicGroup.isInState(ClassicGroupState.STABLE)) {
                    this.log.warn("Attempt to add rejoining member {} of group {} in unexpected group state {}", new Object[]{memberId, classicGroup.groupId(), classicGroup.stateAsString()});
                    completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
                } else {
                    if (classicGroup.isLeader(memberId)) {
                        return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, classicGroup, member, "Leader " + memberId + " re-joining group during " + classicGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                    }
                    if (!member.matches(joinGroupRequestData.protocols())) {
                        return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, classicGroup, member, "Updating metadata for member " + memberId + " during " + classicGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                    }
                    completableFuture.complete(new JoinGroupResponseData().setMembers(Collections.emptyList()).setMemberId(memberId).setGenerationId(classicGroup.generationId()).setProtocolName(classicGroup.protocolName().orElse(null)).setProtocolType(classicGroup.protocolType().orElse(null)).setLeader(classicGroup.leaderOrNull()).setSkipAssignment(false));
                }
            } catch (KafkaException e) {
                completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.forException(e).code()).setProtocolType((String) null).setProtocolName((String) null));
                return EMPTY_RESULT;
            }
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> completeClassicGroupJoin(String str) {
        try {
            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(str, false));
        } catch (UnknownMemberIdException | GroupIdNotFoundException e) {
            this.log.debug("Cannot find the group, skipping rebalance stage.", e);
            return EMPTY_RESULT;
        }
    }

    private CoordinatorResult<Void, CoordinatorRecord> completeClassicGroupJoin(ClassicGroup classicGroup) {
        this.timer.cancel(classicGroupJoinKey(classicGroup.groupId()));
        String groupId = classicGroup.groupId();
        Map map = (Map) classicGroup.notYetRejoinedMembers().entrySet().stream().filter(entry -> {
            return !((ClassicGroupMember) entry.getValue()).isStaticMember();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (!map.isEmpty()) {
            map.values().forEach(classicGroupMember -> {
                classicGroup.remove(classicGroupMember.memberId());
                this.timer.cancel(classicGroupHeartbeatKey(classicGroup.groupId(), classicGroupMember.memberId()));
            });
            this.log.info("Group {} removed dynamic members who haven't joined: {}", groupId, map.keySet());
        }
        if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            this.log.info("Group {} is dead, skipping rebalance stage.", groupId);
        } else {
            if (!classicGroup.maybeElectNewJoinedLeader() && !classicGroup.allMembers().isEmpty()) {
                this.log.error("Group {} could not complete rebalance because no members rejoined.", groupId);
                this.timer.schedule(classicGroupJoinKey(groupId), classicGroup.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> {
                    return completeClassicGroupJoin(classicGroup.groupId());
                });
                return EMPTY_RESULT;
            }
            classicGroup.initNextGeneration();
            if (classicGroup.isInState(ClassicGroupState.EMPTY)) {
                this.log.info("Group {} with generation {} is now empty.", groupId, Integer.valueOf(classicGroup.generationId()));
                CompletableFuture completableFuture = new CompletableFuture();
                completableFuture.whenComplete((r7, th) -> {
                    if (th != null) {
                        this.log.warn("Failed to write empty metadata for group {}: {}", classicGroup.groupId(), appendGroupMetadataErrorToResponseError(Errors.forException(th)).message());
                    }
                });
                return new CoordinatorResult<>(Collections.singletonList(CoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, Collections.emptyMap(), this.metadataImage.features().metadataVersion())), completableFuture, false);
            }
            this.log.info("Stabilized group {} generation {} with {} members.", new Object[]{groupId, Integer.valueOf(classicGroup.generationId()), Integer.valueOf(classicGroup.numMembers())});
            classicGroup.allMembers().forEach(classicGroupMember2 -> {
                List<JoinGroupResponseData.JoinGroupResponseMember> emptyList = Collections.emptyList();
                if (classicGroup.isLeader(classicGroupMember2.memberId())) {
                    emptyList = classicGroup.currentClassicGroupMembers();
                }
                classicGroup.completeJoinFuture(classicGroupMember2, new JoinGroupResponseData().setMembers(emptyList).setMemberId(classicGroupMember2.memberId()).setGenerationId(classicGroup.generationId()).setProtocolName(classicGroup.protocolName().orElse(null)).setProtocolType(classicGroup.protocolType().orElse(null)).setLeader(classicGroup.leaderOrNull()).setSkipAssignment(false).setErrorCode(Errors.NONE.code()));
                rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroupMember2);
                classicGroupMember2.setIsNew(false);
                classicGroup.addPendingSyncMember(classicGroupMember2.memberId());
            });
            schedulePendingSync(classicGroup);
        }
        return EMPTY_RESULT;
    }

    private void schedulePendingSync(ClassicGroup classicGroup) {
        this.timer.schedule(classicGroupSyncKey(classicGroup.groupId()), classicGroup.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> {
            return expirePendingSync(classicGroup.groupId(), classicGroup.generationId());
        });
    }

    private CoordinatorResult<Void, CoordinatorRecord> expireClassicGroupMemberHeartbeat(String str, String str2) {
        try {
            ClassicGroup orMaybeCreateClassicGroup = getOrMaybeCreateClassicGroup(str, false);
            if (orMaybeCreateClassicGroup.isInState(ClassicGroupState.DEAD)) {
                this.log.info("Received notification of heartbeat expiration for member {} after group {} had already been unloaded or deleted.", str2, orMaybeCreateClassicGroup.groupId());
            } else {
                if (orMaybeCreateClassicGroup.isPendingMember(str2)) {
                    this.log.info("Pending member {} in group {} has been removed after session timeout expiration.", str2, orMaybeCreateClassicGroup.groupId());
                    return removePendingMemberAndUpdateClassicGroup(orMaybeCreateClassicGroup, str2);
                }
                if (orMaybeCreateClassicGroup.hasMember(str2)) {
                    ClassicGroupMember member = orMaybeCreateClassicGroup.member(str2);
                    if (!member.hasSatisfiedHeartbeat()) {
                        this.log.info("Member {} in group {} has failed, removing it from the group.", member.memberId(), orMaybeCreateClassicGroup.groupId());
                        return removeMemberAndUpdateClassicGroup(orMaybeCreateClassicGroup, member, "removing member " + member.memberId() + " on heartbeat expiration.");
                    }
                } else {
                    this.log.debug("Member {} has already been removed from the group.", str2);
                }
            }
            return EMPTY_RESULT;
        } catch (UnknownMemberIdException | GroupIdNotFoundException e) {
            this.log.debug("Received notification of heartbeat expiration for member {} after group {} had already been deleted or upgraded.", str2, str);
            return EMPTY_RESULT;
        }
    }

    private CoordinatorResult<Void, CoordinatorRecord> removeMemberAndUpdateClassicGroup(ClassicGroup classicGroup, ClassicGroupMember classicGroupMember, String str) {
        classicGroup.completeJoinFuture(classicGroupMember, new JoinGroupResponseData().setMemberId(ClassicGroup.NO_LEADER).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
        classicGroup.remove(classicGroupMember.memberId());
        return (classicGroup.isInState(ClassicGroupState.STABLE) || classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) ? maybePrepareRebalanceOrCompleteJoin(classicGroup, str) : (classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE) && classicGroup.hasAllMembersJoined()) ? completeClassicGroupJoin(classicGroup) : EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> removePendingMemberAndUpdateClassicGroup(ClassicGroup classicGroup, String str) {
        classicGroup.remove(str);
        return (classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE) && classicGroup.hasAllMembersJoined()) ? completeClassicGroupJoin(classicGroup) : EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> updateMemberThenRebalanceOrCompleteJoin(JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, ClassicGroupMember classicGroupMember, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        classicGroup.updateMember(classicGroupMember, joinGroupRequestData.protocols(), joinGroupRequestData.rebalanceTimeoutMs(), joinGroupRequestData.sessionTimeoutMs(), completableFuture);
        return maybePrepareRebalanceOrCompleteJoin(classicGroup, str);
    }

    private CoordinatorResult<Void, CoordinatorRecord> addMemberThenRebalanceOrCompleteJoin(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        ClassicGroupMember classicGroupMember = new ClassicGroupMember(str, Optional.ofNullable(joinGroupRequestData.groupInstanceId()), requestContext.clientId(), requestContext.clientAddress().toString(), joinGroupRequestData.rebalanceTimeoutMs(), joinGroupRequestData.sessionTimeoutMs(), joinGroupRequestData.protocolType(), joinGroupRequestData.protocols());
        classicGroupMember.setIsNew(true);
        if (classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE) && classicGroup.previousState() == ClassicGroupState.EMPTY) {
            classicGroup.setNewMemberAdded(true);
        }
        classicGroup.add(classicGroupMember, completableFuture);
        rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroupMember, this.classicGroupNewMemberJoinTimeoutMs);
        return maybePrepareRebalanceOrCompleteJoin(classicGroup, "Adding new member " + str + " with group instance id " + joinGroupRequestData.groupInstanceId() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData));
    }

    private CoordinatorResult<Void, CoordinatorRecord> maybePrepareRebalanceOrCompleteJoin(ClassicGroup classicGroup, String str) {
        return classicGroup.canRebalance() ? prepareRebalance(classicGroup, str) : maybeCompleteJoinPhase(classicGroup);
    }

    CoordinatorResult<Void, CoordinatorRecord> prepareRebalance(ClassicGroup classicGroup, String str) {
        if (classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            resetAndPropagateAssignmentWithError(classicGroup, Errors.REBALANCE_IN_PROGRESS);
        }
        removeSyncExpiration(classicGroup);
        boolean isInState = classicGroup.isInState(ClassicGroupState.EMPTY);
        if (isInState) {
            int i = this.classicGroupInitialRebalanceDelayMs;
            int max = Math.max(classicGroup.rebalanceTimeoutMs() - this.classicGroupInitialRebalanceDelayMs, 0);
            this.timer.schedule(classicGroupJoinKey(classicGroup.groupId()), i, TimeUnit.MILLISECONDS, false, () -> {
                return tryCompleteInitialRebalanceElseSchedule(classicGroup.groupId(), i, max);
            });
        }
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        this.log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).", new Object[]{classicGroup.groupId(), classicGroup.currentState(), Integer.valueOf(classicGroup.generationId()), str});
        return isInState ? EMPTY_RESULT : maybeCompleteJoinElseSchedule(classicGroup);
    }

    private CoordinatorResult<Void, CoordinatorRecord> maybeCompleteJoinElseSchedule(ClassicGroup classicGroup) {
        String classicGroupJoinKey = classicGroupJoinKey(classicGroup.groupId());
        if (classicGroup.hasAllMembersJoined()) {
            return completeClassicGroupJoin(classicGroup);
        }
        this.timer.schedule(classicGroupJoinKey, classicGroup.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> {
            return completeClassicGroupJoin(classicGroup.groupId());
        });
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> tryCompleteInitialRebalanceElseSchedule(String str, int i, int i2) {
        try {
            ClassicGroup orMaybeCreateClassicGroup = getOrMaybeCreateClassicGroup(str, false);
            if (!orMaybeCreateClassicGroup.newMemberAdded() || i2 == 0) {
                return completeClassicGroupJoin(orMaybeCreateClassicGroup);
            }
            orMaybeCreateClassicGroup.setNewMemberAdded(false);
            int min = Math.min(this.classicGroupInitialRebalanceDelayMs, i2);
            int max = Math.max(i2 - i, 0);
            this.timer.schedule(classicGroupJoinKey(orMaybeCreateClassicGroup.groupId()), min, TimeUnit.MILLISECONDS, false, () -> {
                return tryCompleteInitialRebalanceElseSchedule(orMaybeCreateClassicGroup.groupId(), min, max);
            });
            return EMPTY_RESULT;
        } catch (UnknownMemberIdException | GroupIdNotFoundException e) {
            this.log.debug("Cannot find the group, skipping the initial rebalance stage.", e);
            return EMPTY_RESULT;
        }
    }

    private void resetAndPropagateAssignmentWithError(ClassicGroup classicGroup, Errors errors) {
        if (!classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            throw new IllegalStateException("Group " + classicGroup.groupId() + " must be in " + ClassicGroupState.COMPLETING_REBALANCE.name() + " state but is in " + classicGroup.currentState() + ".");
        }
        classicGroup.allMembers().forEach(classicGroupMember -> {
            classicGroupMember.setAssignment(ClassicGroupMember.EMPTY_ASSIGNMENT);
        });
        propagateAssignment(classicGroup, errors);
    }

    private void setAndPropagateAssignment(ClassicGroup classicGroup, Map<String, byte[]> map) {
        if (!classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            throw new IllegalStateException("The group must be in CompletingRebalance state to set and propagate assignment.");
        }
        classicGroup.allMembers().forEach(classicGroupMember -> {
            classicGroupMember.setAssignment((byte[]) map.getOrDefault(classicGroupMember.memberId(), ClassicGroupMember.EMPTY_ASSIGNMENT));
        });
        propagateAssignment(classicGroup, Errors.NONE);
    }

    private void propagateAssignment(ClassicGroup classicGroup, Errors errors) {
        Optional<String> empty = Optional.empty();
        Optional<String> empty2 = Optional.empty();
        if (errors == Errors.NONE) {
            empty = classicGroup.protocolName();
            empty2 = classicGroup.protocolType();
        }
        for (ClassicGroupMember classicGroupMember : classicGroup.allMembers()) {
            if (!classicGroupMember.hasAssignment() && errors == Errors.NONE) {
                this.log.warn("Sending empty assignment to member {} of {} for generation {} with no errors", new Object[]{classicGroupMember.memberId(), classicGroup.groupId(), Integer.valueOf(classicGroup.generationId())});
            }
            if (classicGroup.completeSyncFuture(classicGroupMember, new SyncGroupResponseData().setProtocolName(empty.orElse(null)).setProtocolType(empty2.orElse(null)).setAssignment(classicGroupMember.assignment()).setErrorCode(errors.code()))) {
                rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroupMember);
            }
        }
    }

    public void rescheduleClassicGroupMemberHeartbeat(ClassicGroup classicGroup, ClassicGroupMember classicGroupMember) {
        rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroupMember, classicGroupMember.sessionTimeoutMs());
    }

    private void rescheduleClassicGroupMemberHeartbeat(ClassicGroup classicGroup, ClassicGroupMember classicGroupMember, long j) {
        this.timer.schedule(classicGroupHeartbeatKey(classicGroup.groupId(), classicGroupMember.memberId()), j, TimeUnit.MILLISECONDS, false, () -> {
            return expireClassicGroupMemberHeartbeat(classicGroup.groupId(), classicGroupMember.memberId());
        });
    }

    private void removeSyncExpiration(ClassicGroup classicGroup) {
        classicGroup.clearPendingSyncMembers();
        this.timer.cancel(classicGroupSyncKey(classicGroup.groupId()));
    }

    private CoordinatorResult<Void, CoordinatorRecord> expirePendingSync(String str, int i) {
        try {
            ClassicGroup orMaybeCreateClassicGroup = getOrMaybeCreateClassicGroup(str, false);
            if (i != orMaybeCreateClassicGroup.generationId()) {
                this.log.error("Received unexpected notification of sync expiration for {} with an old generation {} while the group has {}.", new Object[]{orMaybeCreateClassicGroup.groupId(), Integer.valueOf(i), Integer.valueOf(orMaybeCreateClassicGroup.generationId())});
            } else if (orMaybeCreateClassicGroup.isInState(ClassicGroupState.DEAD) || orMaybeCreateClassicGroup.isInState(ClassicGroupState.EMPTY) || orMaybeCreateClassicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
                this.log.error("Received unexpected notification of sync expiration after group {} already transitioned to {} state.", orMaybeCreateClassicGroup.groupId(), orMaybeCreateClassicGroup.stateAsString());
            } else if ((orMaybeCreateClassicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE) || orMaybeCreateClassicGroup.isInState(ClassicGroupState.STABLE)) && !orMaybeCreateClassicGroup.hasReceivedSyncFromAllMembers()) {
                HashSet hashSet = new HashSet(orMaybeCreateClassicGroup.allPendingSyncMembers());
                hashSet.forEach(str2 -> {
                    orMaybeCreateClassicGroup.remove(str2);
                    this.timer.cancel(classicGroupHeartbeatKey(orMaybeCreateClassicGroup.groupId(), str2));
                });
                this.log.debug("Group {} removed members who haven't sent their sync requests: {}", orMaybeCreateClassicGroup.groupId(), hashSet);
                return prepareRebalance(orMaybeCreateClassicGroup, "Removing " + hashSet + " on pending sync request expiration");
            }
            return EMPTY_RESULT;
        } catch (UnknownMemberIdException | GroupIdNotFoundException e) {
            this.log.debug("Received notification of sync expiration for an unknown classic group {}.", str);
            return EMPTY_RESULT;
        }
    }

    private boolean acceptJoiningMember(ClassicGroup classicGroup, String str) {
        switch (classicGroup.currentState()) {
            case EMPTY:
            case DEAD:
                return true;
            case PREPARING_REBALANCE:
                return (classicGroup.hasMember(str) && classicGroup.member(str).isAwaitingJoin()) || classicGroup.numAwaitingJoinResponse() < this.classicGroupMaxSize;
            case COMPLETING_REBALANCE:
            case STABLE:
                return classicGroup.hasMember(str) || classicGroup.numMembers() < this.classicGroupMaxSize;
            default:
                throw new IllegalStateException("Unknown group state: " + classicGroup.stateAsString());
        }
    }

    private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalanceOrCompleteJoin(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, ClassicGroup classicGroup, String str, String str2, CompletableFuture<JoinGroupResponseData> completableFuture) {
        String leaderOrNull = classicGroup.leaderOrNull();
        ClassicGroupMember replaceStaticMember = classicGroup.replaceStaticMember(joinGroupRequestData.groupInstanceId(), str, str2);
        rescheduleClassicGroupMemberHeartbeat(classicGroup, replaceStaticMember);
        int rebalanceTimeoutMs = replaceStaticMember.rebalanceTimeoutMs();
        int sessionTimeoutMs = replaceStaticMember.sessionTimeoutMs();
        JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedProtocols = replaceStaticMember.supportedProtocols();
        classicGroup.updateMember(replaceStaticMember, joinGroupRequestData.protocols(), joinGroupRequestData.rebalanceTimeoutMs(), joinGroupRequestData.sessionTimeoutMs(), completableFuture);
        if (!classicGroup.isInState(ClassicGroupState.STABLE)) {
            if (classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
                return prepareRebalance(classicGroup, "Updating metadata for static member " + replaceStaticMember.memberId() + " with instance id " + joinGroupRequestData.groupInstanceId() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData));
            }
            if (classicGroup.isInState(ClassicGroupState.EMPTY) || classicGroup.isInState(ClassicGroupState.DEAD)) {
                throw new IllegalStateException("Group " + classicGroup.groupId() + " was not supposed to be in the state " + classicGroup.stateAsString() + " when the unknown static member " + joinGroupRequestData.groupInstanceId() + " rejoins.");
            }
            return maybeCompleteJoinPhase(classicGroup);
        }
        String groupInstanceId = joinGroupRequestData.groupInstanceId();
        if (!classicGroup.protocolName().orElse(ClassicGroup.NO_LEADER).equals(classicGroup.selectProtocol())) {
            return maybePrepareRebalanceOrCompleteJoin(classicGroup, "Group's selectedProtocol will change because static member " + replaceStaticMember.memberId() + " with instance id " + groupInstanceId + " joined with change of protocol; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData));
        }
        this.log.info("Static member which joins during Stable stage and doesn't affect the selected protocol will not trigger a rebalance.");
        Map<String, byte[]> groupAssignment = classicGroup.groupAssignment();
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture2.whenComplete((r19, th) -> {
            if (th != null) {
                this.log.warn("Failed to persist metadata for group {} static member {} with group instance id {} due to {}. Reverting to old member id {}.", new Object[]{classicGroup.groupId(), str2, groupInstanceId, th.getMessage(), str});
                classicGroup.updateMember(replaceStaticMember, supportedProtocols, rebalanceTimeoutMs, sessionTimeoutMs, null);
                rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroup.replaceStaticMember(groupInstanceId, str2, str));
                completableFuture.complete(new JoinGroupResponseData().setMemberId(ClassicGroup.NO_LEADER).setGenerationId(classicGroup.generationId()).setProtocolName(classicGroup.protocolName().orElse(null)).setProtocolType(classicGroup.protocolType().orElse(null)).setLeader(leaderOrNull).setSkipAssignment(false).setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(th)).code()));
                return;
            }
            if (!JoinGroupRequest.supportsSkippingAssignment(requestContext.apiVersion())) {
                classicGroup.completeJoinFuture(replaceStaticMember, new JoinGroupResponseData().setMemberId(str2).setGenerationId(classicGroup.generationId()).setProtocolName(classicGroup.protocolName().orElse(null)).setProtocolType(classicGroup.protocolType().orElse(null)).setLeader(leaderOrNull).setSkipAssignment(false));
            } else {
                boolean isLeader = classicGroup.isLeader(str2);
                classicGroup.completeJoinFuture(replaceStaticMember, new JoinGroupResponseData().setMembers(isLeader ? classicGroup.currentClassicGroupMembers() : Collections.emptyList()).setMemberId(str2).setGenerationId(classicGroup.generationId()).setProtocolName(classicGroup.protocolName().orElse(null)).setProtocolType(classicGroup.protocolType().orElse(null)).setLeader(classicGroup.leaderOrNull()).setSkipAssignment(isLeader));
            }
        });
        return new CoordinatorResult<>(Collections.singletonList(CoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, groupAssignment, this.metadataImage.features().metadataVersion())), completableFuture2, false);
    }

    public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(RequestContext requestContext, SyncGroupRequestData syncGroupRequestData, CompletableFuture<SyncGroupResponseData> completableFuture) throws UnknownMemberIdException {
        try {
            Group group = group(syncGroupRequestData.groupId());
            if (!group.isEmpty()) {
                return group.type() == Group.GroupType.CLASSIC ? classicGroupSyncToClassicGroup((ClassicGroup) group, requestContext, syncGroupRequestData, completableFuture) : classicGroupSyncToConsumerGroup((ConsumerGroup) group, requestContext, syncGroupRequestData, completableFuture);
            }
            completableFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            return EMPTY_RESULT;
        } catch (GroupIdNotFoundException e) {
            completableFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            return EMPTY_RESULT;
        }
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToClassicGroup(ClassicGroup classicGroup, RequestContext requestContext, SyncGroupRequestData syncGroupRequestData, CompletableFuture<SyncGroupResponseData> completableFuture) throws IllegalStateException {
        String groupId = syncGroupRequestData.groupId();
        String memberId = syncGroupRequestData.memberId();
        Optional<Errors> validateSyncGroup = validateSyncGroup(classicGroup, syncGroupRequestData);
        if (validateSyncGroup.isPresent()) {
            completableFuture.complete(new SyncGroupResponseData().setErrorCode(validateSyncGroup.get().code()));
        } else if (classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
            completableFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
        } else if (classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            classicGroup.member(memberId).setAwaitingSyncFuture(completableFuture);
            removePendingSyncMember(classicGroup, syncGroupRequestData.memberId());
            if (classicGroup.isLeader(memberId)) {
                this.log.info("Assignment received from leader {} for group {} for generation {}. The group has {} members, {} of which are static.", new Object[]{memberId, groupId, Integer.valueOf(classicGroup.generationId()), Integer.valueOf(classicGroup.numMembers()), Integer.valueOf(classicGroup.allStaticMemberIds().size())});
                HashMap hashMap = new HashMap();
                syncGroupRequestData.assignments().forEach(syncGroupRequestAssignment -> {
                });
                HashMap hashMap2 = new HashMap();
                classicGroup.allMembers().forEach(classicGroupMember -> {
                    if (hashMap.containsKey(classicGroupMember.memberId())) {
                        return;
                    }
                    hashMap2.put(classicGroupMember.memberId(), ClassicGroupMember.EMPTY_ASSIGNMENT);
                });
                hashMap.putAll(hashMap2);
                if (!hashMap2.isEmpty()) {
                    this.log.warn("Setting empty assignments for members {} of {} for generation {}.", new Object[]{hashMap2, groupId, Integer.valueOf(classicGroup.generationId())});
                }
                CompletableFuture completableFuture2 = new CompletableFuture();
                completableFuture2.whenComplete((r10, th) -> {
                    if (classicGroup.isInState(ClassicGroupState.COMPLETING_REBALANCE) && syncGroupRequestData.generationId() == classicGroup.generationId()) {
                        if (th != null) {
                            Errors appendGroupMetadataErrorToResponseError = appendGroupMetadataErrorToResponseError(Errors.forException(th));
                            resetAndPropagateAssignmentWithError(classicGroup, appendGroupMetadataErrorToResponseError);
                            maybePrepareRebalanceOrCompleteJoin(classicGroup, "Error " + appendGroupMetadataErrorToResponseError + " when storing group assignmentduring SyncGroup (member: " + memberId + ").");
                        } else {
                            setAndPropagateAssignment(classicGroup, hashMap);
                            classicGroup.transitionTo(ClassicGroupState.STABLE);
                            this.metrics.record(GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
                        }
                    }
                });
                return new CoordinatorResult<>(Collections.singletonList(CoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, hashMap, this.metadataImage.features().metadataVersion())), completableFuture2, false);
            }
        } else if (classicGroup.isInState(ClassicGroupState.STABLE)) {
            removePendingSyncMember(classicGroup, memberId);
            completableFuture.complete(new SyncGroupResponseData().setProtocolType(classicGroup.protocolType().orElse(null)).setProtocolName(classicGroup.protocolName().orElse(null)).setAssignment(classicGroup.member(memberId).assignment()).setErrorCode(Errors.NONE.code()));
        } else if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId);
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToConsumerGroup(ConsumerGroup consumerGroup, RequestContext requestContext, SyncGroupRequestData syncGroupRequestData, CompletableFuture<SyncGroupResponseData> completableFuture) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException, InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException {
        String groupId = syncGroupRequestData.groupId();
        String memberId = syncGroupRequestData.memberId();
        ConsumerGroupMember validateConsumerGroupMember = validateConsumerGroupMember(consumerGroup, memberId, syncGroupRequestData.groupInstanceId());
        throwIfMemberDoesNotUseClassicProtocol(validateConsumerGroupMember);
        throwIfGenerationIdUnmatched(validateConsumerGroupMember.memberId(), validateConsumerGroupMember.memberEpoch(), syncGroupRequestData.generationId());
        throwIfClassicProtocolUnmatched(validateConsumerGroupMember, syncGroupRequestData.protocolType(), syncGroupRequestData.protocolName());
        throwIfRebalanceInProgress(consumerGroup, validateConsumerGroupMember);
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture2.whenComplete((r11, th) -> {
            if (th == null) {
                cancelConsumerGroupSyncTimeout(groupId, memberId);
                scheduleConsumerGroupSessionTimeout(groupId, memberId, validateConsumerGroupMember.classicProtocolSessionTimeout().get().intValue());
                completableFuture.complete(new SyncGroupResponseData().setProtocolType(syncGroupRequestData.protocolType()).setProtocolName(syncGroupRequestData.protocolName()).setAssignment(prepareAssignment(validateConsumerGroupMember)));
            }
        });
        return new CoordinatorResult<>(Collections.emptyList(), completableFuture2, false);
    }

    private byte[] prepareAssignment(ConsumerGroupMember consumerGroupMember) {
        try {
            return ConsumerProtocol.serializeAssignment(Utils.toConsumerProtocolAssignment(consumerGroupMember.assignedPartitions(), this.metadataImage.topics()), ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(consumerGroupMember.classicMemberMetadata().get().supportedProtocols().iterator().next().metadata()))).array();
        } catch (SchemaException e) {
            throw new IllegalStateException("Malformed embedded consumer protocol in version deserialization.");
        }
    }

    static Errors appendGroupMetadataErrorToResponseError(Errors errors) {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[errors.ordinal()]) {
            case 1:
            case 2:
            case 3:
                return Errors.COORDINATOR_NOT_AVAILABLE;
            case 4:
            case 5:
                return Errors.NOT_COORDINATOR;
            case 6:
            case 7:
            case 8:
                return Errors.UNKNOWN_SERVER_ERROR;
            default:
                return errors;
        }
    }

    private Optional<Errors> validateSyncGroup(ClassicGroup classicGroup, SyncGroupRequestData syncGroupRequestData) {
        if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE);
        }
        try {
            classicGroup.validateMember(syncGroupRequestData.memberId(), syncGroupRequestData.groupInstanceId(), "sync-group");
            return syncGroupRequestData.generationId() != classicGroup.generationId() ? Optional.of(Errors.ILLEGAL_GENERATION) : (isProtocolInconsistent(syncGroupRequestData.protocolType(), classicGroup.protocolType().orElse(null)) || isProtocolInconsistent(syncGroupRequestData.protocolName(), classicGroup.protocolName().orElse(null))) ? Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL) : Optional.empty();
        } catch (KafkaException e) {
            return Optional.of(Errors.forException(e));
        }
    }

    private void removePendingSyncMember(ClassicGroup classicGroup, String str) {
        classicGroup.removePendingSyncMember(str);
        String classicGroupSyncKey = classicGroupSyncKey(classicGroup.groupId());
        switch (classicGroup.currentState()) {
            case EMPTY:
            case DEAD:
            case PREPARING_REBALANCE:
                this.timer.cancel(classicGroupSyncKey);
                return;
            case COMPLETING_REBALANCE:
            case STABLE:
                if (classicGroup.hasReceivedSyncFromAllMembers()) {
                    this.timer.cancel(classicGroupSyncKey);
                    return;
                }
                return;
            default:
                throw new IllegalStateException("Unknown group state: " + classicGroup.stateAsString());
        }
    }

    public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(RequestContext requestContext, HeartbeatRequestData heartbeatRequestData) {
        try {
            Group group = group(heartbeatRequestData.groupId());
            return group.type() == Group.GroupType.CLASSIC ? classicGroupHeartbeatToClassicGroup((ClassicGroup) group, requestContext, heartbeatRequestData) : classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, requestContext, heartbeatRequestData);
        } catch (GroupIdNotFoundException e) {
            throw new UnknownMemberIdException(String.format("Group %s not found.", heartbeatRequestData.groupId()));
        }
    }

    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToClassicGroup(ClassicGroup classicGroup, RequestContext requestContext, HeartbeatRequestData heartbeatRequestData) {
        validateClassicGroupHeartbeat(classicGroup, heartbeatRequestData.memberId(), heartbeatRequestData.groupInstanceId(), heartbeatRequestData.generationId());
        switch (classicGroup.currentState()) {
            case EMPTY:
                return new CoordinatorResult<>(Collections.emptyList(), new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            case DEAD:
            default:
                throw new IllegalStateException("Reached unexpected state " + classicGroup.currentState() + " for group " + classicGroup.groupId());
            case PREPARING_REBALANCE:
                rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroup.member(heartbeatRequestData.memberId()));
                return new CoordinatorResult<>(Collections.emptyList(), new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
            case COMPLETING_REBALANCE:
            case STABLE:
                rescheduleClassicGroupMemberHeartbeat(classicGroup, classicGroup.member(heartbeatRequestData.memberId()));
                return new CoordinatorResult<>(Collections.emptyList(), new HeartbeatResponseData());
        }
    }

    private void validateClassicGroupHeartbeat(ClassicGroup classicGroup, String str, String str2, int i) throws CoordinatorNotAvailableException, IllegalGenerationException {
        if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }
        classicGroup.validateMember(str, str2, "heartbeat");
        if (i != classicGroup.generationId()) {
            throw Errors.ILLEGAL_GENERATION.exception();
        }
    }

    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup(ConsumerGroup consumerGroup, RequestContext requestContext, HeartbeatRequestData heartbeatRequestData) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
        String groupId = heartbeatRequestData.groupId();
        String memberId = heartbeatRequestData.memberId();
        ConsumerGroupMember validateConsumerGroupMember = validateConsumerGroupMember(consumerGroup, memberId, heartbeatRequestData.groupInstanceId());
        throwIfMemberDoesNotUseClassicProtocol(validateConsumerGroupMember);
        throwIfGenerationIdUnmatched(memberId, validateConsumerGroupMember.memberEpoch(), heartbeatRequestData.generationId());
        scheduleConsumerGroupSessionTimeout(groupId, memberId, validateConsumerGroupMember.classicProtocolSessionTimeout().get().intValue());
        Errors errors = Errors.NONE;
        if (validateConsumerGroupMember.memberEpoch() < consumerGroup.groupEpoch() || validateConsumerGroupMember.state() == MemberState.UNREVOKED_PARTITIONS || (validateConsumerGroupMember.state() == MemberState.UNRELEASED_PARTITIONS && !consumerGroup.waitingOnUnreleasedPartition(validateConsumerGroupMember))) {
            errors = Errors.REBALANCE_IN_PROGRESS;
            scheduleConsumerGroupJoinTimeoutIfAbsent(groupId, memberId, validateConsumerGroupMember.rebalanceTimeoutMs());
        }
        return new CoordinatorResult<>(Collections.emptyList(), new HeartbeatResponseData().setErrorCode(errors.code()));
    }

    private ConsumerGroupMember validateConsumerGroupMember(ConsumerGroup consumerGroup, String str, String str2) throws UnknownMemberIdException, FencedInstanceIdException {
        ConsumerGroupMember staticMember;
        if (str2 == null) {
            staticMember = consumerGroup.getOrMaybeCreateMember(str, false);
        } else {
            staticMember = consumerGroup.staticMember(str2);
            if (staticMember == null) {
                throw new UnknownMemberIdException(String.format("Member with instance id %s is not a member of group %s.", str2, consumerGroup.groupId()));
            }
            throwIfInstanceIdIsFenced(staticMember, consumerGroup.groupId(), str, str2);
        }
        return staticMember;
    }

    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(RequestContext requestContext, LeaveGroupRequestData leaveGroupRequestData) throws UnknownMemberIdException {
        try {
            Group group = group(leaveGroupRequestData.groupId());
            return group.type() == Group.GroupType.CLASSIC ? classicGroupLeaveToClassicGroup((ClassicGroup) group, requestContext, leaveGroupRequestData) : classicGroupLeaveToConsumerGroup((ConsumerGroup) group, requestContext, leaveGroupRequestData);
        } catch (GroupIdNotFoundException e) {
            throw new UnknownMemberIdException(String.format("Group %s not found.", leaveGroupRequestData.groupId()));
        }
    }

    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToConsumerGroup(ConsumerGroup consumerGroup, RequestContext requestContext, LeaveGroupRequestData leaveGroupRequestData) throws UnknownMemberIdException {
        ConsumerGroupMember orMaybeCreateMember;
        String groupId = consumerGroup.groupId();
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        ArrayList arrayList2 = new ArrayList();
        for (LeaveGroupRequestData.MemberIdentity memberIdentity : leaveGroupRequestData.members()) {
            String memberId = memberIdentity.memberId();
            String groupInstanceId = memberIdentity.groupInstanceId();
            String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
            if (groupInstanceId == null) {
                try {
                    orMaybeCreateMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
                    throwIfMemberDoesNotUseClassicProtocol(orMaybeCreateMember);
                    this.log.info("[Group {}] Dynamic Member {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{groupId, memberId, reason});
                } catch (KafkaException e) {
                    arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberId).setGroupInstanceId(groupInstanceId).setErrorCode(Errors.forException(e).code()));
                }
            } else {
                orMaybeCreateMember = consumerGroup.staticMember(groupInstanceId);
                throwIfStaticMemberIsUnknown(orMaybeCreateMember, groupInstanceId);
                if (!ClassicGroup.NO_LEADER.equals(memberId)) {
                    throwIfInstanceIdIsFenced(orMaybeCreateMember, groupId, memberId, groupInstanceId);
                }
                throwIfMemberDoesNotUseClassicProtocol(orMaybeCreateMember);
                memberId = orMaybeCreateMember.memberId();
                this.log.info("[Group {}] Static Member {} with instance id {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{groupId, memberId, groupInstanceId, reason});
            }
            removeMember(arrayList2, groupId, memberId);
            cancelTimers(groupId, memberId);
            arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberId).setGroupInstanceId(groupInstanceId));
            hashSet.add(orMaybeCreateMember);
        }
        if (!arrayList2.isEmpty()) {
            Map<String, TopicMetadata> computeSubscriptionMetadata = consumerGroup.computeSubscriptionMetadata(consumerGroup.computeSubscribedTopicNames(hashSet), this.metadataImage.topics(), this.metadataImage.cluster());
            if (!computeSubscriptionMetadata.equals(consumerGroup.subscriptionMetadata())) {
                this.log.info("[GroupId {}] Computed new subscription metadata: {}.", consumerGroup.groupId(), computeSubscriptionMetadata);
                arrayList2.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(consumerGroup.groupId(), computeSubscriptionMetadata));
            }
            arrayList2.add(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, consumerGroup.groupEpoch() + 1));
        }
        return new CoordinatorResult<>(arrayList2, new LeaveGroupResponseData().setMembers(arrayList));
    }

    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(ClassicGroup classicGroup, RequestContext requestContext, LeaveGroupRequestData leaveGroupRequestData) throws UnknownMemberIdException {
        if (classicGroup.isInState(ClassicGroupState.DEAD)) {
            return new CoordinatorResult<>(Collections.emptyList(), new LeaveGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        ArrayList arrayList = new ArrayList();
        for (LeaveGroupRequestData.MemberIdentity memberIdentity : leaveGroupRequestData.members()) {
            String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
            if (ClassicGroup.NO_LEADER.equals(memberIdentity.memberId())) {
                if (memberIdentity.groupInstanceId() == null || !classicGroup.hasStaticMember(memberIdentity.groupInstanceId())) {
                    arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
                } else {
                    removeCurrentMemberFromClassicGroup(classicGroup, classicGroup.staticMemberId(memberIdentity.groupInstanceId()), reason);
                    arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()));
                }
            } else if (classicGroup.isPendingMember(memberIdentity.memberId())) {
                classicGroup.remove(memberIdentity.memberId());
                this.timer.cancel(classicGroupHeartbeatKey(classicGroup.groupId(), memberIdentity.memberId()));
                this.log.info("[Group {}] Pending member {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{classicGroup.groupId(), memberIdentity.memberId(), reason});
                arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()));
            } else {
                try {
                    classicGroup.validateMember(memberIdentity.memberId(), memberIdentity.groupInstanceId(), "leave-group");
                    removeCurrentMemberFromClassicGroup(classicGroup, memberIdentity.memberId(), reason);
                    arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()));
                } catch (KafkaException e) {
                    arrayList.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()).setErrorCode(Errors.forException(e).code()));
                }
            }
        }
        List list = (List) arrayList.stream().filter(memberResponse -> {
            return memberResponse.errorCode() == Errors.NONE.code();
        }).map((v0) -> {
            return v0.memberId();
        }).collect(Collectors.toList());
        String str = "explicit `LeaveGroup` request for (" + String.join(", ", list) + ") members.";
        CoordinatorResult<Void, CoordinatorRecord> coordinatorResult = EMPTY_RESULT;
        if (!list.isEmpty()) {
            switch (classicGroup.currentState()) {
                case PREPARING_REBALANCE:
                    coordinatorResult = maybeCompleteJoinPhase(classicGroup);
                    break;
                case COMPLETING_REBALANCE:
                case STABLE:
                    coordinatorResult = maybePrepareRebalanceOrCompleteJoin(classicGroup, str);
                    break;
            }
        }
        return new CoordinatorResult<>(coordinatorResult.records(), new LeaveGroupResponseData().setMembers(arrayList), coordinatorResult.appendFuture(), coordinatorResult.replayRecords());
    }

    private void removeCurrentMemberFromClassicGroup(ClassicGroup classicGroup, String str, String str2) {
        ClassicGroupMember member = classicGroup.member(str);
        this.timer.cancel(classicGroupHeartbeatKey(classicGroup.groupId(), str));
        this.log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{classicGroup.groupId(), str, str2});
        classicGroup.completeJoinFuture(member, new JoinGroupResponseData().setMemberId(ClassicGroup.NO_LEADER).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
        classicGroup.remove(member.memberId());
    }

    public void createGroupTombstoneRecords(String str, List<CoordinatorRecord> list) {
        createGroupTombstoneRecords(group(str), list);
    }

    public void createGroupTombstoneRecords(Group group, List<CoordinatorRecord> list) {
        group.createGroupTombstoneRecords(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void validateDeleteGroup(String str) throws ApiException {
        group(str).validateDeleteGroup();
    }

    public void maybeDeleteGroup(String str, List<CoordinatorRecord> list) {
        Group group = (Group) this.groups.get(str);
        if (group == null || !group.isEmpty()) {
            return;
        }
        createGroupTombstoneRecords(str, list);
    }

    private static boolean isEmptyClassicGroup(Group group) {
        return group != null && group.type() == Group.GroupType.CLASSIC && group.isEmpty();
    }

    private static boolean isEmptyConsumerGroup(Group group) {
        return group != null && group.type() == Group.GroupType.CONSUMER && group.isEmpty();
    }

    private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord> list) {
        if (!isEmptyClassicGroup(group)) {
            return false;
        }
        if (group == null) {
            return true;
        }
        createGroupTombstoneRecords(group, list);
        return true;
    }

    private void maybeDeleteEmptyConsumerGroup(String str, List<CoordinatorRecord> list) {
        Group group = (Group) this.groups.get(str, Long.MAX_VALUE);
        if (isEmptyConsumerGroup(group)) {
            createGroupTombstoneRecords(group, list);
            removeGroup(str);
        }
    }

    private boolean isProtocolInconsistent(String str, String str2) {
        return (str == null || str2 == null || str2.equals(str)) ? false : true;
    }

    public Set<String> groupIds() {
        return Collections.unmodifiableSet(this.groups.keySet());
    }

    static String classicGroupHeartbeatKey(String str, String str2) {
        return "heartbeat-" + str + "-" + str2;
    }

    static String classicGroupJoinKey(String str) {
        return "join-" + str;
    }

    static String classicGroupSyncKey(String str) {
        return "sync-" + str;
    }

    static String consumerGroupJoinKey(String str, String str2) {
        return "join-" + str + "-" + str2;
    }

    static String consumerGroupSyncKey(String str, String str2) {
        return "sync-" + str + "-" + str2;
    }

    /* synthetic */ GroupMetadataManager(SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, CoordinatorTimer coordinatorTimer, GroupCoordinatorMetricsShard groupCoordinatorMetricsShard, List list, MetadataImage metadataImage, int i, int i2, int i3, int i4, int i5, int i6, int i7, int i8, int i9, ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy, ShareGroupPartitionAssignor shareGroupPartitionAssignor, int i10, int i11, int i12, int i13, AnonymousClass1 anonymousClass1) {
        this(snapshotRegistry, logContext, time, coordinatorTimer, groupCoordinatorMetricsShard, list, metadataImage, i, i2, i3, i4, i5, i6, i7, i8, i9, consumerGroupMigrationPolicy, shareGroupPartitionAssignor, i10, i11, i12, i13);
    }
}
