package org.apache.kafka.coordinator.group;

import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorService.class */
/**
 * {@link GroupCoordinator} implementation that forwards group requests to a
 * {@link CoordinatorRuntime} of {@link GroupCoordinatorShard}s.
 *
 * <p>Requests that carry a group id are routed to the partition of
 * {@code Topic.GROUP_METADATA_TOPIC_NAME} that the group id hashes to (see
 * {@code partitionFor}). Until {@code startup} has been called, and again after
 * {@code shutdown}, request methods fail with {@code Errors.COORDINATOR_NOT_AVAILABLE}.
 * Several APIs are not implemented yet and always fail with
 * {@code Errors.UNSUPPORTED_VERSION}.
 */
public class GroupCoordinatorService implements GroupCoordinator {
    /** Logger obtained from the {@link LogContext} passed to the constructor. */
    private final Logger log;
    /** Configuration; read for the event-processor thread count and the offsets topic segment size. */
    private final GroupCoordinatorConfig config;
    /** Runtime on which every read, write, load and unload operation is scheduled. */
    private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;
    /** Flipped to true by {@code startup} and back to false by {@code shutdown}; checked before any request is served. */
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    /** Partition count used by {@code partitionFor}; stays -1 until {@code startup} assigns it from its supplier. */
    private volatile int numPartitions = -1;

    /* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorService$Builder.class */
    /**
     * Fluent builder for {@link GroupCoordinatorService}.
     *
     * <p>The node id and configuration are supplied to the constructor; the writer, loader,
     * time and timer must all be supplied through the {@code with*} methods before
     * {@link #build()} is called.
     */
    public static class Builder {
        private final int nodeId;
        private final GroupCoordinatorConfig config;
        private PartitionWriter<Record> writer;
        private CoordinatorLoader<Record> loader;
        private Time time;
        private Timer timer;

        /**
         * @param i                      node id; only used to build the log prefix
         * @param groupCoordinatorConfig configuration handed to the service and to every shard builder
         */
        public Builder(int i, GroupCoordinatorConfig groupCoordinatorConfig) {
            this.nodeId = i;
            this.config = groupCoordinatorConfig;
        }

        /** Sets the partition writer handed to the runtime. */
        public Builder withWriter(PartitionWriter<Record> partitionWriter) {
            this.writer = partitionWriter;
            return this;
        }

        /** Sets the coordinator loader handed to the runtime. */
        public Builder withLoader(CoordinatorLoader<Record> coordinatorLoader) {
            this.loader = coordinatorLoader;
            return this;
        }

        /** Sets the time source handed to the runtime. */
        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the timer handed to the runtime. */
        public Builder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        /**
         * Creates the coordinator runtime and wraps it in a new service.
         *
         * @return a service that is not yet started
         * @throws IllegalArgumentException if the config, writer, loader, time or timer is null
         */
        public GroupCoordinatorService build() {
            if (this.config == null) {
                throw new IllegalArgumentException("Config must be set.");
            }
            if (this.writer == null) {
                throw new IllegalArgumentException("Writer must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }

            String logPrefix = String.format("GroupCoordinator id=%d", this.nodeId);
            LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
            MultiThreadedEventProcessor processor = new MultiThreadedEventProcessor(
                logContext,
                "group-coordinator-event-processor-",
                this.config.numThreads
            );

            // NOTE(review): the runtime builder is used as a raw type here, exactly as before;
            // its type parameters are declared outside this file, so they are left untouched.
            // The time source is configured once (it used to be applied twice in this chain).
            CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = new CoordinatorRuntime.Builder()
                .withTime(this.time)
                .withTimer(this.timer)
                .withLogPrefix(logPrefix)
                .withLogContext(logContext)
                .withEventProcessor(processor)
                .withPartitionWriter(this.writer)
                .withLoader(this.loader)
                .withCoordinatorShardBuilderSupplier(() -> new GroupCoordinatorShard.Builder(this.config))
                .build();

            return new GroupCoordinatorService(logContext, this.config, runtime);
        }
    }

    /**
     * Creates a service on top of an already-built runtime. The service stays inactive until
     * {@code startup} is called.
     *
     * @param logContext             context used to create this service's logger
     * @param groupCoordinatorConfig configuration of the group coordinator
     * @param coordinatorRuntime     runtime on which all operations are scheduled
     */
    GroupCoordinatorService(LogContext logContext, GroupCoordinatorConfig groupCoordinatorConfig, CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime) {
        // The logger must be named after this class; it was previously created for
        // CoordinatorLoader, which attributed every message of this service to the loader.
        this.log = logContext.logger(GroupCoordinatorService.class);
        this.config = groupCoordinatorConfig;
        this.runtime = coordinatorRuntime;
    }

    /**
     * Guard used by the synchronous entry points: returns normally when the coordinator is
     * active and otherwise throws the exception of {@code Errors.COORDINATOR_NOT_AVAILABLE}.
     */
    private void throwIfNotActive() {
        final boolean active = this.isActive.get();
        if (active) {
            return;
        }
        throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
    }

    /**
     * Resolves the partition of {@code Topic.GROUP_METADATA_TOPIC_NAME} that a group id maps to.
     *
     * @param str the group id
     * @return the topic-partition whose index is {@code partitionFor(str)}
     */
    private TopicPartition topicPartitionFor(String str) {
        final int partition = partitionFor(str);
        return new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition);
    }

    /**
     * Maps a group id to a partition index by taking {@code Utils.abs} of its hash code modulo
     * the partition count recorded at startup.
     *
     * <p>Throws the exception of {@code Errors.COORDINATOR_NOT_AVAILABLE} when the coordinator
     * is not active.
     *
     * @param str the group id; must not be null because its hash code is taken
     * @return the partition index for the group
     */
    @Override
    public int partitionFor(String str) {
        throwIfNotActive();
        final int hash = org.apache.kafka.common.utils.Utils.abs(str.hashCode());
        return hash % this.numPartitions;
    }

    /**
     * Schedules a consumer group heartbeat as a write operation on the shard that owns the
     * request's group id.
     *
     * <p>Failures of the scheduled operation are converted into a response instead of a failed
     * future: missing topic/partition or not enough replicas become COORDINATOR_NOT_AVAILABLE,
     * leadership or storage errors become NOT_COORDINATOR, oversized records or batches become
     * UNKNOWN_SERVER_ERROR, and everything else is mapped through {@code Errors.forException}
     * with the exception message attached.
     *
     * @return a failed future (COORDINATOR_NOT_AVAILABLE) when the coordinator is not active,
     *         otherwise a future holding the response
     */
    @Override
    public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(RequestContext requestContext, ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }

        return this.runtime.scheduleWriteOperation(
            "consumer-group-heartbeat",
            topicPartitionFor(consumerGroupHeartbeatRequestData.groupId()),
            shard -> shard.consumerGroupHeartbeat(requestContext, consumerGroupHeartbeatRequestData)
        ).exceptionally(error -> {
            if (error instanceof UnknownTopicOrPartitionException || error instanceof NotEnoughReplicasException) {
                return new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
            }
            if (error instanceof NotLeaderOrFollowerException || error instanceof KafkaStorageException) {
                return new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.NOT_COORDINATOR.code());
            }
            if (error instanceof RecordTooLargeException
                    || error instanceof RecordBatchTooLargeException
                    || error instanceof InvalidFetchSizeException) {
                return new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
            }
            return new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.forException(error).code())
                .setErrorMessage(error.getMessage());
        });
    }

    /**
     * Handles a JoinGroup request for a generic group.
     *
     * <p>A null or empty group id is answered immediately with INVALID_GROUP_ID (echoing the
     * request's member id). Otherwise the join is scheduled as a write operation on the owning
     * shard, which receives the response future and is expected to complete it. If the
     * scheduled operation fails before the future has been completed, the future is completed
     * here with the error code derived from the exception.
     *
     * @param bufferSupplier not used by this method
     * @return a failed future (COORDINATOR_NOT_AVAILABLE) when the coordinator is not active,
     *         otherwise the future that will carry the join response
     */
    @Override
    public CompletableFuture<JoinGroupResponseData> joinGroup(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }

        CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
        if (!isGroupIdNotEmpty(joinGroupRequestData.groupId())) {
            responseFuture.complete(new JoinGroupResponseData()
                .setMemberId(joinGroupRequestData.memberId())
                .setErrorCode(Errors.INVALID_GROUP_ID.code()));
            return responseFuture;
        }

        this.runtime.scheduleWriteOperation(
            "generic-group-join",
            topicPartitionFor(joinGroupRequestData.groupId()),
            shard -> shard.genericGroupJoin(requestContext, joinGroupRequestData, responseFuture)
        ).exceptionally(error -> {
            if (!(error instanceof KafkaException)) {
                // The throwable is passed as the trailing argument so that the stack trace is
                // logged; the message alone is frequently null for unexpected exceptions.
                this.log.error("JoinGroup request {} hit an unexpected exception: {}", joinGroupRequestData, error.getMessage(), error);
            }
            if (!responseFuture.isDone()) {
                responseFuture.complete(new JoinGroupResponseData().setErrorCode(Errors.forException(error).code()));
            }
            return null;
        });
        return responseFuture;
    }

    /**
     * Handles a SyncGroup request for a generic group.
     *
     * <p>A null or empty group id is answered immediately with INVALID_GROUP_ID. Otherwise the
     * sync is scheduled as a write operation on the owning shard, which receives the response
     * future and is expected to complete it. If the scheduled operation fails before the
     * future has been completed, the future is completed here with the error code derived from
     * the exception.
     *
     * @param bufferSupplier not used by this method
     * @return a failed future (COORDINATOR_NOT_AVAILABLE) when the coordinator is not active,
     *         otherwise the future that will carry the sync response
     */
    @Override
    public CompletableFuture<SyncGroupResponseData> syncGroup(RequestContext requestContext, SyncGroupRequestData syncGroupRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        if (!isGroupIdNotEmpty(syncGroupRequestData.groupId())) {
            return CompletableFuture.completedFuture(new SyncGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }

        CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>();
        this.runtime.scheduleWriteOperation(
            "generic-group-sync",
            topicPartitionFor(syncGroupRequestData.groupId()),
            shard -> shard.genericGroupSync(requestContext, syncGroupRequestData, responseFuture)
        ).exceptionally(error -> {
            if (!(error instanceof KafkaException)) {
                // The throwable is passed as the trailing argument so that the stack trace is
                // logged; the message alone is frequently null for unexpected exceptions.
                this.log.error("SyncGroup request {} hit an unexpected exception: {}", syncGroupRequestData, error.getMessage(), error);
            }
            if (!responseFuture.isDone()) {
                responseFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.forException(error).code()));
            }
            return null;
        });
        return responseFuture;
    }

    /**
     * Handles a Heartbeat request for a generic group by scheduling a read operation on the
     * owning shard.
     *
     * <p>A null or empty group id is answered immediately with INVALID_GROUP_ID. Failures of
     * the scheduled operation are converted into a response: a
     * {@link CoordinatorLoadInProgressException} is reported to the client as
     * {@code Errors.NONE}, anything else is mapped through {@code Errors.forException}.
     *
     * @return a failed future (COORDINATOR_NOT_AVAILABLE) when the coordinator is not active,
     *         otherwise a future holding the response
     */
    @Override
    public CompletableFuture<HeartbeatResponseData> heartbeat(RequestContext requestContext, HeartbeatRequestData heartbeatRequestData) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        if (!isGroupIdNotEmpty(heartbeatRequestData.groupId())) {
            return CompletableFuture.completedFuture(new HeartbeatResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }

        return this.runtime.scheduleReadOperation(
            "generic-group-heartbeat",
            topicPartitionFor(heartbeatRequestData.groupId()),
            (shard, unused) -> shard.genericGroupHeartbeat(requestContext, heartbeatRequestData)
        ).exceptionally(error -> {
            if (!(error instanceof KafkaException)) {
                // The throwable is passed as the trailing argument so that the stack trace is
                // logged; the message alone is frequently null for unexpected exceptions.
                this.log.error("Heartbeat request {} hit an unexpected exception: {}", heartbeatRequestData, error.getMessage(), error);
            }
            if (error instanceof CoordinatorLoadInProgressException) {
                // NOTE(review): a shard that is still loading is answered with success,
                // presumably so members keep heartbeating instead of rejoining - confirm.
                return new HeartbeatResponseData().setErrorCode(Errors.NONE.code());
            }
            return new HeartbeatResponseData().setErrorCode(Errors.forException(error).code());
        });
    }

    /**
     * LeaveGroup is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<LeaveGroupResponseData> leaveGroup(RequestContext requestContext, LeaveGroupRequestData leaveGroupRequestData) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * ListGroups is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<ListGroupsResponseData> listGroups(RequestContext requestContext, ListGroupsRequestData listGroupsRequestData) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * DescribeGroups is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(RequestContext requestContext, List<String> list) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * DeleteGroups is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(RequestContext requestContext, List<String> list, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * OffsetFetch for specific topics is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchOffsets(RequestContext requestContext, String str, List<OffsetFetchRequestData.OffsetFetchRequestTopics> list, boolean z) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * OffsetFetch for all topics of a group is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchAllOffsets(RequestContext requestContext, String str, boolean z) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * Schedules an offset commit as a write operation on the shard that owns the request's
     * group id.
     *
     * <p>Only a null group id is rejected (INVALID_GROUP_ID); an empty group id is scheduled
     * like any other. Failures of the scheduled operation are converted into an error response:
     * missing topic/partition or not enough replicas become COORDINATOR_NOT_AVAILABLE,
     * leadership or storage errors become NOT_COORDINATOR, oversized records or batches become
     * INVALID_COMMIT_OFFSET_SIZE, and everything else is mapped through
     * {@code Errors.forException}.
     *
     * @param bufferSupplier not used by this method
     * @return a failed future (COORDINATOR_NOT_AVAILABLE) when the coordinator is not active,
     *         otherwise a future holding the response
     */
    @Override
    public CompletableFuture<OffsetCommitResponseData> commitOffsets(RequestContext requestContext, OffsetCommitRequestData offsetCommitRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        if (offsetCommitRequestData.groupId() == null) {
            return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(offsetCommitRequestData, Errors.INVALID_GROUP_ID));
        }

        return this.runtime.scheduleWriteOperation(
            "commit-offset",
            topicPartitionFor(offsetCommitRequestData.groupId()),
            shard -> shard.commitOffset(requestContext, offsetCommitRequestData)
        ).exceptionally(error -> {
            final Errors mapped;
            if (error instanceof UnknownTopicOrPartitionException || error instanceof NotEnoughReplicasException) {
                mapped = Errors.COORDINATOR_NOT_AVAILABLE;
            } else if (error instanceof NotLeaderOrFollowerException || error instanceof KafkaStorageException) {
                mapped = Errors.NOT_COORDINATOR;
            } else if (error instanceof RecordTooLargeException
                    || error instanceof RecordBatchTooLargeException
                    || error instanceof InvalidFetchSizeException) {
                mapped = Errors.INVALID_COMMIT_OFFSET_SIZE;
            } else {
                mapped = Errors.forException(error);
            }
            return OffsetCommitRequest.getErrorResponse(offsetCommitRequestData, mapped);
        });
    }

    /**
     * TxnOffsetCommit is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(RequestContext requestContext, TxnOffsetCommitRequestData txnOffsetCommitRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * OffsetDelete is not implemented yet.
     *
     * @return a future failed with COORDINATOR_NOT_AVAILABLE when the coordinator is not
     *         active, otherwise a future failed with UNSUPPORTED_VERSION
     */
    @Override
    public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(RequestContext requestContext, OffsetDeleteRequestData offsetDeleteRequestData, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception("This API is not implemented yet."));
    }

    /**
     * Callback for a completed transaction.
     *
     * <p>The current body only verifies that the coordinator is active (throwing the exception
     * of {@code Errors.COORDINATOR_NOT_AVAILABLE} otherwise); none of the arguments are read.
     */
    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onTransactionCompleted(long j, Iterable<TopicPartition> iterable, TransactionResult transactionResult) {
        throwIfNotActive();
    }

    /**
     * Callback for deleted topic partitions.
     *
     * <p>The current body only verifies that the coordinator is active (throwing the exception
     * of {@code Errors.COORDINATOR_NOT_AVAILABLE} otherwise); none of the arguments are read.
     */
    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onPartitionsDeleted(List<TopicPartition> list, BufferSupplier bufferSupplier) {
        throwIfNotActive();
    }

    /**
     * Schedules the loading of one partition of {@code Topic.GROUP_METADATA_TOPIC_NAME} on the
     * runtime.
     *
     * <p>Throws the exception of {@code Errors.COORDINATOR_NOT_AVAILABLE} when the coordinator
     * is not active.
     *
     * @param i  index of the partition to load
     * @param i2 value forwarded as the second argument of {@code scheduleLoadOperation}
     *           (presumably the partition's leader epoch - confirm against the interface)
     */
    @Override
    public void onElection(int i, int i2) {
        throwIfNotActive();
        final TopicPartition electedPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i);
        this.runtime.scheduleLoadOperation(electedPartition, i2);
    }

    /**
     * Schedules the unloading of one partition of {@code Topic.GROUP_METADATA_TOPIC_NAME} on
     * the runtime.
     *
     * <p>Throws the exception of {@code Errors.COORDINATOR_NOT_AVAILABLE} when the coordinator
     * is not active; that check runs before the leader epoch is validated.
     *
     * @param i           index of the partition to unload
     * @param optionalInt leader epoch of the partition; must be present
     * @throws IllegalArgumentException if no leader epoch is provided
     */
    @Override
    public void onResignation(int i, OptionalInt optionalInt) {
        throwIfNotActive();
        final int leaderEpoch = optionalInt.orElseThrow(
            () -> new IllegalArgumentException("The leader epoch should always be provided in KRaft."));
        final TopicPartition resignedPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i);
        this.runtime.scheduleUnloadOperation(resignedPartition, leaderEpoch);
    }

    /**
     * Forwards a new metadata image and its delta to the coordinator runtime.
     *
     * <p>Throws the exception of {@code Errors.COORDINATOR_NOT_AVAILABLE} when the coordinator
     * is not active, in which case nothing is forwarded.
     */
    @Override // org.apache.kafka.coordinator.group.GroupCoordinator
    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        throwIfNotActive();
        this.runtime.onNewMetadataImage(metadataImage, metadataDelta);
    }

    /**
     * Builds the topic-level configuration for the group metadata topic: compacted cleanup
     * policy, producer-chosen compression, and the segment size taken from the coordinator
     * configuration.
     *
     * <p>This method does not check whether the coordinator is active.
     *
     * @return a new {@link Properties} instance on every call
     */
    @Override
    public Properties groupMetadataTopicConfigs() {
        Properties topicConfigs = new Properties();
        topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        // Use the TopicConfig constant like the sibling keys instead of a bare string literal.
        topicConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
        topicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(this.config.offsetsTopicSegmentBytes));
        return topicConfigs;
    }

    /**
     * Activates the coordinator and records the number of partitions used by
     * {@code partitionFor}.
     *
     * <p>Only the caller that flips the active flag from false to true performs the startup;
     * any other call logs a warning and returns without invoking the supplier.
     *
     * @param intSupplier supplies the partition count; invoked at most once per successful startup
     */
    @Override
    public void startup(IntSupplier intSupplier) {
        if (!this.isActive.compareAndSet(false, true)) {
            this.log.warn("Group coordinator is already running.");
            return;
        }
        this.log.info("Starting up.");
        // NOTE(review): the flag is already true while numPartitions is still -1, so a
        // partitionFor call racing with startup could observe -1 - confirm callers cannot race.
        this.numPartitions = intSupplier.getAsInt();
        // The compareAndSet above already activated the coordinator. The former unconditional
        // isActive.set(true) here was redundant and could re-activate a coordinator that a
        // concurrent shutdown() had just deactivated and closed.
        this.log.info("Startup complete.");
    }

    /**
     * Deactivates the coordinator and closes the coordinator runtime.
     *
     * <p>Only the caller that flips the active flag from true to false performs the shutdown;
     * any other call (including a call on a coordinator that was never started) logs a warning
     * and returns without closing the runtime.
     */
    @Override
    public void shutdown() {
        if (!this.isActive.compareAndSet(true, false)) {
            this.log.warn("Group coordinator is already shutting down.");
            return;
        }
        this.log.info("Shutting down.");
        // The compareAndSet above already deactivated the coordinator. The former unconditional
        // isActive.set(false) here was redundant and could overwrite the state set by a
        // concurrent startup().
        org.apache.kafka.common.utils.Utils.closeQuietly(this.runtime, "coordinator runtime");
        this.log.info("Shutdown complete.");
    }

    /**
     * Tells whether a group id is usable, i.e. neither null nor the empty string.
     *
     * @param str the group id to check; may be null
     * @return true when {@code str} is non-null and has at least one character
     */
    private static boolean isGroupIdNotEmpty(String str) {
        return str != null && !str.isEmpty();
    }
}
