package org.apache.pinot.controller.helix.core.realtime;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.annotations.VisibleForTesting;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.class */
public class SegmentCompletionManager {
    // Class-wide logger. Each SegmentCompletionFSM additionally creates its own per-segment logger.
    public static Logger LOGGER = LoggerFactory.getLogger((Class<?>) SegmentCompletionManager.class);
    private final HelixManager _helixManager;
    // Live completion state machines by String key.
    // NOTE(review): the key is presumably the segment name; the code that populates this map is outside this section.
    private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap();
    // Raw table name -> elapsed time (ms) between FSM creation and commit start. An FSM writes it when the
    // elapsed time exceeded its initial commit budget; newly created FSMs read it to size their own budget.
    private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap();
    private final PinotLLCRealtimeSegmentManager _segmentManager;
    // Used by the FSM to meter LLC_STATE_MACHINE_ABORTS per table.
    private final ControllerMetrics _controllerMetrics;
    private final LeadControllerManager _leadControllerManager;
    // Lock array allocated with 20 slots in the constructor.
    // NOTE(review): how a lock is selected for a given FSM is not visible in this section.
    private final Lock[] _fsmLocks;
    private static final int NUM_FSM_LOCKS = 20;
    // Upper bound, in seconds (1800 s = 30 min), applied to any commit budget or lease extension of an FSM.
    private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager$SegmentCompletionFSM.class */
    public static class SegmentCompletionFSM {
        // Relative to FSM start: once this much time has passed, a winner is picked even if not every
        // expected replica has reported (see isWinnerPicked).
        private static final long MAX_TIME_TO_PICK_WINNER_MS = 3300;
        // Relative to FSM start: in COMMITTER_DECIDED, reports arriving after this deadline abort the FSM.
        // Also used as the base of the initial commit budget in the constructor.
        private static final long MAX_TIME_TO_NOTIFY_WINNER_MS = 6600;
        // Per-segment logger named "SegmentCompletionFSM_" + segment name.
        public final Logger LOGGER;
        // Current FSM state; mutated only by the FSM's handlers (all entry points synchronize on this FSM).
        State _state;
        // Controller time (ms) captured when the FSM was constructed.
        final long _startTimeMs;
        private final LLCSegmentName _segmentName;
        private final String _rawTableName;
        private final String _realtimeTableName;
        private final int _numReplicas;
        // Despite the name this is a Set: ids of instances that reported stoppedConsuming and have not
        // since come back with a segmentConsumed call.
        private final Set<String> _excludedServerStateMap;
        // Instance id -> offset most recently reported through segmentConsumed.
        private final Map<String, Long> _commitStateMap;
        // Offset the segment must be committed at; -1 until a winner is picked.
        private long _winningOffset;
        // Instance chosen to commit; null until a winner is picked ("UNKNOWN" for FSMs created in COMMITTED).
        private String _winner;
        private final PinotLLCRealtimeSegmentManager _segmentManager;
        private final SegmentCompletionManager _segmentCompletionManager;
        // Absolute deadlines (ms) derived from _startTimeMs and the two constants above.
        private final long _maxTimeToPickWinnerMs;
        private final long _maxTimeToNotifyWinnerMs;
        // Commit budget (ms, relative to start) computed in the constructor and capped at 1800000 ms.
        private final long _initialCommitTimeMs;
        // Absolute deadline (ms) for the commit to finish; may be pushed out by extendBuildTime.
        private long _maxTimeAllowedToCommitMs;
        private final boolean _isSplitCommitEnabled;
        private final String _controllerVipUrl;

        /**
         * Creates a state machine for the given segment that starts in {@link State#HOLDING}.
         *
         * @param segmentManager    segment manager used to commit segments
         * @param completionManager owning completion manager (clock, metrics, commit-time map)
         * @param segmentName       segment this FSM tracks
         * @param numReplicas       number of replicas expected to report
         * @return the new FSM
         */
        public static SegmentCompletionFSM fsmInHolding(PinotLLCRealtimeSegmentManager segmentManager,
                SegmentCompletionManager completionManager, LLCSegmentName segmentName, int numReplicas) {
            final SegmentCompletionFSM holdingFsm =
                    new SegmentCompletionFSM(segmentManager, completionManager, segmentName, numReplicas);
            return holdingFsm;
        }

        /**
         * Creates a state machine that is already in {@link State#COMMITTED} with the given winning offset.
         * The winner of such an FSM is recorded as "UNKNOWN" by the constructor it delegates to.
         *
         * @param segmentManager    segment manager used to commit segments
         * @param completionManager owning completion manager
         * @param segmentName       segment this FSM tracks
         * @param numReplicas       number of replicas of the segment
         * @param winningOffset     offset at which the segment is considered committed
         * @return the new FSM
         */
        public static SegmentCompletionFSM fsmInCommit(PinotLLCRealtimeSegmentManager segmentManager,
                SegmentCompletionManager completionManager, LLCSegmentName segmentName, int numReplicas,
                long winningOffset) {
            final SegmentCompletionFSM committedFsm = new SegmentCompletionFSM(segmentManager, completionManager,
                    segmentName, numReplicas, winningOffset);
            return committedFsm;
        }

        /**
         * Creates a state machine that starts in {@link State#PARTIAL_CONSUMING} instead of the default
         * {@link State#HOLDING}.
         *
         * @param segmentManager    segment manager used to commit segments
         * @param completionManager owning completion manager
         * @param segmentName       segment this FSM tracks
         * @param numReplicas       number of replicas expected to report
         * @return the new FSM
         */
        public static SegmentCompletionFSM fsmStoppedConsuming(PinotLLCRealtimeSegmentManager segmentManager,
                SegmentCompletionManager completionManager, LLCSegmentName segmentName, int numReplicas) {
            final SegmentCompletionFSM fsm =
                    new SegmentCompletionFSM(segmentManager, completionManager, segmentName, numReplicas);
            // The constructor always starts in HOLDING; override it for this entry path.
            fsm._state = State.PARTIAL_CONSUMING;
            return fsm;
        }

        /**
         * Builds a state machine in {@link State#HOLDING} with no winner picked yet.
         *
         * <p>The commit budget is the larger of (a) the notify-winner window plus the table's configured
         * commit timeout and (b) the commit time previously recorded for the raw table in the manager's
         * commit-time map, capped at {@code MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS}.
         *
         * @param pinotLLCRealtimeSegmentManager segment manager used for commit timeouts and commits
         * @param segmentCompletionManager       owning manager (clock, commit-time map, split-commit settings)
         * @param lLCSegmentName                 segment this FSM tracks
         * @param i                              number of replicas expected to report
         */
        private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName lLCSegmentName, int i) {
            this._state = State.HOLDING;
            this._winningOffset = -1L;
            this._segmentName = lLCSegmentName;
            this._rawTableName = this._segmentName.getTableName();
            this._realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(this._rawTableName);
            this._numReplicas = i;
            this._segmentManager = pinotLLCRealtimeSegmentManager;
            // Parameterized (diamond) instead of raw collections so element types are checked.
            this._commitStateMap = new HashMap<>(this._numReplicas);
            this._excludedServerStateMap = new HashSet<>(this._numReplicas);
            this._segmentCompletionManager = segmentCompletionManager;
            this._startTimeMs = this._segmentCompletionManager.getCurrentTimeMs();
            this._maxTimeToPickWinnerMs = this._startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
            this._maxTimeToNotifyWinnerMs = this._startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;

            long commitTimeoutMS = MAX_TIME_TO_NOTIFY_WINNER_MS + this._segmentManager.getCommitTimeoutMS(this._realtimeTableName);
            // A previous FSM for this table may have recorded a longer commit time; honor it.
            final Long recordedCommitTimeMs = this._segmentCompletionManager._commitTimeMap.get(this._rawTableName);
            if (recordedCommitTimeMs != null && recordedCommitTimeMs.longValue() > commitTimeoutMS) {
                commitTimeoutMS = recordedCommitTimeMs.longValue();
            }
            this.LOGGER = LoggerFactory.getLogger("SegmentCompletionFSM_" + lLCSegmentName.getSegmentName());

            // Derive the cap from the shared constant rather than repeating the literal 1800000.
            final long maxCommitTimeMs = SegmentCompletionManager.MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000L;
            if (commitTimeoutMS > maxCommitTimeMs) {
                this.LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", Long.valueOf(commitTimeoutMS / 1000), this._realtimeTableName, Integer.valueOf(SegmentCompletionManager.MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS));
                commitTimeoutMS = maxCommitTimeMs;
            }
            this._initialCommitTimeMs = commitTimeoutMS;
            this._maxTimeAllowedToCommitMs = this._startTimeMs + this._initialCommitTimeMs;
            this._isSplitCommitEnabled = segmentCompletionManager.isSplitCommitEnabled();
            this._controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
        }

        /**
         * Builds a state machine that is already {@link State#COMMITTED} at the given offset. The
         * committing instance is not known on this path, so the winner is recorded as "UNKNOWN".
         *
         * @param segmentManager    segment manager used for commit timeouts and commits
         * @param completionManager owning completion manager
         * @param segmentName       segment this FSM tracks
         * @param numReplicas       number of replicas of the segment
         * @param committedOffset   offset recorded as the winning offset
         */
        private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
                SegmentCompletionManager completionManager, LLCSegmentName segmentName, int numReplicas,
                long committedOffset) {
            this(segmentManager, completionManager, segmentName, numReplicas);
            this._winner = "UNKNOWN";
            this._winningOffset = committedOffset;
            this._state = State.COMMITTED;
        }

        /**
         * Returns a brace-delimited, comma-separated summary of this FSM: segment name, state, start time,
         * winner, winning offset, split-commit flag and controller VIP URL.
         *
         * <p>The braces are written as plain literals. The decompiled form borrowed
         * {@code VectorFormat.DEFAULT_PREFIX}/{@code DEFAULT_SUFFIX} from commons-math, which have the same
         * values ("{" and "}") but tie this method to an unrelated math library.
         */
        @Override
        public String toString() {
            return "{" + this._segmentName.getSegmentName()
                    + "," + this._state
                    + "," + this._startTimeMs
                    + "," + this._winner
                    + "," + this._winningOffset
                    + "," + this._isSplitCommitEnabled
                    + "," + this._controllerVipUrl
                    + "}";
        }

        /**
         * Tells whether this FSM has reached a terminal state.
         *
         * @return {@code true} when the state is {@link State#COMMITTED} or {@link State#ABORTED}
         */
        public boolean isDone() {
            switch (this._state) {
                case COMMITTED:
                case ABORTED:
                    return true;
                default:
                    return false;
            }
        }

        /**
         * Handles a "segment consumed" report from a server and dispatches to the handler for the
         * current state.
         *
         * <p>Before dispatching, the instance is removed from the excluded set if it was there, and its
         * reported offset is recorded in the commit-state map.
         *
         * @param instanceId reporting server instance
         * @param offset     offset the instance has consumed up to
         * @param stopReason reason the instance stopped; only forwarded to the PARTIAL_CONSUMING and
         *                   HOLDING handlers
         * @return the protocol response for the instance
         */
        public SegmentCompletionProtocol.Response segmentConsumed(String instanceId, long offset, String stopReason) {
            // The clock is sampled before taking the monitor, as in every other entry point.
            final long now = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                this.LOGGER.info("Processing segmentConsumed({}, {})", instanceId, offset);
                // Set.remove reports whether the instance had been excluded.
                if (this._excludedServerStateMap.remove(instanceId)) {
                    this.LOGGER.info("Marking instance {} alive again", instanceId);
                }
                this._commitStateMap.put(instanceId, offset);

                final SegmentCompletionProtocol.Response response;
                switch (this._state) {
                    case PARTIAL_CONSUMING:
                        response = PARTIAL_CONSUMING__consumed(instanceId, offset, now, stopReason);
                        break;
                    case HOLDING:
                        response = HOLDING__consumed(instanceId, offset, now, stopReason);
                        break;
                    case COMMITTER_DECIDED:
                        response = COMMITTER_DECIDED__consumed(instanceId, offset, now);
                        break;
                    case COMMITTER_NOTIFIED:
                        response = COMMITTER_NOTIFIED__consumed(instanceId, offset, now);
                        break;
                    case COMMITTER_UPLOADING:
                        response = COMMITTER_UPLOADING__consumed(instanceId, offset, now);
                        break;
                    case COMMITTING:
                        response = COMMITTING__consumed(instanceId, offset, now);
                        break;
                    case COMMITTED:
                        response = COMMITTED__consumed(instanceId, offset);
                        break;
                    case ABORTED:
                        response = hold(instanceId, offset);
                        break;
                    default:
                        response = fail(instanceId, offset);
                        break;
                }
                return response;
            }
        }

        /**
         * Handles a commit-start request from a server and dispatches to the handler for the current state.
         *
         * <p>Requests from an instance that is currently in the excluded set (it reported
         * stoppedConsuming) are rejected with {@code RESP_FAILED} without touching the state.
         *
         * @param instanceId server instance asking to commit
         * @param offset     offset the instance wants to commit at
         * @return the protocol response for the instance
         */
        public SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, long offset) {
            final long now = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                // The excluded set is a plain HashSet that segmentConsumed/stoppedConsuming mutate while
                // holding this monitor, so it has to be read under the same monitor; reading it outside
                // (as before) was a data race.
                if (this._excludedServerStateMap.contains(instanceId)) {
                    this.LOGGER.warn("Not accepting commit from {} since it had stoppd consuming", instanceId);
                    return SegmentCompletionProtocol.RESP_FAILED;
                }
                this.LOGGER.info("Processing segmentCommitStart({}, {})", instanceId, Long.valueOf(offset));
                switch (this._state) {
                    case PARTIAL_CONSUMING:
                        return PARTIAL_CONSUMING__commit(instanceId, offset, now);
                    case HOLDING:
                        return HOLDING__commit(instanceId, offset, now);
                    case COMMITTER_DECIDED:
                        return COMMITTER_DECIDED__commit(instanceId, offset, now);
                    case COMMITTER_NOTIFIED:
                        return COMMITTER_NOTIFIED__commit(instanceId, offset, now);
                    case COMMITTER_UPLOADING:
                        return COMMITTER_UPLOADING__commit(instanceId, offset, now);
                    case COMMITTING:
                        return COMMITTING__commit(instanceId, offset, now);
                    case COMMITTED:
                        return COMMITTED__commit(instanceId, offset);
                    case ABORTED:
                        return hold(instanceId, offset);
                    default:
                        return fail(instanceId, offset);
                }
            }
        }

        /**
         * Handles a "stopped consuming" report. The instance is added to the excluded set and the
         * request is dispatched to the handler for the current state.
         *
         * @param instanceId server instance that stopped consuming
         * @param offset     offset at which it stopped
         * @param reason     reason supplied by the server
         * @return the protocol response for the instance
         */
        public SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, long offset, String reason) {
            synchronized (this) {
                this.LOGGER.info("Processing stoppedConsuming({}, {})", instanceId, offset);
                this._excludedServerStateMap.add(instanceId);

                final SegmentCompletionProtocol.Response response;
                switch (this._state) {
                    case PARTIAL_CONSUMING:
                        response = PARTIAL_CONSUMING__stoppedConsuming(instanceId, offset, reason);
                        break;
                    case HOLDING:
                        response = HOLDING_stoppedConsuming(instanceId, offset, reason);
                        break;
                    case COMMITTER_DECIDED:
                        response = COMMITTER_DECIDED__stoppedConsuming(instanceId, offset, reason);
                        break;
                    case COMMITTER_NOTIFIED:
                        response = COMMITTER_NOTIFIED__stoppedConsuming(instanceId, offset, reason);
                        break;
                    case COMMITTER_UPLOADING:
                        response = COMMITTER_UPLOADING__stoppedConsuming(instanceId, offset, reason);
                        break;
                    case COMMITTING:
                        response = COMMITTING__stoppedConsuming(instanceId, offset, reason);
                        break;
                    case COMMITTED:
                        response = COMMITTED__stoppedConsuming(instanceId, offset, reason);
                        break;
                    case ABORTED:
                        // Nothing to do for an aborted FSM; acknowledge the message.
                        this.LOGGER.info("Ignoring StoppedConsuming message from {} in state {}", instanceId, this._state);
                        response = SegmentCompletionProtocol.RESP_PROCESSED;
                        break;
                    default:
                        response = fail(instanceId, offset);
                        break;
                }
                return response;
            }
        }

        /**
         * Handles a request to extend the time allowed for building/committing the segment. Only
         * {@link State#COMMITTER_NOTIFIED} accepts the request; every other state answers with a failure.
         *
         * @param instanceId     server instance asking for more time
         * @param offset         offset reported by the instance
         * @param extendTimeSec  requested extension in seconds
         * @return the protocol response for the instance
         */
        public SegmentCompletionProtocol.Response extendBuildTime(String instanceId, long offset, int extendTimeSec) {
            final long now = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                this.LOGGER.info("Processing extendBuildTime({}, {}, {})", instanceId, offset, extendTimeSec);
                switch (this._state) {
                    case COMMITTER_NOTIFIED:
                        return COMMITTER_NOTIFIED__extendBuildlTime(instanceId, offset, extendTimeSec, now);
                    default:
                        // All remaining states reject the extension.
                        return fail(instanceId, offset);
                }
            }
        }

        /**
         * Handles the end of a segment upload by the committer. The commit is only carried out when the
         * sender is not excluded, the FSM is still in {@link State#COMMITTER_UPLOADING}, the sender is the
         * winner at the winning offset, and the upload succeeded. Any other outcome aborts the FSM and
         * returns {@code RESP_FAILED}.
         *
         * @param params                      request parameters (instance id and offset are read from it)
         * @param success                     whether the upload succeeded
         * @param isSplitCommit               forwarded to commitSegment; controls whether the segment file
         *                                    is committed before the metadata
         * @param committingSegmentDescriptor descriptor of the segment being committed
         * @return {@code RESP_COMMIT_SUCCESS} on success, {@code RESP_FAILED} otherwise
         */
        public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params,
                boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
            final String committer = params.getInstanceId();
            final long committedOffset = params.getOffset();
            synchronized (this) {
                if (this._excludedServerStateMap.contains(committer)) {
                    this.LOGGER.warn("Not accepting commitEnd from {} since it had stoppd consuming", committer);
                    return abortAndReturnFailed();
                }
                this.LOGGER.info("Processing segmentCommitEnd({}, {})", committer, committedOffset);

                final boolean uploadStillValid = this._state.equals(State.COMMITTER_UPLOADING)
                        && committer.equals(this._winner)
                        && committedOffset == this._winningOffset;
                if (!uploadStillValid) {
                    this.LOGGER.warn("State change during upload: state={} segment={} winner={} winningOffset={}", this._state, this._segmentName.getSegmentName(), this._winner, this._winningOffset);
                    return abortAndReturnFailed();
                }
                if (!success) {
                    this.LOGGER.error("Segment upload failed");
                    return abortAndReturnFailed();
                }

                final SegmentCompletionProtocol.Response commitResponse =
                        commitSegment(params, isSplitCommit, committingSegmentDescriptor);
                return commitResponse.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)
                        ? commitResponse
                        : abortAndReturnFailed();
            }
        }

        /**
         * Logs a FAIL decision for the instance and returns the shared failure response.
         *
         * @param instanceId instance being answered
         * @param offset     offset the instance reported (logged only)
         * @return {@code SegmentCompletionProtocol.RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response fail(String instanceId, long offset) {
            final SegmentCompletionProtocol.Response failed = SegmentCompletionProtocol.RESP_FAILED;
            this.LOGGER.info("{}:FAIL for instance={} offset={}", this._state, instanceId, offset);
            return failed;
        }

        /**
         * Builds a COMMIT response telling the instance to commit at the given offset. The response
         * carries the build time allowance in seconds (commit deadline minus FSM start) and the
         * split-commit flag; the controller VIP URL is added only when split commit is enabled.
         *
         * @param instanceId instance being answered (logged only)
         * @param offset     offset to commit at
         * @return a new COMMIT response
         */
        private SegmentCompletionProtocol.Response commit(String instanceId, long offset) {
            final long allowedBuildTimeSec = (this._maxTimeAllowedToCommitMs - this._startTimeMs) / 1000;
            this.LOGGER.info("{}:COMMIT for instance={} offset={} buldTimeSec={}", this._state, instanceId, offset, allowedBuildTimeSec);

            final SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(offset)
                    .withBuildTimeSeconds(allowedBuildTimeSec)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
                    .withSplitCommit(this._isSplitCommitEnabled);
            if (this._isSplitCommitEnabled) {
                responseParams.withControllerVipUrl(this._controllerVipUrl);
            }
            return new SegmentCompletionProtocol.Response(responseParams);
        }

        /**
         * Logs a DISCARD decision (at warn level) and returns the shared discard response.
         *
         * @param instanceId instance being answered
         * @param offset     offset the instance reported (logged only)
         * @return {@code SegmentCompletionProtocol.RESP_DISCARD}
         */
        private SegmentCompletionProtocol.Response discard(String instanceId, long offset) {
            final SegmentCompletionProtocol.Response discarded = SegmentCompletionProtocol.RESP_DISCARD;
            this.LOGGER.warn("{}:DISCARD for instance={} offset={}", this._state, instanceId, offset);
            return discarded;
        }

        /**
         * Builds a KEEP response carrying the instance's own offset.
         *
         * @param instanceId instance being answered (logged only)
         * @param offset     offset echoed back in the response
         * @return a new KEEP response
         */
        private SegmentCompletionProtocol.Response keep(String instanceId, long offset) {
            this.LOGGER.info("{}:KEEP for instance={} offset={}", this._state, instanceId, offset);
            final SegmentCompletionProtocol.Response.Params keepParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(offset)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
            return new SegmentCompletionProtocol.Response(keepParams);
        }

        /**
         * Builds a CATCH_UP response. Note that the response carries the winning offset, not the
         * offset the instance reported; the reported offset is only logged.
         *
         * @param instanceId instance being answered (logged only)
         * @param offset     offset the instance reported (logged only)
         * @return a new CATCH_UP response targeting the winning offset
         */
        private SegmentCompletionProtocol.Response catchup(String instanceId, long offset) {
            this.LOGGER.info("{}:CATCHUP for instance={} offset={}", this._state, instanceId, offset);
            final SegmentCompletionProtocol.Response.Params catchupParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(this._winningOffset)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
            return new SegmentCompletionProtocol.Response(catchupParams);
        }

        /**
         * Builds a HOLD response carrying the instance's own offset.
         *
         * @param instanceId instance being answered (logged only)
         * @param offset     offset echoed back in the response
         * @return a new HOLD response
         */
        private SegmentCompletionProtocol.Response hold(String instanceId, long offset) {
            this.LOGGER.info("{}:HOLD for instance={} offset={}", this._state, instanceId, offset);
            final SegmentCompletionProtocol.Response.Params holdParams = new SegmentCompletionProtocol.Response.Params()
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
                    .withOffset(offset);
            return new SegmentCompletionProtocol.Response(holdParams);
        }

        /**
         * Moves the FSM to {@link State#ABORTED}, meters one state-machine abort for the raw table, and
         * answers the instance with HOLD.
         *
         * @param now        current time in ms (not used by this method)
         * @param instanceId instance being answered
         * @param offset     offset echoed back in the HOLD response
         * @return a new HOLD response
         */
        private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
            this._state = State.ABORTED;
            final ControllerMetrics metrics = this._segmentCompletionManager._controllerMetrics;
            metrics.addMeteredTableValue(this._rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            return hold(instanceId, offset);
        }

        /**
         * Moves the FSM to {@link State#ABORTED}, meters one state-machine abort for the raw table, and
         * returns the shared failure response.
         *
         * @return {@code SegmentCompletionProtocol.RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response abortAndReturnFailed() {
            this._state = State.ABORTED;
            final ControllerMetrics metrics = this._segmentCompletionManager._controllerMetrics;
            metrics.addMeteredTableValue(this._rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            return SegmentCompletionProtocol.RESP_FAILED;
        }

        /**
         * Aborts the FSM when the commit deadline has passed.
         *
         * @param now        current time in ms
         * @param instanceId instance being answered
         * @param offset     offset reported by the instance
         * @return a HOLD response if {@code now} is past the commit deadline (the FSM is then aborted),
         *         or {@code null} when there is still time and the caller should carry on
         */
        private SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long now, String instanceId, long offset) {
            final boolean pastDeadline = now > this._maxTimeAllowedToCommitMs;
            if (pastDeadline) {
                this.LOGGER.warn("{}:Aborting FSM (too late) instance={} offset={} now={} start={}", this._state, instanceId, offset, now, this._startTimeMs);
                return abortAndReturnHold(now, instanceId, offset);
            }
            return null;
        }

        /**
         * Number of replicas still expected to report: the configured replica count minus the
         * instances currently in the excluded set.
         */
        private int numReplicasToLookFor() {
            final int excludedCount = this._excludedServerStateMap.size();
            return this._numReplicas - excludedCount;
        }

        /**
         * PARTIAL_CONSUMING + consumed: switches the FSM to {@link State#HOLDING} and then handles the
         * report exactly as the HOLDING state would.
         */
        private SegmentCompletionProtocol.Response PARTIAL_CONSUMING__consumed(String instanceId, long offset, long now,
                String stopReason) {
            this._state = State.HOLDING;
            final SegmentCompletionProtocol.Response response = HOLDING__consumed(instanceId, offset, now, stopReason);
            return response;
        }

        /** PARTIAL_CONSUMING + commit start: delegates to the shared holding/partial-consuming handler. */
        private SegmentCompletionProtocol.Response PARTIAL_CONSUMING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response =
                    processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now);
            return response;
        }

        /** PARTIAL_CONSUMING + stopped consuming: delegates to processStoppedConsuming with createNew = true. */
        private SegmentCompletionProtocol.Response PARTIAL_CONSUMING__stoppedConsuming(String instanceId, long offset,
                String reason) {
            final boolean createNew = true;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * HOLDING + consumed: tries to pick a winner.
         * <ul>
         *   <li>No winner yet: the instance is told to HOLD.</li>
         *   <li>This instance is the winner: it is told to COMMIT and the FSM moves to
         *       {@link State#COMMITTER_NOTIFIED}.</li>
         *   <li>Another instance is the winner: this one is told to CATCH_UP and the FSM moves to
         *       {@link State#COMMITTER_DECIDED}.</li>
         * </ul>
         */
        private SegmentCompletionProtocol.Response HOLDING__consumed(String instanceId, long offset, long now,
                String stopReason) {
            if (!isWinnerPicked(instanceId, now, stopReason)) {
                return hold(instanceId, offset);
            }

            if (this._winner.equals(instanceId)) {
                this.LOGGER.info("{}:Committer notified winner instance={} offset={}", this._state, instanceId, offset);
                // Build the response before changing state so its log line shows the old state.
                final SegmentCompletionProtocol.Response commitResponse = commit(instanceId, offset);
                this._state = State.COMMITTER_NOTIFIED;
                return commitResponse;
            }

            this.LOGGER.info("{}:Committer decided winner={} offset={}", this._state, this._winner, this._winningOffset);
            final SegmentCompletionProtocol.Response catchupResponse = catchup(instanceId, offset);
            this._state = State.COMMITTER_DECIDED;
            return catchupResponse;
        }

        /** HOLDING + commit start: delegates to the shared holding/partial-consuming handler. */
        private SegmentCompletionProtocol.Response HOLDING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response =
                    processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now);
            return response;
        }

        /** HOLDING + stopped consuming: delegates to processStoppedConsuming with createNew = true. */
        private SegmentCompletionProtocol.Response HOLDING_stoppedConsuming(String instanceId, long offset, String reason) {
            final boolean createNew = true;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * COMMITTER_DECIDED + consumed: a winner has been chosen but not yet told to commit.
         * <ul>
         *   <li>Offset beyond the winning offset: abort and HOLD.</li>
         *   <li>Winner at the winning offset: COMMIT, state becomes {@link State#COMMITTER_NOTIFIED}.</li>
         *   <li>Winner at a different offset: abort and HOLD.</li>
         *   <li>Non-winner: HOLD when already at the winning offset, otherwise CATCH_UP.</li>
         * </ul>
         * If the notify-winner deadline has passed, the FSM is aborted and HOLD is returned regardless
         * of the outcome above.
         *
         * @param instanceId reporting instance
         * @param offset     offset the instance reported
         * @param now        current time in ms
         * @return the protocol response for the instance
         */
        private SegmentCompletionProtocol.Response COMMITTER_DECIDED__consumed(String instanceId, long offset, long now) {
            if (offset > this._winningOffset) {
                this.LOGGER.warn("{}:Aborting FSM (offset larger than winning) instance={} offset={} now={} winning={}", this._state, instanceId, Long.valueOf(offset), Long.valueOf(now), Long.valueOf(this._winningOffset));
                return abortAndReturnHold(now, instanceId, offset);
            }

            SegmentCompletionProtocol.Response response;
            if (!this._winner.equals(instanceId)) {
                response = offset == this._winningOffset ? hold(instanceId, offset) : catchup(instanceId, offset);
            } else if (this._winningOffset == offset) {
                this.LOGGER.info("{}:Notifying winner instance={} offset={}", this._state, instanceId, Long.valueOf(offset));
                response = commit(instanceId, offset);
                this._state = State.COMMITTER_NOTIFIED;
            } else {
                this.LOGGER.warn("{}:Winner coming back with different offset for instance={} offset={} prevWinnOffset={}", this._state, instanceId, Long.valueOf(offset), Long.valueOf(this._winningOffset));
                response = abortAndReturnHold(now, instanceId, offset);
            }

            // Deadline for notifying the winner has passed: abort and start over. Skip this when the
            // branch above already aborted, otherwise the abort would be logged and metered twice
            // for a single transition (the response is a HOLD at the same offset either way).
            if (now > this._maxTimeToNotifyWinnerMs && this._state != State.ABORTED) {
                response = abortAndReturnHold(now, instanceId, offset);
            }
            return response;
        }

        /** COMMITTER_DECIDED + commit start: handled the same way as in HOLDING/PARTIAL_CONSUMING. */
        private SegmentCompletionProtocol.Response COMMITTER_DECIDED__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response =
                    processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now);
            return response;
        }

        /** COMMITTER_DECIDED + stopped consuming: delegates to processStoppedConsuming with createNew = false. */
        private SegmentCompletionProtocol.Response COMMITTER_DECIDED__stoppedConsuming(String instanceId, long offset,
                String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * COMMITTER_NOTIFIED + consumed: the winner has already been told to commit.
         * <ul>
         *   <li>Past the commit deadline: abort and HOLD.</li>
         *   <li>Non-winner: CATCH_UP when behind the winning offset, otherwise HOLD.</li>
         *   <li>Winner at the winning offset: COMMIT is sent again.</li>
         *   <li>Winner at any other offset: DISCARD, and the FSM is aborted.</li>
         * </ul>
         *
         * @param instanceId reporting instance
         * @param offset     offset the instance reported
         * @param now        current time in ms
         * @return the protocol response for the instance
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__consumed(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }

            if (!instanceId.equals(this._winner)) {
                return offset < this._winningOffset ? catchup(instanceId, offset) : hold(instanceId, offset);
            }
            if (offset == this._winningOffset) {
                return commit(instanceId, offset);
            }

            // The winner came back with an offset other than the one it won with.
            final SegmentCompletionProtocol.Response response = discard(instanceId, offset);
            this.LOGGER.warn("{}:Aborting for instance={} offset={}", this._state, instanceId, Long.valueOf(offset));
            this._state = State.ABORTED;
            // Meter this abort like abortAndReturnHold/abortAndReturnFailed do; previously this path
            // was the only abort that did not count towards LLC_STATE_MACHINE_ABORTS.
            this._segmentCompletionManager._controllerMetrics.addMeteredTableValue(this._rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            return response;
        }

        /**
         * COMMITTER_NOTIFIED + commit start: after validating the request, moves the FSM to
         * {@link State#COMMITTER_UPLOADING} and tells the committer to continue. When the time elapsed
         * since FSM start exceeds the initial commit budget, that elapsed time is stored in the
         * manager's commit-time map under the segment's table name.
         *
         * @param instanceId instance starting the commit
         * @param offset     offset it is committing at
         * @param now        current time in ms
         * @return the rejection from checkBadCommitRequest, or {@code RESP_COMMIT_CONTINUE}
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response rejection = checkBadCommitRequest(instanceId, offset, now);
            if (rejection != null) {
                return rejection;
            }

            this.LOGGER.info("{}:Uploading for instance={} offset={}", this._state, instanceId, offset);
            this._state = State.COMMITTER_UPLOADING;

            final long elapsedSinceStartMs = now - this._startTimeMs;
            final boolean exceededInitialBudget = elapsedSinceStartMs > this._initialCommitTimeMs;
            if (exceededInitialBudget) {
                this._segmentCompletionManager._commitTimeMap.put(this._segmentName.getTableName(), elapsedSinceStartMs);
            }
            return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
        }

        /** COMMITTER_NOTIFIED + stopped consuming: delegates to processStoppedConsuming with createNew = false. */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__stoppedConsuming(String instanceId, long offset,
                String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * COMMITTER_NOTIFIED + extend build time: pushes the commit deadline out to
         * {@code now + extTimeSec} seconds, as long as that stays within
         * {@code MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS} of the FSM start.
         *
         * @param instanceId  instance asking for the extension
         * @param offset      offset reported by the instance
         * @param extTimeSec  requested extension in seconds
         * @param now         current time in ms
         * @return HOLD if the current deadline has already passed (FSM aborted), {@code RESP_FAILED} if
         *         the requested deadline exceeds the overall cap (FSM aborted), else {@code RESP_PROCESSED}
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__extendBuildlTime(String instanceId, long offset, int extTimeSec, long now) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }

            // Multiply in long arithmetic: "extTimeSec * 1000" is an int product that wraps for large
            // requests, producing a small or negative deadline that would slip past the cap check below.
            final long requestedDeadlineMs = now + extTimeSec * 1000L;
            final long latestAllowedDeadlineMs =
                    this._startTimeMs + SegmentCompletionManager.MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000L;
            if (requestedDeadlineMs > latestAllowedDeadlineMs) {
                this.LOGGER.warn("Not accepting lease extension from {} startTime={} requestedTime={}", instanceId, Long.valueOf(this._startTimeMs), Long.valueOf(requestedDeadlineMs));
                return abortAndReturnFailed();
            }
            this._maxTimeAllowedToCommitMs = requestedDeadlineMs;
            return SegmentCompletionProtocol.RESP_PROCESSED;
        }

        /** COMMITTER_UPLOADING + consumed: delegates to processConsumedAfterCommitStart. */
        private SegmentCompletionProtocol.Response COMMITTER_UPLOADING__consumed(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response = processConsumedAfterCommitStart(instanceId, offset, now);
            return response;
        }

        /** COMMITTER_UPLOADING + commit start: delegates to processCommitWhileUploading. */
        private SegmentCompletionProtocol.Response COMMITTER_UPLOADING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response = processCommitWhileUploading(instanceId, offset, now);
            return response;
        }

        /** COMMITTER_UPLOADING + stopped consuming: delegates to processStoppedConsuming with createNew = false. */
        private SegmentCompletionProtocol.Response COMMITTER_UPLOADING__stoppedConsuming(String instanceId, long offset,
                String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /** COMMITTING + consumed: delegates to processConsumedAfterCommitStart. */
        private SegmentCompletionProtocol.Response COMMITTING__consumed(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response = processConsumedAfterCommitStart(instanceId, offset, now);
            return response;
        }

        /** COMMITTING + commit start: delegates to processCommitWhileUploading. */
        private SegmentCompletionProtocol.Response COMMITTING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response response = processCommitWhileUploading(instanceId, offset, now);
            return response;
        }

        /** COMMITTING + stopped consuming: delegates to processStoppedConsuming with createNew = false. */
        private SegmentCompletionProtocol.Response COMMITTING__stoppedConsuming(String instanceId, long offset,
                String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * COMMITTED + consumed: an instance that is exactly at the winning offset may KEEP its segment;
         * any other offset is answered with DISCARD.
         */
        private SegmentCompletionProtocol.Response COMMITTED__consumed(String instanceId, long offset) {
            if (offset == this._winningOffset) {
                return keep(instanceId, offset);
            }
            return discard(instanceId, offset);
        }

        /**
         * COMMITTED + commit start: same answer as for a consumed report in this state, i.e. KEEP at the
         * winning offset and DISCARD otherwise.
         */
        private SegmentCompletionProtocol.Response COMMITTED__commit(String instanceId, long offset) {
            if (offset == this._winningOffset) {
                return keep(instanceId, offset);
            }
            return discard(instanceId, offset);
        }

        /** COMMITTED + stopped consuming: delegates to processStoppedConsuming with createNew = false. */
        private SegmentCompletionProtocol.Response COMMITTED__stoppedConsuming(String instanceId, long offset,
                String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * Tells the segment manager that the instance stopped consuming this segment.
         *
         * @param instanceId instance that stopped consuming
         * @param offset     offset at which it stopped (logged only)
         * @param reason     reason given by the instance (logged only)
         * @param createNew  logged only; it does not influence the call made here
         * @return {@code RESP_PROCESSED} when the segment manager call succeeds, {@code RESP_FAILED} when
         *         it throws
         */
        private SegmentCompletionProtocol.Response processStoppedConsuming(String instanceId, long offset, String reason,
                boolean createNew) {
            this.LOGGER.info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", instanceId, this._segmentName, offset, this._state, createNew, reason);
            SegmentCompletionProtocol.Response response;
            try {
                this._segmentManager.segmentStoppedConsuming(this._segmentName, instanceId);
                response = SegmentCompletionProtocol.RESP_PROCESSED;
            } catch (Exception failure) {
                this.LOGGER.error("Caught exception while processing stopped CONSUMING segment: {} on instance: {}", this._segmentName.getSegmentName(), instanceId, failure);
                response = SegmentCompletionProtocol.RESP_FAILED;
            }
            return response;
        }

        /**
         * Handles a consumed report that arrives after the committer has started its commit
         * (states COMMITTER_UPLOADING and COMMITTING).
         * <ul>
         *   <li>Past the commit deadline: the FSM is aborted and the instance is told to HOLD.</li>
         *   <li>Non-winner: CATCH_UP when behind the winning offset, otherwise HOLD.</li>
         *   <li>Winner: it should not be reporting while it is committing, so the FSM is aborted and
         *       the instance is told to HOLD.</li>
         * </ul>
         *
         * @param instanceId reporting instance
         * @param offset     offset the instance reported
         * @param now        current time in ms
         * @return the protocol response for the instance; never {@code null}
         */
        private SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                // Hand back the HOLD produced by the abort. Returning null here (as before) propagated
                // a null response out of segmentConsumed to the caller.
                return tooLate;
            }
            if (!instanceId.equals(this._winner)) {
                return offset < this._winningOffset ? catchup(instanceId, offset) : hold(instanceId, offset);
            }
            this.LOGGER.warn("{}:Aborting FSM because winner is reporting a segment while it is also committing instance={} offset={} now={}", this._state, instanceId, Long.valueOf(offset), Long.valueOf(now));
            return abortAndReturnHold(now, instanceId, offset);
        }

        /**
         * Commits the segment through the segment manager. Requires the FSM to be in
         * {@link State#COMMITTER_UPLOADING}; the state becomes {@link State#COMMITTING} while the commit
         * runs and {@link State#COMMITTED} once the metadata commit succeeds. On any exception the state
         * is left at COMMITTING and {@code RESP_FAILED} is returned.
         *
         * @param params                      request parameters (instance id and offset, used for logging)
         * @param isSplitCommit               when true, the segment file is committed before the metadata
         * @param committingSegmentDescriptor descriptor handed to the segment manager
         * @return {@code RESP_COMMIT_SUCCESS} on success, {@code RESP_FAILED} otherwise
         */
        private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params params,
                boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
            final String committer = params.getInstanceId();
            final long committedOffset = params.getOffset();

            if (!this._state.equals(State.COMMITTER_UPLOADING)) {
                this.LOGGER.warn("State change during upload: state={} segment={} winner={} winningOffset={}", this._state, this._segmentName.getSegmentName(), this._winner, this._winningOffset);
                return SegmentCompletionProtocol.RESP_FAILED;
            }

            this.LOGGER.info("Committing segment {} at offset {} winner {}", this._segmentName.getSegmentName(), committedOffset, committer);
            this._state = State.COMMITTING;

            // Step 1 (split commit only): commit the segment file.
            if (isSplitCommit) {
                try {
                    this._segmentManager.commitSegmentFile(this._realtimeTableName, committingSegmentDescriptor);
                } catch (Exception fileCommitFailure) {
                    this.LOGGER.error("Caught exception while committing segment file for segment: {}", this._segmentName.getSegmentName(), fileCommitFailure);
                    return SegmentCompletionProtocol.RESP_FAILED;
                }
            }

            // Step 2: commit the segment metadata.
            try {
                this._segmentManager.commitSegmentMetadata(this._realtimeTableName, committingSegmentDescriptor);
            } catch (Exception metadataCommitFailure) {
                this.LOGGER.error("Caught exception while committing segment metadata for segment: {}", this._segmentName.getSegmentName(), metadataCommitFailure);
                return SegmentCompletionProtocol.RESP_FAILED;
            }

            this._state = State.COMMITTED;
            this.LOGGER.info("Committed segment {} at offset {} winner {}", this._segmentName.getSegmentName(), committedOffset, committer);
            return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
        }

        /**
         * Handles a commit-start request that arrives while a commit is already under way. The instance
         * is told to HOLD at its own offset, unless the commit deadline has passed, in which case the
         * FSM is aborted (the answer is still HOLD).
         */
        private SegmentCompletionProtocol.Response processCommitWhileUploading(String instanceId, long offset, long now) {
            this.LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }
            final SegmentCompletionProtocol.Response.Params holdParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(offset)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
            return new SegmentCompletionProtocol.Response(holdParams);
        }

        /**
         * Validates a commit-start request.
         *
         * @param instanceId instance asking to commit
         * @param offset     offset it wants to commit at
         * @param now        current time in ms
         * @return a HOLD response (with the FSM aborted) when the commit deadline has passed or when the
         *         winner asks to commit at an offset other than the winning one; {@code null} when the
         *         request may proceed
         */
        private SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }
            // Only a request from the winner with a mismatching offset is rejected here.
            if (instanceId.equals(this._winner) && offset != this._winningOffset) {
                this.LOGGER.warn("{}:Aborting FSM (bad commit req) instance={} offset={} now={} winning={}", this._state, instanceId, offset, now, this._winningOffset);
                return abortAndReturnHold(now, instanceId, offset);
            }
            return null;
        }

        /**
         * Handles a commit-start request that arrives before any committer was told to commit. The
         * instance is told to HOLD; if the commit deadline has passed the FSM is aborted as well.
         */
        private SegmentCompletionProtocol.Response processCommitWhileHoldingOrPartialConsuming(String instanceId,
                long offset, long now) {
            this.LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }
            return hold(instanceId, offset);
        }

        /**
         * Decides whether a winner can be picked now and, if so, records it in {@code _winner} and
         * {@code _winningOffset}.
         *
         * <p>A winner is picked when either (a) the reporting instance stopped because of the row limit
         * and is the only instance that has reported so far (it wins with its own offset), or (b) the
         * pick-winner deadline has passed, or (c) the number of reports equals the number of replicas
         * still expected. In cases (b) and (c) the highest reported offset wins; when the reporting
         * instance is at that offset it is preferred over any other instance at the same offset.
         *
         * @param instanceId instance whose report is being processed
         * @param now        current time in ms
         * @param stopReason reason the instance stopped consuming
         * @return {@code true} when a winner has been recorded, {@code false} to keep waiting
         */
        private boolean isWinnerPicked(String instanceId, long now, String stopReason) {
            final boolean soleRowLimitReporter =
                    SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason) && this._commitStateMap.size() == 1;
            if (soleRowLimitReporter) {
                this._winner = instanceId;
                this._winningOffset = this._commitStateMap.get(instanceId).longValue();
                return true;
            }

            // Keep waiting unless the deadline has passed or everyone we expect has reported.
            if (!(now > this._maxTimeToPickWinnerMs || this._commitStateMap.size() == numReplicasToLookFor())) {
                return false;
            }

            this.LOGGER.info("{}:Picking winner time={} size={}", this._state, now - this._startTimeMs, this._commitStateMap.size());
            long highestOffset = -1;
            String candidate = null;
            for (Map.Entry<String, Long> reported : this._commitStateMap.entrySet()) {
                final long reportedOffset = reported.getValue().longValue();
                if (reportedOffset > highestOffset) {
                    highestOffset = reportedOffset;
                    candidate = reported.getKey();
                }
            }
            this._winningOffset = highestOffset;

            // Tie-break in favor of the instance that is talking to us right now.
            final boolean reporterAtHighest = this._commitStateMap.get(instanceId).longValue() == highestOffset;
            this._winner = reporterAtHighest ? instanceId : candidate;
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager$State.class */
    /**
     * Named states used by the segment completion code in this class.
     *
     * <p>NOTE(review): presumably these are the values held by the FSM's {@code _state} field; the transitions
     * and the exact meaning of each constant are implemented elsewhere in this file and should be confirmed
     * there. Keep the declaration order unchanged: reordering would alter {@code ordinal()} and
     * {@code compareTo} results for any code that relies on them.
     */
    public enum State {
        PARTIAL_CONSUMING,
        HOLDING,
        COMMITTER_DECIDED,
        COMMITTER_NOTIFIED,
        COMMITTER_UPLOADING,
        COMMITTING,
        COMMITTED,
        ABORTED
    }

    /**
     * Returns the value of {@code MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS}.
     *
     * @return the configured constant, in seconds
     */
    public static int getMaxCommitTimeForAllSegmentsSeconds() {
        final int maxCommitTimeSeconds = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS;
        return maxCommitTimeSeconds;
    }

    public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, int i) {
        this._helixManager = helixManager;
        this._segmentManager = pinotLLCRealtimeSegmentManager;
        this._controllerMetrics = controllerMetrics;
        this._leadControllerManager = leadControllerManager;
        SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(i, TimeUnit.SECONDS));
        this._fsmLocks = new Lock[20];
        for (int i2 = 0; i2 < 20; i2++) {
            this._fsmLocks[i2] = new ReentrantLock();
        }
    }

    /**
     * Reports the split-commit setting held by the segment manager.
     *
     * @return the value of {@code _segmentManager.getIsSplitCommitEnabled()}
     */
    public boolean isSplitCommitEnabled() {
        final boolean splitCommitEnabled = this._segmentManager.getIsSplitCommitEnabled();
        return splitCommitEnabled;
    }

    /**
     * Returns the controller VIP URL held by the segment manager.
     *
     * @return the value of {@code _segmentManager.getControllerVipUrl()}
     */
    public String getControllerVipUrl() {
        final String controllerVipUrl = this._segmentManager.getControllerVipUrl();
        return controllerVipUrl;
    }

    /**
     * Returns the current wall-clock time. Being {@code protected}, it can be overridden by subclasses.
     *
     * @return {@link System#currentTimeMillis()}
     */
    protected long getCurrentTimeMs() {
        final long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Returns the FSM registered for the given segment, creating and registering one when none exists.
     *
     * <p>Lookup and creation run while holding one of the striped {@code _fsmLocks}, selected by the hash of
     * the segment name. When a new FSM is needed, the segment's ZK metadata is fetched and the FSM is built:
     * <ul>
     *   <li>with {@code fsmInCommit} when the metadata status equals {@code DONE};</li>
     *   <li>with {@code fsmStoppedConsuming} when {@code msgType} equals {@code MSG_TYPE_STOPPED_CONSUMING};</li>
     *   <li>with {@code fsmInHolding} otherwise.</li>
     * </ul>
     *
     * @param lLCSegmentName parsed name of the segment
     * @param msgType protocol message type of the request that triggered the lookup
     * @return the existing or newly created FSM
     * @throws RuntimeException wrapping any exception raised while looking up or creating the FSM
     */
    private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName lLCSegmentName, String msgType) {
        String segmentName = lLCSegmentName.getSegmentName();
        // Mask off the sign bit so the stripe index is never negative.
        Lock lock = this._fsmLocks[(segmentName.hashCode() & Integer.MAX_VALUE) % this._fsmLocks.length];
        // Acquire before entering the try block: if lock() itself failed, the finally clause would otherwise
        // call unlock() on a lock this thread does not hold and mask the original failure.
        lock.lock();
        try {
            SegmentCompletionFSM fsm = this._fsmMap.get(segmentName);
            if (fsm == null) {
                LLCRealtimeSegmentZKMetadata zkMetadata = this._segmentManager.getSegmentZKMetadata(TableNameBuilder.REALTIME.tableNameWithType(lLCSegmentName.getTableName()), lLCSegmentName.getSegmentName(), null);
                if (zkMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.DONE)) {
                    fsm = SegmentCompletionFSM.fsmInCommit(this._segmentManager, this, lLCSegmentName, zkMetadata.getNumReplicas(), zkMetadata.getEndOffset());
                } else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
                    fsm = SegmentCompletionFSM.fsmStoppedConsuming(this._segmentManager, this, lLCSegmentName, zkMetadata.getNumReplicas());
                } else {
                    fsm = SegmentCompletionFSM.fsmInHolding(this._segmentManager, this, lLCSegmentName, zkMetadata.getNumReplicas());
                }
                LOGGER.info("Created FSM {}", fsm);
                this._fsmMap.put(segmentName, fsm);
            }
            return fsm;
        } catch (Exception e) {
            LOGGER.error("Exception getting FSM for segment {}", segmentName, e);
            throw new RuntimeException("Exception getting FSM for segment " + segmentName, e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Handles a segmentConsumed request by delegating to the segment's completion FSM.
     *
     * <p>The table name is the part of the segment name before the first {@code "__"}. Once the FSM call has
     * finished, an FSM that reports {@code isDone()} is removed from {@code _fsmMap}.
     *
     * @param params request parameters; segment name, instance id, reason and offset are read from it
     * @return {@code RESP_NOT_LEADER} when this controller does not lead the table or the Helix manager is not
     *         connected; {@code RESP_FAILED} when the FSM lookup or the FSM call throws; otherwise the FSM's
     *         response
     */
    public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params params) {
        final String segmentNameStr = params.getSegmentName();
        final String tableName = segmentNameStr.split("__")[0];
        final boolean canServe = isLeader(tableName) && this._helixManager.isConnected();
        if (!canServe) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = params.getInstanceId();
        final String stopReason = params.getReason();
        final long consumedOffset = params.getOffset();
        final LLCSegmentName parsedName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionFSM fsm = null;
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        try {
            fsm = lookupOrCreateFsm(parsedName, SegmentCompletionProtocol.MSG_TYPE_CONSUMED);
            result = fsm.segmentConsumed(serverInstance, consumedOffset, stopReason);
        } catch (Exception ex) {
            // Best effort: the failure is logged and the caller receives RESP_FAILED.
            LOGGER.error("Caught exception in segmentConsumed for segment {}", segmentNameStr, ex);
        }

        final boolean fsmFinished = fsm != null && fsm.isDone();
        if (fsmFinished) {
            LOGGER.info("Removing FSM (if present):{}", fsm.toString());
            this._fsmMap.remove(segmentNameStr);
        }
        return result;
    }

    /**
     * Handles a segmentCommitStart request by delegating to the segment's completion FSM.
     *
     * <p>The table name is the part of the segment name before the first {@code "__"}. Once the FSM call has
     * finished, an FSM that reports {@code isDone()} is removed from {@code _fsmMap}.
     *
     * @param params request parameters; segment name, instance id and offset are read from it
     * @return {@code RESP_NOT_LEADER} when this controller does not lead the table or the Helix manager is not
     *         connected; {@code RESP_FAILED} when the FSM lookup or the FSM call throws; otherwise the FSM's
     *         response
     */
    public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
        final String segmentNameStr = params.getSegmentName();
        final String tableName = segmentNameStr.split("__")[0];
        final boolean canServe = isLeader(tableName) && this._helixManager.isConnected();
        if (!canServe) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = params.getInstanceId();
        final long commitOffset = params.getOffset();
        final LLCSegmentName parsedName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(parsedName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
            result = fsm.segmentCommitStart(serverInstance, commitOffset);
        } catch (Exception ex) {
            // Best effort: the failure is logged and the caller receives RESP_FAILED.
            LOGGER.error("Caught exception in segmentCommitStart for segment {}", segmentNameStr, ex);
        }

        final boolean fsmFinished = fsm != null && fsm.isDone();
        if (fsmFinished) {
            LOGGER.info("Removing FSM (if present):{}", fsm.toString());
            this._fsmMap.remove(segmentNameStr);
        }
        return result;
    }

    /**
     * Handles an extendBuildTime request by delegating to the segment's completion FSM.
     *
     * <p>The FSM is looked up with message type {@code MSG_TYPE_COMMIT}. Once the FSM call has finished, an
     * FSM that reports {@code isDone()} is removed from {@code _fsmMap}.
     *
     * @param params request parameters; segment name, instance id, offset and extra time (seconds) are read
     *        from it
     * @return {@code RESP_NOT_LEADER} when this controller does not lead the table or the Helix manager is not
     *         connected; {@code RESP_FAILED} when the FSM lookup or the FSM call throws; otherwise the FSM's
     *         response
     */
    public SegmentCompletionProtocol.Response extendBuildTime(SegmentCompletionProtocol.Request.Params params) {
        final String segmentNameStr = params.getSegmentName();
        final String tableName = segmentNameStr.split("__")[0];
        final boolean canServe = isLeader(tableName) && this._helixManager.isConnected();
        if (!canServe) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = params.getInstanceId();
        final long reportedOffset = params.getOffset();
        final int requestedExtraSec = params.getExtraTimeSec();
        final LLCSegmentName parsedName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(parsedName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
            result = fsm.extendBuildTime(serverInstance, reportedOffset, requestedExtraSec);
        } catch (Exception ex) {
            // Best effort: the failure is logged and the caller receives RESP_FAILED.
            LOGGER.error("Caught exception in extendBuildTime for segment {}", segmentNameStr, ex);
        }

        final boolean fsmFinished = fsm != null && fsm.isDone();
        if (fsmFinished) {
            LOGGER.info("Removing FSM (if present):{}", fsm.toString());
            this._fsmMap.remove(segmentNameStr);
        }
        return result;
    }

    /**
     * Handles a stopped-consuming notification by delegating to the segment's completion FSM.
     *
     * <p>The FSM is looked up with message type {@code MSG_TYPE_STOPPED_CONSUMING}. Once the FSM call has
     * finished, an FSM that reports {@code isDone()} is removed from {@code _fsmMap}.
     *
     * @param params request parameters; segment name, instance id, offset and reason are read from it
     * @return {@code RESP_NOT_LEADER} when this controller does not lead the table or the Helix manager is not
     *         connected; {@code RESP_FAILED} when the FSM lookup or the FSM call throws; otherwise the FSM's
     *         response
     */
    public SegmentCompletionProtocol.Response segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params params) {
        final String segmentNameStr = params.getSegmentName();
        final String tableName = segmentNameStr.split("__")[0];
        final boolean canServe = isLeader(tableName) && this._helixManager.isConnected();
        if (!canServe) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = params.getInstanceId();
        final long stoppedOffset = params.getOffset();
        final String stopReason = params.getReason();
        final LLCSegmentName parsedName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(parsedName, SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING);
            result = fsm.stoppedConsuming(serverInstance, stoppedOffset, stopReason);
        } catch (Exception ex) {
            // Best effort: the failure is logged and the caller receives RESP_FAILED.
            LOGGER.error("Caught exception in segmentStoppedConsuming for segment {}", segmentNameStr, ex);
        }

        final boolean fsmFinished = fsm != null && fsm.isDone();
        if (fsmFinished) {
            LOGGER.info("Removing FSM (if present):{}", fsm.toString());
            this._fsmMap.remove(segmentNameStr);
        }
        return result;
    }

    /**
     * Handles a segmentCommitEnd request by delegating to the segment's completion FSM.
     *
     * <p>The FSM is looked up with message type {@code MSG_TYPE_COMMIT}. Once the FSM call has finished, an
     * FSM that reports {@code isDone()} is removed from {@code _fsmMap}.
     *
     * @param params request parameters; the segment name is read here and the whole object is forwarded to
     *        the FSM
     * @param z forwarded unchanged to the FSM. NOTE(review): decompiled name; presumably a success flag,
     *        confirm against {@code SegmentCompletionFSM.segmentCommitEnd}
     * @param z2 forwarded unchanged to the FSM. NOTE(review): decompiled name; presumably a split-commit
     *        flag, confirm against {@code SegmentCompletionFSM.segmentCommitEnd}
     * @param committingSegmentDescriptor descriptor of the segment being committed, forwarded to the FSM
     * @return {@code RESP_NOT_LEADER} when this controller does not lead the table or the Helix manager is not
     *         connected; {@code RESP_FAILED} when the FSM lookup or the FSM call throws; otherwise the FSM's
     *         response
     */
    public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params, boolean z, boolean z2, CommittingSegmentDescriptor committingSegmentDescriptor) {
        final String segmentNameStr = params.getSegmentName();
        final String tableName = segmentNameStr.split("__")[0];
        final boolean canServe = isLeader(tableName) && this._helixManager.isConnected();
        if (!canServe) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final LLCSegmentName parsedName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(parsedName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
            result = fsm.segmentCommitEnd(params, z, z2, committingSegmentDescriptor);
        } catch (Exception ex) {
            // Best effort: the failure is logged and the caller receives RESP_FAILED.
            LOGGER.error("Caught exception in segmentCommitEnd for segment {}", segmentNameStr, ex);
        }

        final boolean fsmFinished = fsm != null && fsm.isDone();
        if (fsmFinished) {
            LOGGER.info("Removing FSM (if present):{}", fsm.toString());
            this._fsmMap.remove(segmentNameStr);
        }
        return result;
    }

    /**
     * Asks the lead-controller manager whether this controller is the leader for a table.
     *
     * @param str table name, as passed to {@code LeadControllerManager.isLeaderForTable}
     * @return the result of {@code _leadControllerManager.isLeaderForTable(str)}
     */
    @VisibleForTesting
    protected boolean isLeader(String str) {
        final boolean leaderForTable = this._leadControllerManager.isLeaderForTable(str);
        return leaderForTable;
    }
}
