package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/* loaded from: input_file:org/apache/rocketmq/store/ha/DefaultHAService.class */
public class DefaultHAService implements HAService {
    private static final Logger log = LoggerFactory.getLogger("RocketmqStore");
    protected AcceptSocketService acceptSocketService;
    protected DefaultMessageStore defaultMessageStore;
    protected GroupTransferService groupTransferService;
    protected HAClient haClient;
    protected HAConnectionStateNotificationService haConnectionStateNotificationService;
    protected final AtomicInteger connectionCount = new AtomicInteger(0);
    protected final List<HAConnection> connectionList = new LinkedList();
    protected WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
    protected AtomicLong push2SlaveMaxOffset = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/rocketmq/store/ha/DefaultHAService$AcceptSocketService.class */
    public abstract class AcceptSocketService extends ServiceThread {
        private final SocketAddress socketAddressListen;
        private ServerSocketChannel serverSocketChannel;
        private Selector selector;
        private final MessageStoreConfig messageStoreConfig;

        public AcceptSocketService(MessageStoreConfig messageStoreConfig) {
            this.messageStoreConfig = messageStoreConfig;
            this.socketAddressListen = new InetSocketAddress(messageStoreConfig.getHaListenPort());
        }

        public void beginAccept() throws Exception {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = NetworkUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            if (0 == this.messageStoreConfig.getHaListenPort()) {
                this.messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
                DefaultHAService.log.info("OS picked up {} to listen for HA", Integer.valueOf(this.messageStoreConfig.getHaListenPort()));
            }
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.register(this.selector, 16);
        }

        public void shutdown(boolean z) {
            super.shutdown(z);
            try {
                if (null != this.serverSocketChannel) {
                    this.serverSocketChannel.close();
                }
                if (null != this.selector) {
                    this.selector.close();
                }
            } catch (IOException e) {
                DefaultHAService.log.error("AcceptSocketService shutdown exception", e);
            }
        }

        public void run() {
            DefaultHAService.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    this.selector.select(1000L);
                    Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
                    if (selectedKeys != null) {
                        for (SelectionKey selectionKey : selectedKeys) {
                            if (selectionKey.isAcceptable()) {
                                SocketChannel accept = ((ServerSocketChannel) selectionKey.channel()).accept();
                                if (accept != null) {
                                    DefaultHAService.log.info("HAService receive new connection, " + accept.socket().getRemoteSocketAddress());
                                    try {
                                        HAConnection createConnection = createConnection(accept);
                                        createConnection.start();
                                        DefaultHAService.this.addConnection(createConnection);
                                    } catch (Exception e) {
                                        DefaultHAService.log.error("new HAConnection exception", e);
                                        accept.close();
                                    }
                                }
                            } else {
                                DefaultHAService.log.warn("Unexpected ops in select " + selectionKey.readyOps());
                            }
                        }
                        selectedKeys.clear();
                    }
                } catch (Exception e2) {
                    DefaultHAService.log.error(getServiceName() + " service has exception.", e2);
                }
            }
            DefaultHAService.log.info(getServiceName() + " service end");
        }

        protected abstract HAConnection createConnection(SocketChannel socketChannel) throws IOException;
    }

    /* loaded from: input_file:org/apache/rocketmq/store/ha/DefaultHAService$DefaultAcceptSocketService.class */
    class DefaultAcceptSocketService extends AcceptSocketService {
        public DefaultAcceptSocketService(MessageStoreConfig messageStoreConfig) {
            super(messageStoreConfig);
        }

        @Override // org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService
        protected HAConnection createConnection(SocketChannel socketChannel) throws IOException {
            return new DefaultHAConnection(DefaultHAService.this, socketChannel);
        }

        public String getServiceName() {
            return DefaultHAService.this.defaultMessageStore.getBrokerConfig().isInBrokerContainer() ? DefaultHAService.this.defaultMessageStore.getBrokerConfig().getIdentifier() + AcceptSocketService.class.getSimpleName() : DefaultAcceptSocketService.class.getSimpleName();
        }
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void init(DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
        this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
            this.haClient = new DefaultHAClient(this.defaultMessageStore);
        }
        this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void updateMasterAddress(String str) {
        if (this.haClient != null) {
            this.haClient.updateMasterAddress(str);
        }
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void updateHaMasterAddress(String str) {
        if (this.haClient != null) {
            this.haClient.updateHaMasterAddress(str);
        }
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void putRequest(CommitLog.GroupCommitRequest groupCommitRequest) {
        this.groupTransferService.putRequest(groupCommitRequest);
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public boolean isSlaveOK(long j) {
        return (this.connectionCount.get() > 0) && j - this.push2SlaveMaxOffset.get() < ((long) this.defaultMessageStore.getMessageStoreConfig().getHaMaxGapNotInSync());
    }

    public void notifyTransferSome(long j) {
        long j2 = this.push2SlaveMaxOffset.get();
        while (true) {
            long j3 = j2;
            if (j <= j3) {
                return;
            }
            if (this.push2SlaveMaxOffset.compareAndSet(j3, j)) {
                this.groupTransferService.notifyTransferSome();
                return;
            }
            j2 = this.push2SlaveMaxOffset.get();
        }
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public AtomicInteger getConnectionCount() {
        return this.connectionCount;
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void start() throws Exception {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        this.haConnectionStateNotificationService.start();
        if (this.haClient != null) {
            this.haClient.start();
        }
    }

    public void addConnection(HAConnection hAConnection) {
        synchronized (this.connectionList) {
            this.connectionList.add(hAConnection);
        }
    }

    public void removeConnection(HAConnection hAConnection) {
        this.haConnectionStateNotificationService.checkConnectionStateAndNotify(hAConnection);
        synchronized (this.connectionList) {
            this.connectionList.remove(hAConnection);
        }
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void shutdown() {
        if (this.haClient != null) {
            this.haClient.shutdown();
        }
        this.acceptSocketService.shutdown(true);
        destroyConnections();
        this.groupTransferService.shutdown();
        this.haConnectionStateNotificationService.shutdown();
    }

    public void destroyConnections() {
        synchronized (this.connectionList) {
            Iterator<HAConnection> it = this.connectionList.iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
            this.connectionList.clear();
        }
    }

    public DefaultMessageStore getDefaultMessageStore() {
        return this.defaultMessageStore;
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public WaitNotifyObject getWaitNotifyObject() {
        return this.waitNotifyObject;
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public AtomicLong getPush2SlaveMaxOffset() {
        return this.push2SlaveMaxOffset;
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public int inSyncReplicasNums(long j) {
        int i = 1;
        Iterator<HAConnection> it = this.connectionList.iterator();
        while (it.hasNext()) {
            if (isInSyncSlave(j, it.next())) {
                i++;
            }
        }
        return i;
    }

    protected boolean isInSyncSlave(long j, HAConnection hAConnection) {
        return j - hAConnection.getSlaveAckOffset() < ((long) this.defaultMessageStore.getMessageStoreConfig().getHaMaxGapNotInSync());
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public void putGroupConnectionStateRequest(HAConnectionStateNotificationRequest hAConnectionStateNotificationRequest) {
        this.haConnectionStateNotificationService.setRequest(hAConnectionStateNotificationRequest);
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public List<HAConnection> getConnectionList() {
        return this.connectionList;
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public HAClient getHAClient() {
        return this.haClient;
    }

    @Override // org.apache.rocketmq.store.ha.HAService
    public HARuntimeInfo getRuntimeInfo(long j) {
        HARuntimeInfo hARuntimeInfo = new HARuntimeInfo();
        if (BrokerRole.SLAVE.equals(getDefaultMessageStore().getMessageStoreConfig().getBrokerRole())) {
            hARuntimeInfo.setMaster(false);
            hARuntimeInfo.getHaClientRuntimeInfo().setMasterAddr(this.haClient.getHaMasterAddress());
            hARuntimeInfo.getHaClientRuntimeInfo().setMaxOffset(getDefaultMessageStore().getMaxPhyOffset());
            hARuntimeInfo.getHaClientRuntimeInfo().setLastReadTimestamp(this.haClient.getLastReadTimestamp());
            hARuntimeInfo.getHaClientRuntimeInfo().setLastWriteTimestamp(this.haClient.getLastWriteTimestamp());
            hARuntimeInfo.getHaClientRuntimeInfo().setTransferredByteInSecond(this.haClient.getTransferredByteInSecond());
            hARuntimeInfo.getHaClientRuntimeInfo().setMasterFlushOffset(this.defaultMessageStore.getMasterFlushedOffset());
        } else {
            hARuntimeInfo.setMaster(true);
            int i = 0;
            hARuntimeInfo.setMasterCommitLogMaxOffset(j);
            for (HAConnection hAConnection : this.connectionList) {
                HARuntimeInfo.HAConnectionRuntimeInfo hAConnectionRuntimeInfo = new HARuntimeInfo.HAConnectionRuntimeInfo();
                long slaveAckOffset = hAConnection.getSlaveAckOffset();
                hAConnectionRuntimeInfo.setSlaveAckOffset(slaveAckOffset);
                hAConnectionRuntimeInfo.setDiff(j - slaveAckOffset);
                hAConnectionRuntimeInfo.setAddr(hAConnection.getClientAddress().substring(1));
                hAConnectionRuntimeInfo.setTransferredByteInSecond(hAConnection.getTransferredByteInSecond());
                hAConnectionRuntimeInfo.setTransferFromWhere(hAConnection.getTransferFromWhere());
                boolean isInSyncSlave = isInSyncSlave(j, hAConnection);
                if (isInSyncSlave) {
                    i++;
                }
                hAConnectionRuntimeInfo.setInSync(isInSyncSlave);
                hARuntimeInfo.getHaConnectionInfo().add(hAConnectionRuntimeInfo);
            }
            hARuntimeInfo.setInSyncSlaveNums(i);
        }
        return hARuntimeInfo;
    }
}
