package org.apache.kafka.coordinator.group.runtime;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.class */
public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
    private final String logPrefix;
    private final LogContext logContext;
    private final Logger log;
    private final Time time;
    private final Timer timer;
    private final Duration defaultWriteTimeout;
    private final ConcurrentHashMap<TopicPartition, CoordinatorRuntime<S, U>.CoordinatorContext> coordinators;
    private final CoordinatorEventProcessor processor;
    private final PartitionWriter<U> partitionWriter;
    private final CoordinatorRuntime<S, U>.HighWatermarkListener highWatermarklistener;
    private final CoordinatorLoader<U> loader;
    private final CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
    private final CoordinatorRuntimeMetrics runtimeMetrics;
    private final CoordinatorMetrics coordinatorMetrics;
    private final AtomicBoolean isRunning;
    private volatile MetadataImage metadataImage;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState = new int[CoordinatorState.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[CoordinatorState.LOADING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[CoordinatorState.ACTIVE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[CoordinatorState.FAILED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[CoordinatorState.CLOSED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[CoordinatorState.INITIAL.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$Builder.class */
    public static class Builder<S extends CoordinatorShard<U>, U> {
        private String logPrefix;
        private LogContext logContext;
        private CoordinatorEventProcessor eventProcessor;
        private PartitionWriter<U> partitionWriter;
        private CoordinatorLoader<U> loader;
        private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
        private Time time = Time.SYSTEM;
        private Timer timer;
        private Duration defaultWriteTimeout;
        private CoordinatorRuntimeMetrics runtimeMetrics;
        private CoordinatorMetrics coordinatorMetrics;

        public Builder<S, U> withLogPrefix(String str) {
            this.logPrefix = str;
            return this;
        }

        public Builder<S, U> withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        public Builder<S, U> withEventProcessor(CoordinatorEventProcessor coordinatorEventProcessor) {
            this.eventProcessor = coordinatorEventProcessor;
            return this;
        }

        public Builder<S, U> withPartitionWriter(PartitionWriter<U> partitionWriter) {
            this.partitionWriter = partitionWriter;
            return this;
        }

        public Builder<S, U> withLoader(CoordinatorLoader<U> coordinatorLoader) {
            this.loader = coordinatorLoader;
            return this;
        }

        public Builder<S, U> withCoordinatorShardBuilderSupplier(CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier) {
            this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
            return this;
        }

        public Builder<S, U> withTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder<S, U> withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder<S, U> withDefaultWriteTimeOut(Duration duration) {
            this.defaultWriteTimeout = duration;
            return this;
        }

        public Builder<S, U> withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
            this.runtimeMetrics = coordinatorRuntimeMetrics;
            return this;
        }

        public Builder<S, U> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
            this.coordinatorMetrics = coordinatorMetrics;
            return this;
        }

        public CoordinatorRuntime<S, U> build() {
            if (this.logPrefix == null) {
                this.logPrefix = "";
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(this.logPrefix);
            }
            if (this.eventProcessor == null) {
                throw new IllegalArgumentException("Event processor must be set.");
            }
            if (this.partitionWriter == null) {
                throw new IllegalArgumentException("Partition write must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.coordinatorShardBuilderSupplier == null) {
                throw new IllegalArgumentException("State machine supplier must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.runtimeMetrics == null) {
                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
            }
            if (this.coordinatorMetrics == null) {
                throw new IllegalArgumentException("CoordinatorMetrics must be set.");
            }
            return new CoordinatorRuntime<>(this.logPrefix, this.logContext, this.eventProcessor, this.partitionWriter, this.loader, this.coordinatorShardBuilderSupplier, this.time, this.timer, this.defaultWriteTimeout, this.runtimeMetrics, this.coordinatorMetrics, null);
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorCompleteTransactionEvent.class */
    class CoordinatorCompleteTransactionEvent implements CoordinatorEvent, DeferredEvent {
        final TopicPartition tp;
        final String name;
        final long producerId;
        final short producerEpoch;
        final int coordinatorEpoch;
        final TransactionResult result;
        final Duration writeTimeout;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        private final long createdTimeMs;

        CoordinatorCompleteTransactionEvent(String str, TopicPartition topicPartition, long j, short s, int i, TransactionResult transactionResult, Duration duration) {
            this.name = str;
            this.tp = topicPartition;
            this.producerId = j;
            this.producerEpoch = s;
            this.coordinatorEpoch = i;
            this.result = transactionResult;
            this.writeTimeout = duration;
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, coordinatorContext -> {
                    long lastWrittenOffset = coordinatorContext.coordinator.lastWrittenOffset();
                    try {
                        coordinatorContext.coordinator.replayEndTransactionMarker(this.producerId, this.producerEpoch, this.result);
                        long appendEndTransactionMarker = CoordinatorRuntime.this.partitionWriter.appendEndTransactionMarker(this.tp, this.producerId, this.producerEpoch, this.coordinatorEpoch, this.result);
                        coordinatorContext.coordinator.updateLastWrittenOffset(Long.valueOf(appendEndTransactionMarker));
                        if (this.future.isDone()) {
                            complete(null);
                        } else {
                            coordinatorContext.deferredEventQueue.add(appendEndTransactionMarker, this);
                            CoordinatorRuntime.this.timer.add(new TimerTask(this.writeTimeout.toMillis()) { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorCompleteTransactionEvent.1
                                public void run() {
                                    if (CoordinatorCompleteTransactionEvent.this.future.isDone()) {
                                        return;
                                    }
                                    CoordinatorRuntime.this.scheduleInternalOperation("WriteTimeout(name=" + CoordinatorCompleteTransactionEvent.this.name + ", tp=" + CoordinatorCompleteTransactionEvent.this.tp + ")", CoordinatorCompleteTransactionEvent.this.tp, () -> {
                                        CoordinatorCompleteTransactionEvent.this.complete(new TimeoutException("CoordinatorCompleteTransactionEvent " + CoordinatorCompleteTransactionEvent.this.name + " timed out after " + CoordinatorCompleteTransactionEvent.this.writeTimeout.toMillis() + "ms"));
                                    });
                                }
                            });
                        }
                    } catch (Throwable th) {
                        coordinatorContext.coordinator.revertLastWrittenOffset(lastWrittenOffset);
                        complete(th);
                    }
                });
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            if (th == null) {
                this.future.complete(null);
            } else {
                this.future.completeExceptionally(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "CoordinatorCompleteTransactionEvent(name=" + this.name + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorContext.class */
    public class CoordinatorContext {
        final ReentrantLock lock;
        final TopicPartition tp;
        final LogContext logContext;
        final DeferredEventQueue deferredEventQueue;
        final CoordinatorRuntime<S, U>.EventBasedCoordinatorTimer timer;
        CoordinatorState state;
        int epoch;
        SnapshottableCoordinator<S, U> coordinator;

        private CoordinatorContext(TopicPartition topicPartition) {
            this.lock = new ReentrantLock();
            this.tp = topicPartition;
            this.logContext = new LogContext(String.format("[%s topic=%s partition=%d] ", CoordinatorRuntime.this.logPrefix, topicPartition.topic(), Integer.valueOf(topicPartition.partition())));
            this.state = CoordinatorState.INITIAL;
            this.epoch = -1;
            this.deferredEventQueue = new DeferredEventQueue(this.logContext);
            this.timer = new EventBasedCoordinatorTimer(topicPartition, this.logContext);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void transitionTo(CoordinatorState coordinatorState) {
            if (!coordinatorState.canTransitionFrom(this.state)) {
                throw new IllegalStateException("Cannot transition from " + this.state + " to " + coordinatorState);
            }
            CoordinatorState coordinatorState2 = this.state;
            CoordinatorRuntime.this.log.debug("Transition from {} to {}.", this.state, coordinatorState);
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[coordinatorState.ordinal()]) {
                case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION /* 1 */:
                    this.state = CoordinatorState.LOADING;
                    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
                    this.coordinator = new SnapshottableCoordinator<>(this.logContext, snapshotRegistry, CoordinatorRuntime.this.coordinatorShardBuilderSupplier.get().withLogContext(this.logContext).withSnapshotRegistry(snapshotRegistry).withTime(CoordinatorRuntime.this.time).withTimer(this.timer).withCoordinatorMetrics(CoordinatorRuntime.this.coordinatorMetrics).withTopicPartition(this.tp).build(), this.tp);
                    break;
                case 2:
                    this.state = CoordinatorState.ACTIVE;
                    CoordinatorRuntime.this.partitionWriter.registerListener(this.tp, CoordinatorRuntime.this.highWatermarklistener);
                    this.coordinator.onLoaded(CoordinatorRuntime.this.metadataImage);
                    break;
                case 3:
                    this.state = CoordinatorState.FAILED;
                    unload();
                    break;
                case 4:
                    this.state = CoordinatorState.CLOSED;
                    unload();
                    break;
                default:
                    throw new IllegalArgumentException("Transitioning to " + coordinatorState + " is not supported.");
            }
            CoordinatorRuntime.this.runtimeMetrics.recordPartitionStateChange(coordinatorState2, this.state);
        }

        private void unload() {
            CoordinatorRuntime.this.partitionWriter.deregisterListener(this.tp, CoordinatorRuntime.this.highWatermarklistener);
            this.timer.cancelAll();
            this.deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
            if (this.coordinator != null) {
                this.coordinator.onUnloaded();
            }
            this.coordinator = null;
        }

        /* synthetic */ CoordinatorContext(CoordinatorRuntime coordinatorRuntime, TopicPartition topicPartition, AnonymousClass1 anonymousClass1) {
            this(topicPartition);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorInternalEvent.class */
    public class CoordinatorInternalEvent implements CoordinatorEvent {
        final TopicPartition tp;
        final String name;
        final Runnable op;
        private final long createdTimeMs;

        CoordinatorInternalEvent(String str, TopicPartition topicPartition, Runnable runnable) {
            this.tp = topicPartition;
            this.name = str;
            this.op = runnable;
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                this.op.run();
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            if (th != null) {
                CoordinatorRuntime.this.log.error("Execution of {} failed due to {}.", new Object[]{this.name, th.getMessage(), th});
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "InternalEvent(name=" + this.name + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorReadEvent.class */
    class CoordinatorReadEvent<T> implements CoordinatorEvent {
        final TopicPartition tp;
        final String name;
        final CoordinatorReadOperation<S, T> op;
        final CompletableFuture<T> future = new CompletableFuture<>();
        T response;
        private final long createdTimeMs;

        CoordinatorReadEvent(String str, TopicPartition topicPartition, CoordinatorReadOperation<S, T> coordinatorReadOperation) {
            this.tp = topicPartition;
            this.name = str;
            this.op = coordinatorReadOperation;
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, coordinatorContext -> {
                    this.response = (T) this.op.generateResponse(coordinatorContext.coordinator.coordinator(), coordinatorContext.coordinator.lastCommittedOffset());
                    complete(null);
                });
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            if (th == null) {
                this.future.complete(this.response);
            } else {
                this.future.completeExceptionally(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "CoordinatorReadEvent(name=" + this.name + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorReadOperation.class */
    public interface CoordinatorReadOperation<S, T> {
        T generateResponse(S s, long j) throws KafkaException;
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorState.class */
    public enum CoordinatorState {
        INITIAL { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.1
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return false;
            }
        },
        LOADING { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.2
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return coordinatorState == INITIAL || coordinatorState == FAILED;
            }
        },
        ACTIVE { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.3
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return coordinatorState == ACTIVE || coordinatorState == LOADING;
            }
        },
        CLOSED { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.4
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return true;
            }
        },
        FAILED { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.5
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return coordinatorState == LOADING;
            }
        };

        abstract boolean canTransitionFrom(CoordinatorState coordinatorState);

        /* synthetic */ CoordinatorState(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorWriteEvent.class */
    class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
        final TopicPartition tp;
        final String name;
        final String transactionalId;
        final long producerId;
        final short producerEpoch;
        final CoordinatorWriteOperation<S, T, U> op;
        final CompletableFuture<T> future;
        final Duration writeTimeout;
        CoordinatorResult<T, U> result;
        private final long createdTimeMs;

        CoordinatorWriteEvent(CoordinatorRuntime coordinatorRuntime, String str, TopicPartition topicPartition, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
            this(str, topicPartition, null, -1L, (short) -1, duration, coordinatorWriteOperation);
        }

        CoordinatorWriteEvent(String str, TopicPartition topicPartition, String str2, long j, short s, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
            this.tp = topicPartition;
            this.name = str;
            this.op = coordinatorWriteOperation;
            this.transactionalId = str2;
            this.producerId = j;
            this.producerEpoch = s;
            this.future = new CompletableFuture<>();
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
            this.writeTimeout = duration;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, coordinatorContext -> {
                    long lastWrittenOffset = coordinatorContext.coordinator.lastWrittenOffset();
                    this.result = this.op.generateRecordsAndResult(coordinatorContext.coordinator.coordinator());
                    if (this.result.records().isEmpty()) {
                        OptionalLong highestPendingOffset = coordinatorContext.deferredEventQueue.highestPendingOffset();
                        if (highestPendingOffset.isPresent()) {
                            coordinatorContext.deferredEventQueue.add(highestPendingOffset.getAsLong(), this);
                            return;
                        } else {
                            complete(null);
                            return;
                        }
                    }
                    try {
                        if (this.result.replayRecords()) {
                            this.result.records().forEach(obj -> {
                                coordinatorContext.coordinator.replay(this.producerId, this.producerEpoch, obj);
                            });
                        }
                        long append = CoordinatorRuntime.this.partitionWriter.append(this.tp, this.producerId, this.producerEpoch, this.result.records());
                        coordinatorContext.coordinator.updateLastWrittenOffset(Long.valueOf(append));
                        if (this.future.isDone()) {
                            complete(null);
                        } else {
                            coordinatorContext.deferredEventQueue.add(append, this);
                            CoordinatorRuntime.this.timer.add(new TimerTask(this.writeTimeout.toMillis()) { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorWriteEvent.1
                                public void run() {
                                    if (CoordinatorWriteEvent.this.future.isDone()) {
                                        return;
                                    }
                                    CoordinatorRuntime.this.scheduleInternalOperation("WriteTimeout(name=" + CoordinatorWriteEvent.this.name + ", tp=" + CoordinatorWriteEvent.this.tp + ")", CoordinatorWriteEvent.this.tp, () -> {
                                        CoordinatorWriteEvent.this.complete(new TimeoutException("CoordinatorWriteEvent " + CoordinatorWriteEvent.this.name + " timed out after " + CoordinatorWriteEvent.this.writeTimeout.toMillis() + "ms"));
                                    });
                                }
                            });
                        }
                    } catch (Throwable th) {
                        coordinatorContext.coordinator.revertLastWrittenOffset(lastWrittenOffset);
                        complete(th);
                    }
                });
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            CompletableFuture<Void> appendFuture = this.result != null ? this.result.appendFuture() : null;
            if (th == null) {
                if (appendFuture != null) {
                    this.result.appendFuture().complete(null);
                }
                this.future.complete(this.result.response());
            } else {
                if (appendFuture != null) {
                    this.result.appendFuture().completeExceptionally(th);
                }
                this.future.completeExceptionally(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "CoordinatorWriteEvent(name=" + this.name + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorWriteOperation.class */
    public interface CoordinatorWriteOperation<S, T, U> {
        CoordinatorResult<T, U> generateRecordsAndResult(S s) throws KafkaException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$EventBasedCoordinatorTimer.class */
    public class EventBasedCoordinatorTimer implements CoordinatorTimer<Void, U> {
        final Logger log;
        final TopicPartition tp;
        final Map<String, TimerTask> tasks = new HashMap();

        EventBasedCoordinatorTimer(TopicPartition topicPartition, LogContext logContext) {
            this.tp = topicPartition;
            this.log = logContext.logger(EventBasedCoordinatorTimer.class);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void schedule(String str, long j, TimeUnit timeUnit, boolean z, CoordinatorTimer.TimeoutOperation<Void, U> timeoutOperation) {
            schedule(str, j, timeUnit, z, 500L, timeoutOperation);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void schedule(final String str, long j, TimeUnit timeUnit, final boolean z, final long j2, final CoordinatorTimer.TimeoutOperation<Void, U> timeoutOperation) {
            TimerTask timerTask = new TimerTask(timeUnit.toMillis(j)) { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.EventBasedCoordinatorTimer.1
                public void run() {
                    String str2 = "Timeout(tp=" + EventBasedCoordinatorTimer.this.tp + ", key=" + str + ")";
                    CoordinatorRuntime coordinatorRuntime = CoordinatorRuntime.this;
                    TopicPartition topicPartition = EventBasedCoordinatorTimer.this.tp;
                    Duration duration = CoordinatorRuntime.this.defaultWriteTimeout;
                    String str3 = str;
                    CoordinatorTimer.TimeoutOperation timeoutOperation2 = timeoutOperation;
                    CoordinatorWriteEvent coordinatorWriteEvent = new CoordinatorWriteEvent(coordinatorRuntime, str2, topicPartition, duration, coordinatorShard -> {
                        EventBasedCoordinatorTimer.this.log.debug("Executing write event {} for timer {}.", str2, str3);
                        if (EventBasedCoordinatorTimer.this.tasks.remove(str3, this)) {
                            return timeoutOperation2.generateRecords();
                        }
                        throw new RejectedExecutionException("Timer " + str3 + " was overridden or cancelled");
                    });
                    CompletableFuture<T> completableFuture = coordinatorWriteEvent.future;
                    String str4 = str;
                    boolean z2 = z;
                    long j3 = j2;
                    CoordinatorTimer.TimeoutOperation timeoutOperation3 = timeoutOperation;
                    completableFuture.exceptionally(th -> {
                        if (th instanceof RejectedExecutionException) {
                            EventBasedCoordinatorTimer.this.log.debug("The write event {} for the timer {} was not executed because it was cancelled or overridden.", coordinatorWriteEvent.name, str4);
                            return null;
                        }
                        if ((th instanceof NotCoordinatorException) || (th instanceof CoordinatorLoadInProgressException)) {
                            EventBasedCoordinatorTimer.this.log.debug("The write event {} for the timer {} failed due to {}. Ignoring it because the coordinator is not active.", new Object[]{coordinatorWriteEvent.name, str4, th.getMessage()});
                            return null;
                        }
                        if (!z2) {
                            EventBasedCoordinatorTimer.this.log.error("The write event {} for the timer {} failed due to {}. Ignoring it. ", new Object[]{coordinatorWriteEvent.name, str4, th.getMessage()});
                            return null;
                        }
                        EventBasedCoordinatorTimer.this.log.info("The write event {} for the timer {} failed due to {}. Rescheduling it. ", new Object[]{coordinatorWriteEvent.name, str4, th.getMessage()});
                        EventBasedCoordinatorTimer.this.schedule(str4, j3, TimeUnit.MILLISECONDS, z2, timeoutOperation3);
                        return null;
                    });
                    EventBasedCoordinatorTimer.this.log.debug("Scheduling write event {} for timer {}.", coordinatorWriteEvent.name, str);
                    try {
                        CoordinatorRuntime.this.enqueue(coordinatorWriteEvent);
                    } catch (NotCoordinatorException e) {
                        EventBasedCoordinatorTimer.this.log.info("Failed to enqueue write event {} for timer {} because the runtime is closed. Ignoring it.", coordinatorWriteEvent.name, str);
                    }
                }
            };
            this.log.debug("Registering timer {} with delay of {}ms.", str, Long.valueOf(timeUnit.toMillis(j)));
            TimerTask put = this.tasks.put(str, timerTask);
            if (put != null) {
                put.cancel();
            }
            CoordinatorRuntime.this.timer.add(timerTask);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void cancel(String str) {
            TimerTask remove = this.tasks.remove(str);
            if (remove != null) {
                remove.cancel();
            }
        }

        public void cancelAll() {
            Iterator<Map.Entry<String, TimerTask>> it = this.tasks.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().cancel();
                it.remove();
            }
        }

        public int size() {
            return this.tasks.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$HighWatermarkListener.class */
    public class HighWatermarkListener implements PartitionWriter.Listener {
        HighWatermarkListener() {
        }

        @Override // org.apache.kafka.coordinator.group.runtime.PartitionWriter.Listener
        public void onHighWatermarkUpdated(TopicPartition topicPartition, long j) {
            CoordinatorRuntime.this.log.debug("High watermark of {} incremented to {}.", topicPartition, Long.valueOf(j));
            CoordinatorRuntime.this.scheduleInternalOperation("HighWatermarkUpdated(tp=" + topicPartition + ", offset=" + j + ")", topicPartition, () -> {
                CoordinatorRuntime.this.withActiveContextOrThrow(topicPartition, coordinatorContext -> {
                    coordinatorContext.coordinator.updateLastCommittedOffset(Long.valueOf(j));
                    coordinatorContext.deferredEventQueue.completeUpTo(j);
                    CoordinatorRuntime.this.coordinatorMetrics.onUpdateLastCommittedOffset(topicPartition, j);
                });
            });
        }
    }

    private CoordinatorRuntime(String str, LogContext logContext, CoordinatorEventProcessor coordinatorEventProcessor, PartitionWriter<U> partitionWriter, CoordinatorLoader<U> coordinatorLoader, CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier, Time time, Timer timer, Duration duration, CoordinatorRuntimeMetrics coordinatorRuntimeMetrics, CoordinatorMetrics coordinatorMetrics) {
        this.isRunning = new AtomicBoolean(true);
        this.metadataImage = MetadataImage.EMPTY;
        this.logPrefix = str;
        this.logContext = logContext;
        this.log = logContext.logger(CoordinatorRuntime.class);
        this.time = time;
        this.timer = timer;
        this.defaultWriteTimeout = duration;
        this.coordinators = new ConcurrentHashMap<>();
        this.processor = coordinatorEventProcessor;
        this.partitionWriter = partitionWriter;
        this.highWatermarklistener = new HighWatermarkListener();
        this.loader = coordinatorLoader;
        this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
        this.runtimeMetrics = coordinatorRuntimeMetrics;
        this.coordinatorMetrics = coordinatorMetrics;
    }

    private void throwIfNotRunning() {
        if (!this.isRunning.get()) {
            throw Errors.NOT_COORDINATOR.exception();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueue(CoordinatorEvent coordinatorEvent) {
        try {
            this.processor.enqueue(coordinatorEvent);
        } catch (RejectedExecutionException e) {
            throw new NotCoordinatorException("Can't accept an event because the processor is closed", e);
        }
    }

    void maybeCreateContext(TopicPartition topicPartition) {
        this.coordinators.computeIfAbsent(topicPartition, topicPartition2 -> {
            return new CoordinatorContext(this, topicPartition2, null);
        });
    }

    CoordinatorRuntime<S, U>.CoordinatorContext contextOrThrow(TopicPartition topicPartition) throws NotCoordinatorException {
        CoordinatorRuntime<S, U>.CoordinatorContext coordinatorContext = this.coordinators.get(topicPartition);
        if (coordinatorContext == null) {
            throw Errors.NOT_COORDINATOR.exception();
        }
        return coordinatorContext;
    }

    private void withContextOrThrow(TopicPartition topicPartition, Consumer<CoordinatorRuntime<S, U>.CoordinatorContext> consumer) throws NotCoordinatorException {
        CoordinatorRuntime<S, U>.CoordinatorContext contextOrThrow = contextOrThrow(topicPartition);
        try {
            contextOrThrow.lock.lock();
            consumer.accept(contextOrThrow);
            contextOrThrow.lock.unlock();
        } catch (Throwable th) {
            contextOrThrow.lock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void withActiveContextOrThrow(TopicPartition topicPartition, Consumer<CoordinatorRuntime<S, U>.CoordinatorContext> consumer) throws NotCoordinatorException, CoordinatorLoadInProgressException {
        CoordinatorRuntime<S, U>.CoordinatorContext contextOrThrow = contextOrThrow(topicPartition);
        try {
            contextOrThrow.lock.lock();
            if (contextOrThrow.state == CoordinatorState.ACTIVE) {
                consumer.accept(contextOrThrow);
            } else {
                if (contextOrThrow.state != CoordinatorState.LOADING) {
                    throw Errors.NOT_COORDINATOR.exception();
                }
                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
            }
        } finally {
            contextOrThrow.lock.unlock();
        }
    }

    public <T> CompletableFuture<T> scheduleWriteOperation(String str, TopicPartition topicPartition, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of write operation {}.", str);
        CoordinatorWriteEvent coordinatorWriteEvent = new CoordinatorWriteEvent(this, str, topicPartition, duration, coordinatorWriteOperation);
        enqueue(coordinatorWriteEvent);
        return coordinatorWriteEvent.future;
    }

    public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(String str, TopicPartition topicPartition, String str2, long j, short s, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of transactional write operation {}.", str);
        CoordinatorWriteEvent coordinatorWriteEvent = new CoordinatorWriteEvent(str, topicPartition, str2, j, s, duration, coordinatorWriteOperation);
        enqueue(coordinatorWriteEvent);
        return coordinatorWriteEvent.future;
    }

    public CompletableFuture<Void> scheduleTransactionCompletion(String str, TopicPartition topicPartition, long j, short s, int i, TransactionResult transactionResult, Duration duration) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of transaction completion for {} with producer id={}, producer epoch={}, coordinator epoch={} and transaction result={}.", new Object[]{topicPartition, Long.valueOf(j), Short.valueOf(s), Integer.valueOf(i), transactionResult});
        CoordinatorCompleteTransactionEvent coordinatorCompleteTransactionEvent = new CoordinatorCompleteTransactionEvent(str, topicPartition, j, s, i, transactionResult, duration);
        enqueue(coordinatorCompleteTransactionEvent);
        return coordinatorCompleteTransactionEvent.future;
    }

    public <T> CompletableFuture<T> scheduleReadOperation(String str, TopicPartition topicPartition, CoordinatorReadOperation<S, T> coordinatorReadOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of read operation {}.", str);
        CoordinatorReadEvent coordinatorReadEvent = new CoordinatorReadEvent(str, topicPartition, coordinatorReadOperation);
        enqueue(coordinatorReadEvent);
        return coordinatorReadEvent.future;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleInternalOperation(String str, TopicPartition topicPartition, Runnable runnable) {
        this.log.debug("Scheduled execution of internal operation {}.", str);
        enqueue(new CoordinatorInternalEvent(str, topicPartition, runnable));
    }

    public Set<TopicPartition> partitions() {
        throwIfNotRunning();
        return new HashSet(this.coordinators.keySet());
    }

    public void scheduleLoadOperation(TopicPartition topicPartition, int i) {
        throwIfNotRunning();
        this.log.info("Scheduling loading of metadata from {} with epoch {}", topicPartition, Integer.valueOf(i));
        maybeCreateContext(topicPartition);
        scheduleInternalOperation("Load(tp=" + topicPartition + ", epoch=" + i + ")", topicPartition, () -> {
            withContextOrThrow(topicPartition, coordinatorContext -> {
                if (coordinatorContext.epoch >= i) {
                    this.log.info("Ignoring loading metadata from {} since current epoch {} is larger than or equals to {}.", new Object[]{coordinatorContext.tp, Integer.valueOf(coordinatorContext.epoch), Integer.valueOf(i)});
                    return;
                }
                coordinatorContext.epoch = i;
                switch (AnonymousClass1.$SwitchMap$org$apache$kafka$coordinator$group$runtime$CoordinatorRuntime$CoordinatorState[coordinatorContext.state.ordinal()]) {
                    case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION /* 1 */:
                        this.log.info("The coordinator {} is already loading metadata.", topicPartition);
                        return;
                    case 2:
                        this.log.info("The coordinator {} is already active.", topicPartition);
                        return;
                    case 3:
                    case 5:
                        coordinatorContext.transitionTo(CoordinatorState.LOADING);
                        this.loader.load(topicPartition, coordinatorContext.coordinator).whenComplete((loadSummary, th) -> {
                            scheduleInternalOperation("CompleteLoad(tp=" + topicPartition + ", epoch=" + i + ")", topicPartition, () -> {
                                withContextOrThrow(topicPartition, coordinatorContext -> {
                                    if (coordinatorContext.state != CoordinatorState.LOADING) {
                                        this.log.info("Ignoring load completion from {} because context is in {} state.", coordinatorContext.tp, coordinatorContext.state);
                                        return;
                                    }
                                    try {
                                        if (th != null) {
                                            throw th;
                                        }
                                        coordinatorContext.transitionTo(CoordinatorState.ACTIVE);
                                        if (loadSummary != null) {
                                            this.runtimeMetrics.recordPartitionLoadSensor(loadSummary.startTimeMs(), loadSummary.endTimeMs());
                                        }
                                        this.log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.", new Object[]{topicPartition, Integer.valueOf(i), loadSummary});
                                    } catch (Throwable th) {
                                        this.log.error("Failed to load metadata from {} with epoch {} due to {}.", new Object[]{topicPartition, Integer.valueOf(i), th.toString()});
                                        coordinatorContext.transitionTo(CoordinatorState.FAILED);
                                    }
                                });
                            });
                        });
                        return;
                    case 4:
                    default:
                        this.log.error("Cannot load coordinator {} in state {}.", topicPartition, coordinatorContext.state);
                        return;
                }
            });
        });
    }

    public void scheduleUnloadOperation(TopicPartition topicPartition, OptionalInt optionalInt) {
        throwIfNotRunning();
        this.log.info("Scheduling unloading of metadata for {} with epoch {}", topicPartition, optionalInt);
        scheduleInternalOperation("UnloadCoordinator(tp=" + topicPartition + ", epoch=" + optionalInt + ")", topicPartition, () -> {
            CoordinatorRuntime<S, U>.CoordinatorContext coordinatorContext = this.coordinators.get(topicPartition);
            if (coordinatorContext == null) {
                this.log.info("Ignored unloading metadata for {} in epoch {} since metadata was never loaded.", topicPartition, optionalInt);
                return;
            }
            try {
                coordinatorContext.lock.lock();
                if (!optionalInt.isPresent() || coordinatorContext.epoch < optionalInt.getAsInt()) {
                    this.log.info("Started unloading metadata for {} with epoch {}.", topicPartition, optionalInt);
                    coordinatorContext.transitionTo(CoordinatorState.CLOSED);
                    this.coordinators.remove(topicPartition, coordinatorContext);
                    this.log.info("Finished unloading metadata for {} with epoch {}.", topicPartition, optionalInt);
                } else {
                    this.log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.", new Object[]{topicPartition, optionalInt, Integer.valueOf(coordinatorContext.epoch)});
                }
            } finally {
                coordinatorContext.lock.unlock();
            }
        });
    }

    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        throwIfNotRunning();
        this.log.debug("Scheduling applying of a new metadata image with offset {}.", Long.valueOf(metadataImage.offset()));
        this.metadataImage = metadataImage;
        this.coordinators.keySet().forEach(topicPartition -> {
            scheduleInternalOperation("UpdateImage(tp=" + topicPartition + ", offset=" + metadataImage.offset() + ")", topicPartition, () -> {
                withContextOrThrow(topicPartition, coordinatorContext -> {
                    if (coordinatorContext.state != CoordinatorState.ACTIVE) {
                        this.log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.", Long.valueOf(metadataImage.offset()), topicPartition);
                    } else {
                        this.log.debug("Applying new metadata image with offset {} to {}.", Long.valueOf(metadataImage.offset()), topicPartition);
                        coordinatorContext.coordinator.onNewMetadataImage(metadataImage, metadataDelta);
                    }
                });
            });
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (!this.isRunning.compareAndSet(true, false)) {
            this.log.warn("Coordinator runtime is already shutting down.");
            return;
        }
        this.log.info("Closing coordinator runtime.");
        Utils.closeQuietly(this.loader, "loader");
        Utils.closeQuietly(this.timer, "timer");
        Utils.closeQuietly(this.processor, "event processor");
        this.coordinators.forEach((topicPartition, coordinatorContext) -> {
            coordinatorContext.transitionTo(CoordinatorState.CLOSED);
        });
        this.coordinators.clear();
        Utils.closeQuietly(this.runtimeMetrics, "runtime metrics");
        this.log.info("Coordinator runtime closed.");
    }

    /* synthetic */ CoordinatorRuntime(String str, LogContext logContext, CoordinatorEventProcessor coordinatorEventProcessor, PartitionWriter partitionWriter, CoordinatorLoader coordinatorLoader, CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier, Time time, Timer timer, Duration duration, CoordinatorRuntimeMetrics coordinatorRuntimeMetrics, CoordinatorMetrics coordinatorMetrics, AnonymousClass1 anonymousClass1) {
        this(str, logContext, coordinatorEventProcessor, partitionWriter, coordinatorLoader, coordinatorShardBuilderSupplier, time, timer, duration, coordinatorRuntimeMetrics, coordinatorMetrics);
    }
}
