package org.apache.rocketmq.store.ha.autoswitch;

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.DefaultHAService;
import org.apache.rocketmq.store.ha.GroupTransferService;
import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;

/* loaded from: input_file:org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.class */
public class AutoSwitchHAService extends DefaultHAService {
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqStore");
    /** Epoch history; created and loaded from the epoch file in {@code init}. */
    private EpochFileCache epochCache;
    /** HA client used in the slave role; stays {@code null} until {@code changeToSlave} first creates it. */
    private AutoSwitchHAClient haClient;
    /** Single worker thread on which the sync-state-set listeners are invoked. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
    /** Slave id -> latest caught-up time in milliseconds, as recorded by {@code updateConnectionLastCaughtUpTime}. */
    private final ConcurrentHashMap<Long, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
    /** Callbacks run by {@code notifySyncStateSetChanged}. */
    private final List<Consumer<Set<Long>>> syncStateSetChangedListeners = new ArrayList<>();
    /** Sync state set installed by {@code setSyncStateSet}; accessed under the read/write lock below. */
    private final Set<Long> syncStateSet = new HashSet<>();
    /** Set proposed by {@code markSynchronizingSyncStateSet}; considered while {@code isSynchronizingSyncStateSet} is true. */
    private final Set<Long> remoteSyncStateSet = new HashSet<>();
    private final ReadWriteLock syncStateSetReadWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.syncStateSetReadWriteLock.readLock();
    private final Lock writeLock = this.syncStateSetReadWriteLock.writeLock();
    /** True between {@code markSynchronizingSyncStateSet} and the next {@code setSyncStateSet}. */
    private volatile boolean isSynchronizingSyncStateSet = false;
    /** Id of this broker; {@code null} until {@code setBrokerControllerId} is called. */
    private Long brokerControllerId = null;

    /* loaded from: input_file:org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService$AutoSwitchAcceptSocketService.class */
    /**
     * Accept-socket service whose connections are {@link AutoSwitchHAConnection} instances
     * bound to the enclosing service and its epoch cache.
     */
    class AutoSwitchAcceptSocketService extends DefaultHAService.AcceptSocketService {
        public AutoSwitchAcceptSocketService(MessageStoreConfig messageStoreConfig) {
            super(messageStoreConfig);
        }

        /**
         * @return this class's simple name, or — when the broker runs inside a broker container —
         *         the broker identifier followed by the parent service's simple name
         */
        public String getServiceName() {
            if (!AutoSwitchHAService.this.defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
                return AutoSwitchAcceptSocketService.class.getSimpleName();
            }
            return AutoSwitchHAService.this.defaultMessageStore.getBrokerConfig().getIdentifier()
                + DefaultHAService.AcceptSocketService.class.getSimpleName();
        }

        /**
         * Wraps an accepted channel in an {@link AutoSwitchHAConnection}.
         *
         * @param socketChannel the accepted channel
         * @return the new connection
         * @throws IOException if the connection cannot be constructed
         */
        @Override
        protected HAConnection createConnection(SocketChannel socketChannel) throws IOException {
            final AutoSwitchHAService owner = AutoSwitchHAService.this;
            return new AutoSwitchHAConnection(owner, socketChannel, owner.epochCache);
        }
    }

    /**
     * Loads the epoch cache from disk and wires up the services this HA service relies on.
     *
     * @param defaultMessageStore the store this service is attached to
     * @throws IOException declared by the overridden method; propagated from initialisation
     */
    @Override
    public void init(DefaultMessageStore defaultMessageStore) throws IOException {
        final MessageStoreConfig storeConfig = defaultMessageStore.getMessageStoreConfig();

        // The epoch cache is built and loaded before the store reference is published.
        this.epochCache = new EpochFileCache(storeConfig.getStorePathEpochFile());
        this.epochCache.initCacheFromFile();

        this.defaultMessageStore = defaultMessageStore;
        this.acceptSocketService = new AutoSwitchAcceptSocketService(storeConfig);
        this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
        this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
    }

    /**
     * Shuts down the parent service, then the HA client (if one was ever created),
     * then the listener executor.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        final AutoSwitchHAClient client = this.haClient;
        if (client != null) {
            client.shutdown();
        }
        this.executorService.shutdown();
    }

    /**
     * Removes a connection and, unless the store is shutting down, drops its slave from the
     * local sync state set and notifies listeners if the slave was a member.
     *
     * @param hAConnection the connection being removed; must be an {@link AutoSwitchHAConnection}
     */
    @Override
    public void removeConnection(HAConnection hAConnection) {
        if (!this.defaultMessageStore.isShutdown()) {
            final Set<Long> shrunkSet = getLocalSyncStateSet();
            final Long slaveId = Long.valueOf(((AutoSwitchHAConnection) hAConnection).getSlaveId());
            // Set.remove returns true only when the id was actually a member.
            if (shrunkSet.remove(slaveId)) {
                markSynchronizingSyncStateSet(shrunkSet);
                notifySyncStateSetChanged(shrunkSet);
            }
        }
        super.removeConnection(hAConnection);
    }

    /**
     * Switches this HA service into the master role for epoch {@code i}.
     *
     * <p>Steps, in order: drop all HA connections and stop the HA client, truncate commit-log
     * data that fails validation, recompute the confirm offset, record the new epoch entry,
     * wait for dispatch (and, with the transient store pool enabled, commit) to drain, then
     * recover the topic queue table and publish the new state machine version.
     *
     * @param i the new master epoch
     * @return {@code false} if {@code i} is older than the last cached epoch, {@code true} otherwise
     */
    @Override // org.apache.rocketmq.store.ha.HAService
    public boolean changeToMaster(int i) {
        int lastEpoch = this.epochCache.lastEpoch();
        // Refuse to move backwards in epoch.
        if (i < lastEpoch) {
            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to master", Integer.valueOf(i), Integer.valueOf(lastEpoch));
            return false;
        }
        destroyConnections();
        if (this.haClient != null) {
            this.haClient.shutdown();
        }
        // truncateInvalidMsg returns -1 when dispatch was already complete and nothing was truncated.
        long truncateInvalidMsg = truncateInvalidMsg();
        this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
        if (truncateInvalidMsg >= 0) {
            // Keep the epoch cache consistent with the truncated commit log.
            this.epochCache.truncateSuffixByOffset(truncateInvalidMsg);
        }
        // The new epoch starts at the current end of the commit log.
        EpochEntry epochEntry = new EpochEntry(i, this.defaultMessageStore.getMaxPhyOffset());
        // NOTE(review): presumably removes an existing entry for the same epoch so it is not
        // recorded twice — confirm against EpochFileCache.truncateSuffixByEpoch.
        if (this.epochCache.lastEpoch() >= i) {
            this.epochCache.truncateSuffixByEpoch(i);
        }
        this.epochCache.appendEntry(epochEntry);
        // Poll every 100 ms until the store reports no bytes left to dispatch.
        while (this.defaultMessageStore.dispatchBehindBytes() > 0) {
            try {
                Thread.sleep(100L);
            } catch (Exception e) {
                // Swallowed: the loop simply re-checks the dispatch backlog.
            }
        }
        if (this.defaultMessageStore.isTransientStorePoolEnable()) {
            waitingForAllCommit();
            this.defaultMessageStore.getTransientStorePool().setRealCommit(true);
        }
        LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", new Object[]{Long.valueOf(truncateInvalidMsg), Long.valueOf(this.defaultMessageStore.getConfirmOffset()), Long.valueOf(this.defaultMessageStore.getMaxPhyOffset())});
        this.defaultMessageStore.recoverTopicQueueTable();
        this.defaultMessageStore.setStateMachineVersion(i);
        LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", Integer.valueOf(i), Long.valueOf(epochEntry.getStartOffset()));
        return true;
    }

    /**
     * Switches this HA service into the slave role, following the master at {@code str}.
     *
     * @param str the new master address handed to the HA client
     * @param i   the new master epoch
     * @param l   forwarded to the {@link AutoSwitchHAClient} constructor when the client is first
     *            created (presumably this broker's id — confirm against the client)
     * @return {@code true} on success; {@code false} if {@code i} is older than the last cached
     *         epoch or any step throws
     */
    @Override
    public boolean changeToSlave(String str, int i, Long l) {
        final int currentEpoch = this.epochCache.lastEpoch();
        if (currentEpoch > i) {
            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to slave", i, currentEpoch);
            return false;
        }
        try {
            destroyConnections();
            // Reuse the existing client when there is one, otherwise create it lazily.
            if (this.haClient != null) {
                this.haClient.reOpen();
            } else {
                this.haClient = new AutoSwitchHAClient(this, this.defaultMessageStore, this.epochCache, l);
            }
            this.haClient.updateMasterAddress(str);
            this.haClient.updateHaMasterAddress(null);
            this.haClient.start();

            if (this.defaultMessageStore.isTransientStorePoolEnable()) {
                waitingForAllCommit();
                this.defaultMessageStore.getTransientStorePool().setRealCommit(false);
            }

            this.defaultMessageStore.setStateMachineVersion(i);
            LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", str, i);
            return true;
        } catch (Exception e) {
            LOGGER.error("Error happen when change ha to slave", e);
            return false;
        }
    }

    /**
     * Records a new master epoch when this broker was already the master: only the epoch cache
     * and the state machine version are updated.
     *
     * @param i the new master epoch
     * @return {@code false} if {@code i} is older than the last cached epoch, {@code true} otherwise
     */
    @Override
    public boolean changeToMasterWhenLastRoleIsMaster(int i) {
        final int currentEpoch = this.epochCache.lastEpoch();
        if (currentEpoch > i) {
            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to master", i, currentEpoch);
            return false;
        }

        final EpochEntry newEntry = new EpochEntry(i, this.defaultMessageStore.getMaxPhyOffset());
        if (i <= this.epochCache.lastEpoch()) {
            this.epochCache.truncateSuffixByEpoch(i);
        }
        this.epochCache.appendEntry(newEntry);

        this.defaultMessageStore.setStateMachineVersion(i);
        LOGGER.info("Change ha to master success, last role is master, newMasterEpoch:{}, startOffset:{}", i, newEntry.getStartOffset());
        return true;
    }

    /**
     * Accepts a new epoch while staying slave of the same master: only the state machine
     * version is updated.
     *
     * @param str the master address (used for logging only)
     * @param i   the new master epoch
     * @return {@code false} if {@code i} is older than the last cached epoch, {@code true} otherwise
     */
    @Override
    public boolean changeToSlaveWhenMasterNotChange(String str, int i) {
        final int currentEpoch = this.epochCache.lastEpoch();
        if (i >= currentEpoch) {
            this.defaultMessageStore.setStateMachineVersion(i);
            LOGGER.info("Change ha to slave success, master doesn't change, newMasterAddress:{}, newMasterEpoch:{}", str, i);
            return true;
        }
        LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to slave", i, currentEpoch);
        return false;
    }

    /**
     * Blocks until the store reports no data left to commit, waking the commit service and
     * sleeping 100 ms between checks.
     */
    public void waitingForAllCommit() {
        for (;;) {
            if (getDefaultMessageStore().remainHowManyDataToCommit() <= 0) {
                return;
            }
            getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
            try {
                Thread.sleep(100L);
            } catch (Exception ignored) {
                // Any failure of the sleep is dropped; the loop re-checks the backlog.
            }
        }
    }

    /**
     * @return the HA client, or {@code null} if none has been created yet
     */
    @Override
    public HAClient getHAClient() {
        return haClient;
    }

    /**
     * Forwards the HA master address to the HA client; does nothing when no client exists.
     *
     * @param str the HA master address
     */
    @Override
    public void updateHaMasterAddress(String str) {
        if (this.haClient == null) {
            return;
        }
        this.haClient.updateHaMasterAddress(str);
    }

    /**
     * No-op in this implementation. Within this class the master address is handed to the
     * HA client by {@code changeToSlave} instead.
     *
     * @param str ignored
     */
    @Override // org.apache.rocketmq.store.ha.DefaultHAService, org.apache.rocketmq.store.ha.HAService
    public void updateMasterAddress(String str) {
    }

    /**
     * Adds a callback that {@code notifySyncStateSetChanged} will invoke.
     *
     * @param consumer the listener; receives the set passed to the notification
     */
    public void registerSyncStateSetChangedListener(Consumer<Set<Long>> consumer) {
        syncStateSetChangedListeners.add(consumer);
    }

    /**
     * Hands {@code set} to every registered listener on the service's executor thread and
     * logs the notification.
     *
     * @param set the sync state set to publish
     */
    public void notifySyncStateSetChanged(Set<Long> set) {
        final Runnable fanOut = () -> this.syncStateSetChangedListeners.forEach(listener -> listener.accept(set));
        this.executorService.submit(fanOut);
        LOGGER.info("Notify the syncStateSet has been changed into {}.", set);
    }

    /**
     * Computes a possibly smaller sync state set from a copy of the local one.
     *
     * <p>A member is dropped when its last caught-up time is older than
     * {@code haMaxTimeSlaveNotCatchup} milliseconds, or when it has no caught-up record at all.
     * This broker's own id ({@code brokerControllerId}) is never dropped for lacking a record:
     * it has no HA connection to itself, and {@code computeConfirmOffset} exempts it in the
     * same way. If anything was removed, the result is marked as the set being synchronized.
     *
     * @return the candidate sync state set (a copy; the installed set is not modified)
     */
    public Set<Long> maybeShrinkSyncStateSet() {
        final Set<Long> newSyncStateSet = getLocalSyncStateSet();
        boolean changed = false;
        final long maxLagMs = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();

        for (Map.Entry<Long, Long> entry : this.connectionCaughtUpTimeTable.entrySet()) {
            final Long slaveId = entry.getKey();
            if (newSyncStateSet.contains(slaveId)
                && System.currentTimeMillis() - entry.getValue().longValue() > maxLagMs) {
                newSyncStateSet.remove(slaveId);
                changed = true;
            }
        }

        // removeIf instead of remove() inside a for-each over the same HashSet, which would
        // throw ConcurrentModificationException on the next iteration step.
        final boolean removedUnconnected = newSyncStateSet.removeIf(id ->
            !Objects.equals(id, this.brokerControllerId) && !this.connectionCaughtUpTimeTable.containsKey(id));
        if (removedUnconnected) {
            changed = true;
        }

        if (changed) {
            markSynchronizingSyncStateSet(newSyncStateSet);
        }
        return newSyncStateSet;
    }

    /**
     * Adds slave {@code l} to the sync state set once its max offset has reached both the
     * store's confirm offset and the start offset of the latest epoch entry.
     *
     * @param l the slave id
     * @param j the slave's max offset
     */
    public void maybeExpandInSyncStateSet(Long l, long j) {
        final Set<Long> candidateSet = getLocalSyncStateSet();
        if (candidateSet.contains(l)) {
            return;
        }

        final long confirmOffset = this.defaultMessageStore.getConfirmOffset();
        if (j < confirmOffset) {
            return;
        }

        final EpochEntry leaderEpoch = this.epochCache.lastEntry();
        if (j < leaderEpoch.getStartOffset()) {
            return;
        }

        LOGGER.info("The slave {} has caught up, slaveMaxOffset: {}, confirmOffset: {}, epoch: {}, leader epoch startOffset: {}.", l, j, confirmOffset, leaderEpoch.getEpoch(), leaderEpoch.getStartOffset());
        candidateSet.add(l);
        markSynchronizingSyncStateSet(candidateSet);
        notifySyncStateSetChanged(candidateSet);
    }

    /**
     * Under the write lock, raises the synchronizing flag and replaces the contents of
     * {@code remoteSyncStateSet} with {@code set}.
     *
     * @param set the proposed sync state set
     */
    private void markSynchronizingSyncStateSet(Set<Long> set) {
        writeLock.lock();
        try {
            isSynchronizingSyncStateSet = true;
            remoteSyncStateSet.clear();
            remoteSyncStateSet.addAll(set);
        } finally {
            writeLock.unlock();
        }
    }

    /** Clears the synchronizing flag. */
    private void markSynchronizingSyncStateSetDone() {
        isSynchronizingSyncStateSet = false;
    }

    /**
     * @return {@code true} while a proposed sync state set has been marked but not yet installed
     *         via {@code setSyncStateSet}
     */
    public boolean isSynchronizingSyncStateSet() {
        return isSynchronizingSyncStateSet;
    }

    /**
     * Records the caught-up time for slave {@code l}, keeping the larger of the stored value
     * (0 when none exists yet) and {@code j}.
     *
     * @param l the slave id
     * @param j the caught-up time to record
     */
    public void updateConnectionLastCaughtUpTime(Long l, long j) {
        final Long previous = (Long) ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, l, key -> 0L);
        final long latest = Math.max(previous.longValue(), j);
        this.connectionCaughtUpTimeTable.put(l, Long.valueOf(latest));
    }

    /**
     * Recomputes and stores the confirm offset when the acknowledging slave is a member of the
     * installed sync state set; otherwise does nothing.
     *
     * @param l the id of the slave that acknowledged
     */
    public void updateConfirmOffsetWhenSlaveAck(Long l) {
        readLock.lock();
        try {
            final boolean inSync = syncStateSet.contains(l);
            if (inSync) {
                defaultMessageStore.setConfirmOffset(computeConfirmOffset());
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the size of the installed sync state set, or — while a proposed set is being
     * synchronized — the larger of the installed and proposed sizes.
     *
     * @param j not used by this implementation
     * @return the number of in-sync replicas
     */
    @Override
    public int inSyncReplicasNums(long j) {
        this.readLock.lock();
        try {
            if (!this.isSynchronizingSyncStateSet) {
                return this.syncStateSet.size();
            }
            return Math.max(this.syncStateSet.size(), this.remoteSyncStateSet.size());
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Builds a snapshot of HA runtime state.
     *
     * <p>When the configured role is SLAVE, the client section is filled in. The fields that
     * come from the HA client are skipped while no client exists yet (the client is only
     * created by {@code changeToSlave}), instead of failing with a NullPointerException.
     * For any other role, one entry per HA connection is reported, along with the in-sync
     * slave count.
     *
     * @param j the master commit log max offset, used to compute each slave's lag
     * @return the populated runtime info
     */
    @Override
    public HARuntimeInfo getRuntimeInfo(long j) {
        final HARuntimeInfo info = new HARuntimeInfo();
        if (BrokerRole.SLAVE.equals(getDefaultMessageStore().getMessageStoreConfig().getBrokerRole())) {
            info.setMaster(false);
            // Read the field once so the null check and the uses see the same client.
            final AutoSwitchHAClient client = this.haClient;
            if (client != null) {
                info.getHaClientRuntimeInfo().setMasterAddr(client.getHaMasterAddress());
                info.getHaClientRuntimeInfo().setLastReadTimestamp(client.getLastReadTimestamp());
                info.getHaClientRuntimeInfo().setLastWriteTimestamp(client.getLastWriteTimestamp());
                info.getHaClientRuntimeInfo().setTransferredByteInSecond(client.getTransferredByteInSecond());
            }
            info.getHaClientRuntimeInfo().setMaxOffset(getDefaultMessageStore().getMaxPhyOffset());
            info.getHaClientRuntimeInfo().setMasterFlushOffset(this.defaultMessageStore.getMasterFlushedOffset());
        } else {
            info.setMaster(true);
            info.setMasterCommitLogMaxOffset(j);
            final Set<Long> localSyncStateSet = getLocalSyncStateSet();
            for (HAConnection connection : this.connectionList) {
                final HARuntimeInfo.HAConnectionRuntimeInfo connectionInfo = new HARuntimeInfo.HAConnectionRuntimeInfo();
                final long slaveAckOffset = connection.getSlaveAckOffset();
                connectionInfo.setSlaveAckOffset(slaveAckOffset);
                connectionInfo.setDiff(j - slaveAckOffset);
                // substring(1) drops the first character of the reported client address.
                connectionInfo.setAddr(connection.getClientAddress().substring(1));
                connectionInfo.setTransferredByteInSecond(connection.getTransferredByteInSecond());
                connectionInfo.setTransferFromWhere(connection.getTransferFromWhere());
                connectionInfo.setInSync(localSyncStateSet.contains(Long.valueOf(((AutoSwitchHAConnection) connection).getSlaveId())));
                info.getHaConnectionInfo().add(connectionInfo);
            }
            // NOTE(review): the -1 presumably excludes this master's own id from the count — confirm.
            info.setInSyncSlaveNums(localSyncStateSet.size() - 1);
        }
        return info;
    }

    /**
     * Computes the confirm offset as the minimum of the store's max physical offset and the
     * positive ack offsets of all connected slaves that are in the sync state set.
     *
     * <p>If a member of the sync state set other than this broker has no connection (and this
     * broker's id is known), the store's current confirm offset is returned unchanged.
     *
     * @return the computed confirm offset
     */
    public long computeConfirmOffset() {
        final Set<Long> inSyncIds = getSyncStateSet();
        long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();

        final Set<Long> connectedSlaveIds = new HashSet<>();
        for (HAConnection connection : this.connectionList) {
            connectedSlaveIds.add(Long.valueOf(((AutoSwitchHAConnection) connection).getSlaveId()));
        }

        for (Long memberId : inSyncIds) {
            final boolean disconnected = !connectedSlaveIds.contains(memberId);
            if (disconnected && this.brokerControllerId != null && !Objects.equals(memberId, this.brokerControllerId)) {
                LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", memberId);
                return this.defaultMessageStore.getConfirmOffsetDirectly();
            }
        }

        for (HAConnection connection : this.connectionList) {
            final Long slaveId = Long.valueOf(((AutoSwitchHAConnection) connection).getSlaveId());
            if (!inSyncIds.contains(slaveId) || connection.getSlaveAckOffset() <= 0) {
                continue;
            }
            confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
        }
        return confirmOffset;
    }

    /**
     * Installs {@code set} as the sync state set under the write lock, clears the
     * synchronizing flag, and refreshes the store's confirm offset.
     *
     * @param set the new sync state set; its elements are copied
     */
    public void setSyncStateSet(Set<Long> set) {
        writeLock.lock();
        try {
            // Equivalent to markSynchronizingSyncStateSetDone().
            isSynchronizingSyncStateSet = false;
            syncStateSet.clear();
            syncStateSet.addAll(set);
            final long newConfirmOffset = computeConfirmOffset();
            defaultMessageStore.setConfirmOffset(newConfirmOffset);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns a copy of the installed sync state set; while a proposed set is being
     * synchronized, the copy is the union of the installed and proposed sets.
     *
     * @return a new, independent set
     */
    public Set<Long> getSyncStateSet() {
        this.readLock.lock();
        try {
            final Set<Long> snapshot = new HashSet<>(this.syncStateSet);
            if (this.isSynchronizingSyncStateSet) {
                snapshot.addAll(this.remoteSyncStateSet);
            }
            return snapshot;
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Returns a copy of the installed sync state set only, ignoring any proposed set.
     *
     * @return a new, independent set
     */
    public Set<Long> getLocalSyncStateSet() {
        this.readLock.lock();
        try {
            return new HashSet<>(this.syncStateSet);
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Delegates to the epoch cache's {@code truncatePrefixByOffset}.
     *
     * @param j the offset passed through
     */
    public void truncateEpochFilePrefix(long j) {
        epochCache.truncatePrefixByOffset(j);
    }

    /**
     * Delegates to the epoch cache's {@code truncateSuffixByOffset}.
     *
     * @param j the offset passed through
     */
    public void truncateEpochFileSuffix(long j) {
        epochCache.truncateSuffixByOffset(j);
    }

    /**
     * Scans the commit log from the store's reput offset, validating messages one by one, and
     * truncates the store at the offset where the scan stops.
     *
     * @return {@code -1} when dispatch is already complete and nothing is done; otherwise the
     *         offset passed to {@code truncateDirtyFiles}
     */
    public long truncateInvalidMsg() {
        if (this.defaultMessageStore.dispatchBehindBytes() <= 0) {
            LOGGER.info("Dispatch complete, skip truncate");
            return -1L;
        }
        // z stays true while every checked message is valid; a failed check ends the scan.
        boolean z = true;
        long reputFromOffset = this.defaultMessageStore.getReputFromOffset();
        do {
            SelectMappedBufferResult data = this.defaultMessageStore.getCommitLog().getData(reputFromOffset);
            if (data == null) {
                // No data at this offset: stop scanning and truncate here.
                break;
            }
            try {
                reputFromOffset = data.getStartOffset();
                // i counts the bytes of this buffer that have been validated so far.
                int i = 0;
                while (true) {
                    if (i >= data.getSize()) {
                        break;
                    }
                    DispatchRequest checkMessageAndReturnSize = this.defaultMessageStore.getCommitLog().checkMessageAndReturnSize(data.getByteBuffer(), false, false);
                    if (!checkMessageAndReturnSize.isSuccess()) {
                        z = false;
                        break;
                    }
                    int msgSize = checkMessageAndReturnSize.getMsgSize();
                    if (msgSize <= 0) {
                        // NOTE(review): a successful check with non-positive size presumably
                        // marks the end of the current file — confirm against CommitLog.
                        reputFromOffset = this.defaultMessageStore.getCommitLog().rollNextFile(reputFromOffset);
                        break;
                    }
                    reputFromOffset += msgSize;
                    i += msgSize;
                }
                // Reached (or passed) the end of the commit log: nothing more to scan.
                if (reputFromOffset >= this.defaultMessageStore.getMaxPhyOffset()) {
                    break;
                }
            } finally {
                // Always release the buffer, including on the break paths above.
                data.release();
            }
        } while (z);
        LOGGER.info("Truncate commitLog to {}", Long.valueOf(reputFromOffset));
        this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
        return reputFromOffset;
    }

    /**
     * @return the last epoch reported by the epoch cache
     */
    public int getLastEpoch() {
        return epochCache.lastEpoch();
    }

    /**
     * @return all entries held by the epoch cache
     */
    public List<EpochEntry> getEpochEntries() {
        return epochCache.getAllEntries();
    }

    /**
     * @return this broker's id, or {@code null} if it has not been set
     */
    public Long getBrokerControllerId() {
        return brokerControllerId;
    }

    /**
     * Sets this broker's id, which {@code computeConfirmOffset} uses to exempt the broker
     * itself from the connection check.
     *
     * @param l the broker id
     */
    public void setBrokerControllerId(Long l) {
        brokerControllerId = l;
    }
}
