package org.apache.kafka.coordinator.group;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorService.class */
public class GroupCoordinatorService implements GroupCoordinator {
    private final Logger log;
    private final GroupCoordinatorConfig config;
    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
    private final GroupCoordinatorMetrics groupCoordinatorMetrics;
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private volatile int numPartitions = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.coordinator.group.GroupCoordinatorService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorService$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_SERVER_ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NETWORK_EXCEPTION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_TOPIC_OR_PARTITION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_ENOUGH_REPLICAS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.REQUEST_TIMED_OUT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_LEADER_OR_FOLLOWER.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.KAFKA_STORAGE_ERROR.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.MESSAGE_TOO_LARGE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.RECORD_LIST_TOO_LARGE.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.INVALID_FETCH_SIZE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorService$Builder.class */
    public static class Builder {
        private final int nodeId;
        private final GroupCoordinatorConfig config;
        private PartitionWriter writer;
        private CoordinatorLoader<CoordinatorRecord> loader;
        private Time time;
        private Timer timer;
        private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
        private GroupCoordinatorMetrics groupCoordinatorMetrics;

        public Builder(int i, GroupCoordinatorConfig groupCoordinatorConfig) {
            this.nodeId = i;
            this.config = groupCoordinatorConfig;
        }

        public Builder withWriter(PartitionWriter partitionWriter) {
            this.writer = partitionWriter;
            return this;
        }

        public Builder withLoader(CoordinatorLoader<CoordinatorRecord> coordinatorLoader) {
            this.loader = coordinatorLoader;
            return this;
        }

        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
            this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
            return this;
        }

        public Builder withGroupCoordinatorMetrics(GroupCoordinatorMetrics groupCoordinatorMetrics) {
            this.groupCoordinatorMetrics = groupCoordinatorMetrics;
            return this;
        }

        public GroupCoordinatorService build() {
            if (this.config == null) {
                throw new IllegalArgumentException("Config must be set.");
            }
            if (this.writer == null) {
                throw new IllegalArgumentException("Writer must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.coordinatorRuntimeMetrics == null) {
                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
            }
            if (this.groupCoordinatorMetrics == null) {
                throw new IllegalArgumentException("GroupCoordinatorMetrics must be set.");
            }
            String format = String.format("GroupCoordinator id=%d", Integer.valueOf(this.nodeId));
            LogContext logContext = new LogContext(String.format("[%s] ", format));
            return new GroupCoordinatorService(logContext, this.config, new CoordinatorRuntime.Builder().withTime(this.time).withTimer(this.timer).withLogPrefix(format).withLogContext(logContext).withEventProcessor(new MultiThreadedEventProcessor(logContext, "group-coordinator-event-processor-", this.config.numThreads, this.time, this.coordinatorRuntimeMetrics)).withPartitionWriter(this.writer).withLoader(this.loader).withCoordinatorShardBuilderSupplier(() -> {
                return new GroupCoordinatorShard.Builder(this.config);
            }).withDefaultWriteTimeOut(Duration.ofMillis(this.config.offsetCommitTimeoutMs)).withCoordinatorRuntimeMetrics(this.coordinatorRuntimeMetrics).withCoordinatorMetrics(this.groupCoordinatorMetrics).withSerializer(new CoordinatorRecordSerde()).withCompression(Compression.of(this.config.compressionType).build()).withAppendLingerMs(this.config.appendLingerMs).build(), this.groupCoordinatorMetrics);
        }
    }

    GroupCoordinatorService(LogContext logContext, GroupCoordinatorConfig groupCoordinatorConfig, CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime, GroupCoordinatorMetrics groupCoordinatorMetrics) {
        this.log = logContext.logger(GroupCoordinatorService.class);
        this.config = groupCoordinatorConfig;
        this.runtime = coordinatorRuntime;
        this.groupCoordinatorMetrics = groupCoordinatorMetrics;
    }

    private void throwIfNotActive() {
        if (!this.isActive.get()) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }
    }

    private TopicPartition topicPartitionFor(String str) {
        return new TopicPartition("__consumer_offsets", partitionFor(str));
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public int partitionFor(String str) {
        throwIfNotActive();
        return org.apache.kafka.common.utils.Utils.abs(str.hashCode()) % this.numPartitions;
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(RequestContext requestContext, ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : this.runtime.scheduleWriteOperation("consumer-group-heartbeat", topicPartitionFor(consumerGroupHeartbeatRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.consumerGroupHeartbeat(requestContext, consumerGroupHeartbeatRequestData);
        }).exceptionally(th -> {
            return (ConsumerGroupHeartbeatResponseData) handleOperationException("consumer-group-heartbeat", consumerGroupHeartbeatRequestData, th, (errors, str) -> {
                return new ConsumerGroupHeartbeatResponseData().setErrorCode(errors.code()).setErrorMessage(str);
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<JoinGroupResponseData> joinGroup(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new JoinGroupResponseData().setMemberId(joinGroupRequestData.memberId()).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!isGroupIdNotEmpty(joinGroupRequestData.groupId())) {
            return CompletableFuture.completedFuture(new JoinGroupResponseData().setMemberId(joinGroupRequestData.memberId()).setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        if (joinGroupRequestData.sessionTimeoutMs() < this.config.classicGroupMinSessionTimeoutMs || joinGroupRequestData.sessionTimeoutMs() > this.config.classicGroupMaxSessionTimeoutMs) {
            return CompletableFuture.completedFuture(new JoinGroupResponseData().setMemberId(joinGroupRequestData.memberId()).setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()));
        }
        CompletableFuture<JoinGroupResponseData> completableFuture = new CompletableFuture<>();
        this.runtime.scheduleWriteOperation("classic-group-join", topicPartitionFor(joinGroupRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.classicGroupJoin(requestContext, joinGroupRequestData, completableFuture);
        }).exceptionally(th -> {
            if (completableFuture.isDone()) {
                return null;
            }
            completableFuture.complete(handleOperationException("classic-group-join", joinGroupRequestData, th, (errors, str) -> {
                return new JoinGroupResponseData().setErrorCode(errors.code());
            }));
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<SyncGroupResponseData> syncGroup(RequestContext requestContext, SyncGroupRequestData syncGroupRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new SyncGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!isGroupIdNotEmpty(syncGroupRequestData.groupId())) {
            return CompletableFuture.completedFuture(new SyncGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        CompletableFuture<SyncGroupResponseData> completableFuture = new CompletableFuture<>();
        this.runtime.scheduleWriteOperation("classic-group-sync", topicPartitionFor(syncGroupRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.classicGroupSync(requestContext, syncGroupRequestData, completableFuture);
        }).exceptionally(th -> {
            if (completableFuture.isDone()) {
                return null;
            }
            completableFuture.complete(handleOperationException("classic-group-sync", syncGroupRequestData, th, (errors, str) -> {
                return new SyncGroupResponseData().setErrorCode(errors.code());
            }));
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<HeartbeatResponseData> heartbeat(RequestContext requestContext, HeartbeatRequestData heartbeatRequestData) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new HeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : !isGroupIdNotEmpty(heartbeatRequestData.groupId()) ? CompletableFuture.completedFuture(new HeartbeatResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code())) : this.runtime.scheduleWriteOperation("classic-group-heartbeat", topicPartitionFor(heartbeatRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.classicGroupHeartbeat(requestContext, heartbeatRequestData);
        }).exceptionally(th -> {
            return (HeartbeatResponseData) handleOperationException("classic-group-heartbeat", heartbeatRequestData, th, (errors, str) -> {
                return errors == Errors.COORDINATOR_LOAD_IN_PROGRESS ? new HeartbeatResponseData().setErrorCode(Errors.NONE.code()) : new HeartbeatResponseData().setErrorCode(errors.code());
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<LeaveGroupResponseData> leaveGroup(RequestContext requestContext, LeaveGroupRequestData leaveGroupRequestData) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new LeaveGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : !isGroupIdNotEmpty(leaveGroupRequestData.groupId()) ? CompletableFuture.completedFuture(new LeaveGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code())) : this.runtime.scheduleWriteOperation("classic-group-leave", topicPartitionFor(leaveGroupRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.classicGroupLeave(requestContext, leaveGroupRequestData);
        }).exceptionally(th -> {
            return (LeaveGroupResponseData) handleOperationException("classic-group-leave", leaveGroupRequestData, th, (errors, str) -> {
                if (errors != Errors.UNKNOWN_MEMBER_ID) {
                    return new LeaveGroupResponseData().setErrorCode(errors.code());
                }
                return new LeaveGroupResponseData().setMembers((List) leaveGroupRequestData.members().stream().map(memberIdentity -> {
                    return new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
                }).collect(Collectors.toList()));
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<ListGroupsResponseData> listGroups(RequestContext requestContext, ListGroupsRequestData listGroupsRequestData) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new ListGroupsResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : FutureUtils.combineFutures(FutureUtils.mapExceptionally(this.runtime.scheduleReadAllOperation("list-groups", (groupCoordinatorShard, j) -> {
            return groupCoordinatorShard.listGroups(listGroupsRequestData.statesFilter(), listGroupsRequestData.typesFilter(), j);
        }), th -> {
            Throwable maybeUnwrapException = Errors.maybeUnwrapException(th);
            if (maybeUnwrapException instanceof NotCoordinatorException) {
                return Collections.emptyList();
            }
            throw new CompletionException(maybeUnwrapException);
        }), ArrayList::new, (v0, v1) -> {
            v0.addAll(v1);
        }).thenApply(list -> {
            return new ListGroupsResponseData().setGroups(list);
        }).exceptionally(th2 -> {
            return (ListGroupsResponseData) handleOperationException("list-groups", listGroupsRequestData, th2, (errors, str) -> {
                return new ListGroupsResponseData().setErrorCode(errors.code());
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(RequestContext requestContext, List<String> list) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(list, Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList arrayList = new ArrayList(list.size());
        HashMap hashMap = new HashMap();
        list.forEach(str -> {
            if (isGroupIdNotEmpty(str)) {
                ((List) hashMap.computeIfAbsent(topicPartitionFor(str), topicPartition -> {
                    return new ArrayList();
                })).add(str);
            } else {
                arrayList.add(CompletableFuture.completedFuture(Collections.singletonList(new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId((String) null).setErrorCode(Errors.INVALID_GROUP_ID.code()))));
            }
        });
        hashMap.forEach((topicPartition, list2) -> {
            arrayList.add(this.runtime.scheduleReadOperation("consumer-group-describe", topicPartition, (groupCoordinatorShard, j) -> {
                return groupCoordinatorShard.consumerGroupDescribe(list, j);
            }).exceptionally(th -> {
                return (List) handleOperationException("consumer-group-describe", list2, th, (errors, str2) -> {
                    return ConsumerGroupDescribeRequest.getErrorDescribedGroupList(list2, errors);
                });
            }));
        });
        return FutureUtils.combineFutures(arrayList, ArrayList::new, (v0, v1) -> {
            v0.addAll(v1);
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(RequestContext requestContext, List<String> list) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(DescribeGroupsRequest.getErrorDescribedGroupList(list, Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList arrayList = new ArrayList(list.size());
        HashMap hashMap = new HashMap();
        list.forEach(str -> {
            if (str == null) {
                arrayList.add(CompletableFuture.completedFuture(Collections.singletonList(new DescribeGroupsResponseData.DescribedGroup().setGroupId((String) null).setErrorCode(Errors.INVALID_GROUP_ID.code()))));
            } else {
                ((List) hashMap.computeIfAbsent(topicPartitionFor(str), topicPartition -> {
                    return new ArrayList();
                })).add(str);
            }
        });
        hashMap.forEach((topicPartition, list2) -> {
            arrayList.add(this.runtime.scheduleReadOperation("describe-groups", topicPartition, (groupCoordinatorShard, j) -> {
                return groupCoordinatorShard.describeGroups(requestContext, list2, j);
            }).exceptionally(th -> {
                return (List) handleOperationException("describe-groups", list2, th, (errors, str2) -> {
                    return DescribeGroupsRequest.getErrorDescribedGroupList(list2, errors);
                });
            }));
        });
        return FutureUtils.combineFutures(arrayList, ArrayList::new, (v0, v1) -> {
            v0.addAll(v1);
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(RequestContext requestContext, List<String> list, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(list, Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList arrayList = new ArrayList(list.size());
        HashMap hashMap = new HashMap();
        list.forEach(str -> {
            if (str == null) {
                arrayList.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(Collections.singletonList(null), Errors.INVALID_GROUP_ID)));
            } else {
                ((List) hashMap.computeIfAbsent(topicPartitionFor(str), topicPartition -> {
                    return new ArrayList();
                })).add(str);
            }
        });
        hashMap.forEach((topicPartition, list2) -> {
            arrayList.add(this.runtime.scheduleWriteOperation("delete-groups", topicPartition, Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
                return groupCoordinatorShard.deleteGroups(requestContext, list2);
            }).exceptionally(th -> {
                return (DeleteGroupsResponseData.DeletableGroupResultCollection) handleOperationException("delete-groups", list2, th, (errors, str2) -> {
                    return DeleteGroupsRequest.getErrorResultCollection(list2, errors);
                });
            }));
        });
        return FutureUtils.combineFutures(arrayList, DeleteGroupsResponseData.DeletableGroupResultCollection::new, (deletableGroupResultCollection, deletableGroupResultCollection2) -> {
            deletableGroupResultCollection2.forEach(deletableGroupResult -> {
                deletableGroupResultCollection.add(deletableGroupResult.duplicate());
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(RequestContext requestContext, OffsetFetchRequestData.OffsetFetchRequestGroup offsetFetchRequestGroup, boolean z) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId(offsetFetchRequestGroup.groupId()).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : offsetFetchRequestGroup.groupId() == null ? CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId(offsetFetchRequestGroup.groupId()).setErrorCode(Errors.INVALID_GROUP_ID.code())) : z ? this.runtime.scheduleWriteOperation("fetch-offsets", topicPartitionFor(offsetFetchRequestGroup.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return new CoordinatorResult(Collections.emptyList(), groupCoordinatorShard.fetchOffsets(offsetFetchRequestGroup, Long.MAX_VALUE));
        }) : this.runtime.scheduleReadOperation("fetch-offsets", topicPartitionFor(offsetFetchRequestGroup.groupId()), (groupCoordinatorShard2, j) -> {
            return groupCoordinatorShard2.fetchOffsets(offsetFetchRequestGroup, j);
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(RequestContext requestContext, OffsetFetchRequestData.OffsetFetchRequestGroup offsetFetchRequestGroup, boolean z) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId(offsetFetchRequestGroup.groupId()).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : offsetFetchRequestGroup.groupId() == null ? CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId(offsetFetchRequestGroup.groupId()).setErrorCode(Errors.INVALID_GROUP_ID.code())) : z ? this.runtime.scheduleWriteOperation("fetch-all-offsets", topicPartitionFor(offsetFetchRequestGroup.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return new CoordinatorResult(Collections.emptyList(), groupCoordinatorShard.fetchAllOffsets(offsetFetchRequestGroup, Long.MAX_VALUE));
        }) : this.runtime.scheduleReadOperation("fetch-all-offsets", topicPartitionFor(offsetFetchRequestGroup.groupId()), (groupCoordinatorShard2, j) -> {
            return groupCoordinatorShard2.fetchAllOffsets(offsetFetchRequestGroup, j);
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<OffsetCommitResponseData> commitOffsets(RequestContext requestContext, OffsetCommitRequestData offsetCommitRequestData, BufferSupplier bufferSupplier) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(offsetCommitRequestData, Errors.COORDINATOR_NOT_AVAILABLE)) : offsetCommitRequestData.groupId() == null ? CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(offsetCommitRequestData, Errors.INVALID_GROUP_ID)) : this.runtime.scheduleWriteOperation("commit-offset", topicPartitionFor(offsetCommitRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.commitOffset(requestContext, offsetCommitRequestData);
        }).exceptionally(th -> {
            return (OffsetCommitResponseData) handleOperationException("commit-offset", offsetCommitRequestData, th, (errors, str) -> {
                return OffsetCommitRequest.getErrorResponse(offsetCommitRequestData, errors);
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(RequestContext requestContext, TxnOffsetCommitRequestData txnOffsetCommitRequestData, BufferSupplier bufferSupplier) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse(txnOffsetCommitRequestData, Errors.COORDINATOR_NOT_AVAILABLE)) : !isGroupIdNotEmpty(txnOffsetCommitRequestData.groupId()) ? CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse(txnOffsetCommitRequestData, Errors.INVALID_GROUP_ID)) : this.runtime.scheduleTransactionalWriteOperation("txn-commit-offset", topicPartitionFor(txnOffsetCommitRequestData.groupId()), txnOffsetCommitRequestData.transactionalId(), txnOffsetCommitRequestData.producerId(), txnOffsetCommitRequestData.producerEpoch(), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.commitTransactionalOffset(requestContext, txnOffsetCommitRequestData);
        }, Short.valueOf(requestContext.apiVersion())).exceptionally(th -> {
            return (TxnOffsetCommitResponseData) handleOperationException("txn-commit-offset", txnOffsetCommitRequestData, th, (errors, str) -> {
                return TxnOffsetCommitRequest.getErrorResponse(txnOffsetCommitRequestData, errors);
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(RequestContext requestContext, OffsetDeleteRequestData offsetDeleteRequestData, BufferSupplier bufferSupplier) {
        return !this.isActive.get() ? CompletableFuture.completedFuture(new OffsetDeleteResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())) : !isGroupIdNotEmpty(offsetDeleteRequestData.groupId()) ? CompletableFuture.completedFuture(new OffsetDeleteResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code())) : this.runtime.scheduleWriteOperation("delete-offsets", topicPartitionFor(offsetDeleteRequestData.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.deleteOffsets(requestContext, offsetDeleteRequestData);
        }).exceptionally(th -> {
            return (OffsetDeleteResponseData) handleOperationException("delete-offsets", offsetDeleteRequestData, th, (errors, str) -> {
                return new OffsetDeleteResponseData().setErrorCode(errors.code());
            });
        });
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public CompletableFuture<Void> completeTransaction(TopicPartition topicPartition, long j, short s, int i, TransactionResult transactionResult, Duration duration) {
        return !this.isActive.get() ? FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()) : !topicPartition.topic().equals("__consumer_offsets") ? FutureUtils.failedFuture(new IllegalStateException("Completing a transaction for " + topicPartition + " is not expected")) : this.runtime.scheduleTransactionCompletion("write-txn-marker", topicPartition, j, s, i, transactionResult, duration);
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onTransactionCompleted(long j, Iterable<TopicPartition> iterable, TransactionResult transactionResult) {
        throwIfNotActive();
        throw new IllegalStateException("onTransactionCompleted is not supported.");
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onPartitionsDeleted(List<TopicPartition> list, BufferSupplier bufferSupplier) throws ExecutionException, InterruptedException {
        throwIfNotActive();
        CompletableFuture.allOf((CompletableFuture[]) FutureUtils.mapExceptionally(this.runtime.scheduleWriteAllOperation("on-partition-deleted", Duration.ofMillis(this.config.offsetCommitTimeoutMs), groupCoordinatorShard -> {
            return groupCoordinatorShard.onPartitionsDeleted(list);
        }), th -> {
            this.log.error("Could not delete offsets for deleted partitions {} due to: {}.", new Object[]{list, th.getMessage(), th});
            return null;
        }).toArray(new CompletableFuture[0])).get();
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onElection(int i, int i2) {
        throwIfNotActive();
        this.runtime.scheduleLoadOperation(new TopicPartition("__consumer_offsets", i), i2);
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onResignation(int i, OptionalInt optionalInt) {
        throwIfNotActive();
        this.runtime.scheduleUnloadOperation(new TopicPartition("__consumer_offsets", i), optionalInt);
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        throwIfNotActive();
        this.runtime.onNewMetadataImage(metadataImage, metadataDelta);
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public Properties groupMetadataTopicConfigs() {
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        properties.put("compression.type", BrokerCompressionType.PRODUCER.name);
        properties.put("segment.bytes", String.valueOf(this.config.offsetsTopicSegmentBytes));
        return properties;
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void startup(IntSupplier intSupplier) {
        if (!this.isActive.compareAndSet(false, true)) {
            this.log.warn("Group coordinator is already running.");
            return;
        }
        this.log.info("Starting up.");
        this.numPartitions = intSupplier.getAsInt();
        this.isActive.set(true);
        this.log.info("Startup complete.");
    }

    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void shutdown() {
        if (!this.isActive.compareAndSet(true, false)) {
            this.log.warn("Group coordinator is already shutting down.");
            return;
        }
        this.log.info("Shutting down.");
        this.isActive.set(false);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.runtime, "coordinator runtime");
        org.apache.kafka.common.utils.Utils.closeQuietly(this.groupCoordinatorMetrics, "group coordinator metrics");
        this.log.info("Shutdown complete.");
    }

    private static boolean isGroupIdNotEmpty(String str) {
        return (str == null || str.isEmpty()) ? false : true;
    }

    private <IN, OUT> OUT handleOperationException(String str, IN in, Throwable th, BiFunction<Errors, String, OUT> biFunction) {
        ApiError fromThrowable = ApiError.fromThrowable(th);
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[fromThrowable.error().ordinal()]) {
            case 1:
                this.log.error("Operation {} with {} hit an unexpected exception: {}.", new Object[]{str, in, th.getMessage(), th});
                return biFunction.apply(Errors.UNKNOWN_SERVER_ERROR, null);
            case 2:
                return biFunction.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);
            case 3:
            case 4:
            case 5:
                return biFunction.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
            case 6:
            case 7:
                return biFunction.apply(Errors.NOT_COORDINATOR, null);
            case 8:
            case 9:
            case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT /* 10 */:
                return biFunction.apply(Errors.UNKNOWN_SERVER_ERROR, null);
            default:
                return biFunction.apply(fromThrowable.error(), fromThrowable.message());
        }
    }
}
