package org.apache.pinot.shaded.org.apache.kafka.raft;

import java.util.Collections;
import java.util.Optional;
import org.apache.pinot.shaded.org.apache.kafka.common.KafkaException;
import org.apache.pinot.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.pinot.shaded.org.apache.kafka.raft.BatchReader;
import org.apache.pinot.shaded.org.apache.kafka.raft.RaftClient;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/pinot/shaded/org/apache/kafka/raft/ReplicatedCounter.class */
/**
 * A simple counter state machine that listens to a {@link RaftClient}.
 *
 * <p>Each call to {@link #increment()} asks the client to append the next integer value.
 * Values delivered through {@link #handleCommit(BatchReader)} must arrive as a strictly
 * consecutive sequence ({@code committed + 1} each time); any gap is reported with an
 * {@link AssertionError}.
 *
 * <p>All public methods are {@code synchronized} on this instance.
 */
public class ReplicatedCounter implements RaftClient.Listener<Integer> {
    private final int nodeId;
    private final Logger log;
    private final RaftClient<Integer> client;

    /** Last value applied by {@link #handleCommit(BatchReader)}. */
    private int committed = 0;

    /**
     * Last value successfully handed to the client by {@link #increment()}.
     * Reset to {@code committed} on claim and to {@code -1} on resign.
     */
    private int uncommitted = 0;

    /** Epoch passed to the most recent {@link #handleClaim(int)}; empty after a resign. */
    private Optional<Integer> claimedEpoch = Optional.empty();

    /**
     * @param nodeId     identifier of this node, used only in assertion messages
     * @param client     client used to schedule appended records
     * @param logContext source of the logger for this class
     */
    public ReplicatedCounter(int nodeId, RaftClient<Integer> client, LogContext logContext) {
        this.nodeId = nodeId;
        this.client = client;
        this.log = logContext.logger(ReplicatedCounter.class);
    }

    /**
     * @return {@code true} while an epoch is claimed, i.e. {@link #increment()} may be called
     */
    public synchronized boolean isWritable() {
        return this.claimedEpoch.isPresent();
    }

    /**
     * Schedules an append of the next counter value under the currently claimed epoch.
     *
     * <p>The local uncommitted value only advances when the client returns a non-null offset.
     * Otherwise the same value is offered again on the next call, so the appended sequence
     * never contains a gap.
     *
     * @throws KafkaException if no epoch is currently claimed
     */
    public synchronized void increment() {
        int epoch = this.claimedEpoch.orElseThrow(
            () -> new KafkaException("Counter is not currently writable"));

        int nextValue = this.uncommitted + 1;
        Long offset = this.client.scheduleAppend(epoch, Collections.singletonList(nextValue));
        if (offset == null) {
            // NOTE(review): a null result is treated as "record was not scheduled" (the
            // original code only logged success on non-null) - confirm against the
            // RaftClient#scheduleAppend contract. Leaving `uncommitted` untouched avoids
            // skipping a value, which would later fail the sequence check in handleCommit.
            return;
        }

        this.uncommitted = nextValue;
        this.log.debug("Scheduled append of record {} with epoch {} at offset {}", nextValue, epoch, offset);
    }

    /**
     * Applies every record from the reader to the committed value, verifying that each
     * record equals {@code committed + 1}. The reader is always closed before returning.
     *
     * @param reader batches of committed records
     * @throws AssertionError if a record is not the expected next value
     */
    @Override // org.apache.pinot.shaded.org.apache.kafka.raft.RaftClient.Listener
    public synchronized void handleCommit(BatchReader<Integer> reader) {
        try {
            int initialCommitted = this.committed;
            while (reader.hasNext()) {
                BatchReader.Batch<Integer> batch = reader.next();
                this.log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset());
                for (Integer value : batch.records()) {
                    int expected = this.committed + 1;
                    if (value.intValue() != expected) {
                        throw new AssertionError("Expected next committed value to be " + expected + ", but instead found " + value + " on node " + this.nodeId);
                    }
                    this.committed = value.intValue();
                }
            }
            this.log.debug("Counter incremented from {} to {}", initialCommitted, this.committed);
        } finally {
            // Single close on every path, including when the sequence check fails.
            reader.close();
        }
    }

    /**
     * Records the claimed epoch and restarts the uncommitted value from the committed one.
     *
     * @param epoch the epoch that was claimed
     */
    @Override // org.apache.pinot.shaded.org.apache.kafka.raft.RaftClient.Listener
    public synchronized void handleClaim(int epoch) {
        this.log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", this.committed, epoch);
        this.uncommitted = this.committed;
        this.claimedEpoch = Optional.of(epoch);
    }

    /**
     * Clears the claimed epoch, making the counter non-writable, and resets the uncommitted
     * value to {@code -1}.
     *
     * @param epoch the epoch being resigned (not used by this implementation)
     */
    @Override // org.apache.pinot.shaded.org.apache.kafka.raft.RaftClient.Listener
    public synchronized void handleResign(int epoch) {
        this.log.debug("Counter uncommitted value reset after resigning leadership");
        this.uncommitted = -1;
        this.claimedEpoch = Optional.empty();
    }
}
