package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.store.DefaultMessageStore;

/* loaded from: input_file:org/apache/rocketmq/store/ha/DefaultHAClient.class */
/**
 * HA client service thread that pulls commit-log data from a master over a non-blocking
 * {@link SocketChannel}.
 *
 * <p>While in {@link HAConnectionState#TRANSFER} the client repeatedly:
 * <ol>
 *   <li>writes its current offset to the master as an 8-byte long when the heartbeat interval
 *       has elapsed;</li>
 *   <li>reads frames pushed by the master, each made of a 12-byte header (8-byte physical
 *       offset followed by a 4-byte body size) and a body of that size;</li>
 *   <li>appends each complete body to the local store through
 *       {@code DefaultMessageStore#appendToCommitLog} and reports the new max physical offset.</li>
 * </ol>
 *
 * <p>Connection and buffer state ({@code socketChannel}, the two read buffers,
 * {@code dispatchPosition}) is plain, unsynchronized state; only {@code currentState} is
 * {@code volatile} and the two addresses are held in {@link AtomicReference}s.
 */
public class DefaultHAClient extends ServiceThread implements HAClient {
    /** Size in bytes of an offset report sent to the master: a single long. */
    public static final int REPORT_HEADER_SIZE = 8;

    private static final Logger log = LoggerFactory.getLogger("RocketmqStore");

    /** Capacity of each of the two read buffers (4 MiB). */
    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;

    /** Size in bytes of a pushed frame header: 8-byte physical offset + 4-byte body size. */
    private static final int TRANSFER_HEADER_SIZE = 8 + 4;

    /** Maximum number of write attempts made for a single offset report. */
    private static final int MAX_REPORT_WRITE_ATTEMPTS = 3;

    /** Number of consecutive zero-byte reads after which a read pass gives up. */
    private static final int MAX_EMPTY_READS = 3;

    /** Wait applied after a failed connect or after closing the connection on an error. */
    private static final long RECONNECT_WAIT_MILLIS = 5000L;

    /** Wait applied when the current state is none of SHUTDOWN, READY or TRANSFER. */
    private static final long IDLE_WAIT_MILLIS = 2000L;

    /** Timeout passed to {@link Selector#select(long)} on every transfer pass. */
    private static final long SELECT_TIMEOUT_MILLIS = 1000L;

    /** Channel to the master; {@code null} while disconnected. */
    private SocketChannel socketChannel;
    private final DefaultMessageStore defaultMessageStore;
    private final FlowMonitor flowMonitor;
    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    /** Reusable buffer holding the 8-byte offset report. */
    private final ByteBuffer reportOffset = ByteBuffer.allocate(REPORT_HEADER_SIZE);
    private long lastReadTimestamp = System.currentTimeMillis();
    private long lastWriteTimestamp = System.currentTimeMillis();
    /** Last offset this client reported (or is about to report) to the master. */
    private long currentReportedOffset = 0;
    /** Index in {@code byteBufferRead} of the first byte not yet dispatched to the store. */
    private int dispatchPosition = 0;
    /** Buffer the socket is read into; swapped with {@code byteBufferBackup} when it fills up. */
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    private volatile HAConnectionState currentState = HAConnectionState.READY;
    private final Selector selector = NetworkUtil.openSelector();

    /**
     * Creates a client bound to the given store.
     *
     * @param defaultMessageStore store that receives the replicated data and supplies configuration
     * @throws IOException declared because the selector is opened during field initialization
     */
    public DefaultHAClient(DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        this.flowMonitor = new FlowMonitor(defaultMessageStore.getMessageStoreConfig());
    }

    /**
     * Replaces the master HA address and logs the change.
     *
     * <p>The update is a single compare-and-set against the value read just before; if another
     * thread changed the address in between, this call leaves it untouched and logs nothing.
     *
     * @param newAddress new master HA address
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void updateHaMasterAddress(String newAddress) {
        String currentAddress = this.masterHaAddress.get();
        if (this.masterHaAddress.compareAndSet(currentAddress, newAddress)) {
            log.info("update master ha address, OLD: {} NEW: {}", currentAddress, newAddress);
        }
    }

    /**
     * Replaces the master address and logs the change.
     *
     * <p>Same compare-and-set semantics as {@link #updateHaMasterAddress(String)}.
     *
     * @param newAddress new master address
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void updateMasterAddress(String newAddress) {
        String currentAddress = this.masterAddress.get();
        if (this.masterAddress.compareAndSet(currentAddress, newAddress)) {
            log.info("update master address, OLD: {} NEW: {}", currentAddress, newAddress);
        }
    }

    /** @return the master HA address currently set, or {@code null} if none was set */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public String getHaMasterAddress() {
        return this.masterHaAddress.get();
    }

    /** @return the master address currently set, or {@code null} if none was set */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public String getMasterAddress() {
        return this.masterAddress.get();
    }

    /**
     * @return {@code true} when more than the configured heartbeat interval has elapsed since
     *         the last offset report was written
     */
    private boolean isTimeToReportOffset() {
        long sinceLastWrite = this.defaultMessageStore.now() - this.lastWriteTimestamp;
        return sinceLastWrite > this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
    }

    /**
     * Writes {@code maxOffset} to the master as an 8-byte long.
     *
     * <p>The channel may accept only part of the buffer per write, so up to
     * {@link #MAX_REPORT_WRITE_ATTEMPTS} writes are attempted. The last-write timestamp is
     * refreshed whenever no exception was thrown, even if the report was only partially written.
     *
     * @param maxOffset offset to report
     * @return {@code true} if all 8 bytes were written; {@code false} on an I/O error or if
     *         bytes remain unwritten after the last attempt
     */
    private boolean reportSlaveMaxOffset(long maxOffset) {
        this.reportOffset.position(0);
        this.reportOffset.limit(REPORT_HEADER_SIZE);
        this.reportOffset.putLong(maxOffset);
        // Rewind so the channel write starts at the first byte of the long just stored.
        this.reportOffset.position(0);
        this.reportOffset.limit(REPORT_HEADER_SIZE);

        for (int attempt = 0; attempt < MAX_REPORT_WRITE_ATTEMPTS && this.reportOffset.hasRemaining(); attempt++) {
            try {
                this.socketChannel.write(this.reportOffset);
            } catch (IOException e) {
                log.error(getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                return false;
            }
        }

        this.lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();
        return !this.reportOffset.hasRemaining();
    }

    /**
     * Moves the not-yet-dispatched tail of the (full) read buffer to the start of the backup
     * buffer, swaps the two buffers, and positions the new read buffer right after the moved
     * bytes so that reading can continue.
     */
    private void reallocateByteBuffer() {
        int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
        if (remain > 0) {
            this.byteBufferRead.position(this.dispatchPosition);

            this.byteBufferBackup.position(0);
            this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
            this.byteBufferBackup.put(this.byteBufferRead);
        }

        swapByteBuffer();

        this.byteBufferRead.position(remain);
        this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
        this.dispatchPosition = 0;
    }

    /** Exchanges the read buffer and the backup buffer. */
    private void swapByteBuffer() {
        ByteBuffer previousRead = this.byteBufferRead;
        this.byteBufferRead = this.byteBufferBackup;
        this.byteBufferBackup = previousRead;
    }

    /**
     * Reads from the socket into the read buffer until the buffer is full or the socket yields
     * {@link #MAX_EMPTY_READS} consecutive empty reads, dispatching received data after every
     * non-empty read.
     *
     * @return {@code false} if the channel reached end-of-stream, an I/O error occurred, or
     *         dispatching failed; {@code true} otherwise
     */
    private boolean processReadEvent() {
        int emptyReads = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    this.flowMonitor.addByteCountTransferred(readSize);
                    emptyReads = 0;
                    if (!dispatchReadRequest()) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                    this.lastReadTimestamp = System.currentTimeMillis();
                } else if (readSize == 0) {
                    if (++emptyReads >= MAX_EMPTY_READS) {
                        break;
                    }
                } else {
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }

        return true;
    }

    /**
     * Dispatches every complete frame currently held in the read buffer to the store.
     *
     * <p>For each frame the header is decoded at {@code dispatchPosition}; if the local max
     * physical offset is non-zero it must equal the offset announced by the master. A complete
     * body is appended to the commit log and the new max offset is reported back before the
     * next frame is examined. When no further complete frame is available and the read buffer
     * is full, the undelivered tail is moved to the front via {@link #reallocateByteBuffer()}.
     *
     * @return {@code false} on an offset mismatch, an invalid body size, or a failed offset
     *         report; {@code true} otherwise (including when only a partial frame is buffered)
     */
    private boolean dispatchReadRequest() {
        // Position reached by the socket read; restored after each append.
        final int readSocketPos = this.byteBufferRead.position();

        while (true) {
            int pending = this.byteBufferRead.position() - this.dispatchPosition;
            if (pending < TRANSFER_HEADER_SIZE) {
                break; // header not fully received yet
            }

            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            long slavePhyOffset = this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0 && slavePhyOffset != masterPhyOffset) {
                log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                return false;
            }

            if (bodySize < 0) {
                // A negative size can never describe a valid body; treat the stream as corrupt
                // instead of appending a negative length and moving dispatchPosition backwards.
                log.error("master pushed invalid body size, SIZE: " + bodySize + " MASTER: " + masterPhyOffset);
                return false;
            }

            if (pending < TRANSFER_HEADER_SIZE + bodySize) {
                break; // body not fully received yet
            }

            this.defaultMessageStore.appendToCommitLog(
                masterPhyOffset, this.byteBufferRead.array(), this.dispatchPosition + TRANSFER_HEADER_SIZE, bodySize);

            this.byteBufferRead.position(readSocketPos);
            this.dispatchPosition += TRANSFER_HEADER_SIZE + bodySize;

            if (!reportSlaveMaxOffsetPlus()) {
                return false;
            }
        }

        if (!this.byteBufferRead.hasRemaining()) {
            reallocateByteBuffer();
        }

        return true;
    }

    /**
     * Reports the store's max physical offset to the master if it is greater than the offset
     * last reported. On a failed report the connection to the master is closed.
     *
     * @return {@code true} if nothing needed reporting or the report was fully written;
     *         {@code false} if the report failed
     */
    private boolean reportSlaveMaxOffsetPlus() {
        boolean reported = true;
        long currentPhyOffset = this.defaultMessageStore.getMaxPhyOffset();
        if (currentPhyOffset > this.currentReportedOffset) {
            this.currentReportedOffset = currentPhyOffset;
            reported = reportSlaveMaxOffset(this.currentReportedOffset);
            if (!reported) {
                closeMaster();
                log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
            }
        }
        return reported;
    }

    /**
     * Logs and sets the connection state.
     *
     * @param haConnectionState new state
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void changeCurrentState(HAConnectionState haConnectionState) {
        log.info("change state to {}", haConnectionState);
        this.currentState = haConnectionState;
    }

    /**
     * Connects to the master HA address if no channel is currently open.
     *
     * <p>On a successful connect the channel is registered with the selector for read events
     * and the state changes to {@link HAConnectionState#TRANSFER}. Whenever there was no open
     * channel on entry, the reported offset is reset to the store's max physical offset and the
     * last-read timestamp is refreshed, whether or not the connect succeeded.
     *
     * @return {@code true} if a channel to the master is open when this method returns
     * @throws ClosedChannelException if the new channel is closed while being registered
     */
    public boolean connectMaster() throws ClosedChannelException {
        if (null == this.socketChannel) {
            String haAddress = this.masterHaAddress.get();
            if (haAddress != null) {
                this.socketChannel = RemotingHelper.connect(NetworkUtil.string2SocketAddress(haAddress));
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    log.info("HAClient connect to master {}", haAddress);
                    changeCurrentState(HAConnectionState.TRANSFER);
                }
            }

            this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();
            this.lastReadTimestamp = System.currentTimeMillis();
        }

        return this.socketChannel != null;
    }

    /**
     * Closes the channel to the master, if one is open, and resets the read state.
     *
     * <p>The selection key is cancelled, the channel closed, and the state set back to
     * {@link HAConnectionState#READY}. If closing throws, the exception is logged and the
     * channel reference and state are left as they were; the timestamps, dispatch position and
     * both read buffers are reset in either case.
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void closeMaster() {
        if (null != this.socketChannel) {
            try {
                SelectionKey selectionKey = this.socketChannel.keyFor(this.selector);
                if (selectionKey != null) {
                    selectionKey.cancel();
                }

                this.socketChannel.close();
                this.socketChannel = null;

                log.info("HAClient close connection with master {}", this.masterHaAddress.get());
                changeCurrentState(HAConnectionState.READY);
            } catch (IOException e) {
                log.warn("closeMaster exception. ", e);
            }

            this.lastReadTimestamp = 0L;
            this.dispatchPosition = 0;

            this.byteBufferBackup.position(0);
            this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);

            this.byteBufferRead.position(0);
            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
        }
    }

    /**
     * Service loop: drives the connection state machine until the service is stopped or the
     * state becomes {@link HAConnectionState#SHUTDOWN}.
     *
     * <ul>
     *   <li>READY: try to connect; wait before retrying on failure.</li>
     *   <li>TRANSFER: run one transfer pass; on failure close the connection and wait. After a
     *       successful pass, close the connection if nothing has been read for longer than the
     *       configured housekeeping interval.</li>
     *   <li>any other state: wait and re-check.</li>
     * </ul>
     *
     * <p>Any exception thrown inside an iteration is logged, the connection is closed, and the
     * loop continues after a wait.
     */
    @Override
    public void run() {
        log.info(getServiceName() + " service started");
        this.flowMonitor.start();

        while (!isStopped()) {
            try {
                switch (this.currentState) {
                    case SHUTDOWN:
                        this.flowMonitor.shutdown(true);
                        return;
                    case READY:
                        if (!connectMaster()) {
                            log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                            waitForRunning(RECONNECT_WAIT_MILLIS);
                        }
                        continue;
                    case TRANSFER:
                        if (!transferFromMaster()) {
                            closeMasterAndWait();
                            continue;
                        }
                        break;
                    default:
                        waitForRunning(IDLE_WAIT_MILLIS);
                        continue;
                }

                // Housekeeping: only reached after a successful transfer pass.
                long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
                if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress + "] expired, " + interval);
                    closeMaster();
                    log.warn("AutoRecoverHAClient, master not response some time, so close connection");
                }
            } catch (Exception e) {
                log.warn(getServiceName() + " service has exception. ", e);
                closeMasterAndWait();
            }
        }

        this.flowMonitor.shutdown(true);
        log.info(getServiceName() + " service end");
    }

    /**
     * Runs one transfer pass: sends a heartbeat offset report if one is due, waits on the
     * selector, reads and dispatches available data, then reports the new max offset.
     *
     * @return {@code false} if the heartbeat report, the read pass, or the final offset report
     *         failed; {@code true} otherwise
     * @throws IOException if the selector operation fails
     */
    private boolean transferFromMaster() throws IOException {
        if (isTimeToReportOffset()) {
            log.info("Slave report current offset {}", this.currentReportedOffset);
            if (!reportSlaveMaxOffset(this.currentReportedOffset)) {
                return false;
            }
        }

        this.selector.select(SELECT_TIMEOUT_MILLIS);

        if (!processReadEvent()) {
            return false;
        }

        return reportSlaveMaxOffsetPlus();
    }

    /** Closes the connection to the master and then waits before the next attempt. */
    public void closeMasterAndWait() {
        closeMaster();
        waitForRunning(RECONNECT_WAIT_MILLIS);
    }

    /** @return timestamp of the last offset report written to the master */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public long getLastWriteTimestamp() {
        return this.lastWriteTimestamp;
    }

    /** @return timestamp of the last successful read; {@code 0} after the connection was closed */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public long getLastReadTimestamp() {
        return this.lastReadTimestamp;
    }

    /** @return the current connection state */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public HAConnectionState getCurrentState() {
        return this.currentState;
    }

    /** @return the transfer rate as reported by the flow monitor */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public long getTransferredByteInSecond() {
        return this.flowMonitor.getTransferredByteInSecond();
    }

    /**
     * Stops the client: switches the state to {@link HAConnectionState#SHUTDOWN}, shuts down
     * the flow monitor and the service thread, closes the connection to the master, and
     * finally closes the selector (a failure to close it is only logged).
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void shutdown() {
        changeCurrentState(HAConnectionState.SHUTDOWN);
        this.flowMonitor.shutdown();
        super.shutdown();

        closeMaster();
        try {
            this.selector.close();
        } catch (IOException e) {
            log.warn("Close the selector of AutoRecoverHAClient error, ", e);
        }
    }

    /**
     * @return the simple class name, prefixed with the broker identifier when the store's
     *         broker config says it runs inside a broker container
     */
    @Override
    public String getServiceName() {
        String simpleName = DefaultHAClient.class.getSimpleName();
        if (this.defaultMessageStore != null && this.defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
            return this.defaultMessageStore.getBrokerIdentity().getIdentifier() + simpleName;
        }
        return simpleName;
    }
}
