package org.apache.tubemq.server.common.offsetstorage;

import java.lang.Thread;
import java.util.Collection;
import org.apache.tubemq.server.broker.exception.OffsetStoreException;
import org.apache.tubemq.server.common.fileconfig.ZKConfig;
import org.apache.tubemq.server.common.offsetstorage.zookeeper.ZKUtil;
import org.apache.tubemq.server.common.offsetstorage.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.class */
public class ZkOffsetStorage implements OffsetStorage {
    private static final Logger logger = LoggerFactory.getLogger(ZkOffsetStorage.class);
    private final String tubeZkRoot;
    private final String consumerZkDir;
    private ZKConfig zkConfig;
    private ZooKeeperWatcher zkw;

    public ZkOffsetStorage(ZKConfig zKConfig) {
        this.zkConfig = zKConfig;
        this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot());
        this.consumerZkDir = this.tubeZkRoot + "/consumers-v3";
        try {
            this.zkw = new ZooKeeperWatcher(zKConfig);
        } catch (Throwable th) {
            logger.error(new StringBuilder(256).append("Failed to connect ZooKeeper server (").append(this.zkConfig.getZkServerAddr()).append(") !").toString(), th);
            System.exit(1);
        }
        logger.info("ZooKeeper Offset Storage initiated!");
    }

    @Override // org.apache.tubemq.server.common.offsetstorage.OffsetStorage
    public void close() {
        if (this.zkw != null) {
            logger.info("ZooKeeper Offset Storage closing .......");
            this.zkw.close();
            this.zkw = null;
            logger.info("ZooKeeper Offset Storage closed!");
        }
    }

    @Override // org.apache.tubemq.server.common.offsetstorage.OffsetStorage
    public void commitOffset(String str, Collection<OffsetStorageInfo> collection, boolean z) {
        if (this.zkw == null || collection == null || collection.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder(512);
        if (!z) {
            try {
                cfmOffset(sb, str, collection);
                return;
            } catch (OffsetStoreException e) {
                logger.error("Error when commit offsets to ZooKeeper", e);
                return;
            }
        }
        for (int i = 0; i < 10; i++) {
            try {
                cfmOffset(sb, str, collection);
                return;
            } catch (Exception e2) {
                logger.error("Error found when commit offsets to ZooKeeper with retry " + i, e2);
                try {
                    Thread.sleep(this.zkConfig.getZkSyncTimeMs());
                } catch (InterruptedException e3) {
                    logger.error("InterruptedException when commit offset to ZooKeeper with retry " + i, e3);
                    return;
                }
            }
        }
    }

    @Override // org.apache.tubemq.server.common.offsetstorage.OffsetStorage
    public OffsetStorageInfo loadOffset(String str, String str2, int i, int i2) {
        try {
            String readDataMaybeNull = ZKUtil.readDataMaybeNull(this.zkw, new StringBuilder(512).append(this.consumerZkDir).append("/").append(str).append("/offsets/").append(str2).append("/").append(i).append("-").append(i2).toString());
            if (readDataMaybeNull == null) {
                return null;
            }
            String[] split = readDataMaybeNull.split("-");
            return new OffsetStorageInfo(str2, i, i2, Long.parseLong(split[1]), Long.parseLong(split[0]), false);
        } catch (KeeperException e) {
            logger.error("KeeperException during load offsets from ZooKeeper", e);
            return null;
        }
    }

    private void cfmOffset(StringBuilder sb, String str, Collection<OffsetStorageInfo> collection) throws OffsetStoreException {
        sb.delete(0, sb.length());
        for (OffsetStorageInfo offsetStorageInfo : collection) {
            synchronized (offsetStorageInfo) {
                if (offsetStorageInfo.isModified()) {
                    long offset = offsetStorageInfo.getOffset();
                    long messageId = offsetStorageInfo.getMessageId();
                    offsetStorageInfo.setModified(false);
                    String sb2 = sb.append(this.consumerZkDir).append("/").append(str).append("/offsets/").append(offsetStorageInfo.getTopic()).append("/").append(offsetStorageInfo.getBrokerId()).append("-").append(offsetStorageInfo.getPartitionId()).toString();
                    sb.delete(0, sb.length());
                    String sb3 = sb.append(messageId).append("-").append(offset).toString();
                    sb.delete(0, sb.length());
                    try {
                        ZKUtil.updatePersistentPath(this.zkw, sb2, sb3);
                        if (logger.isDebugEnabled()) {
                            logger.debug(sb.append("Committed offset, path=").append(sb2).append(", data=").append(sb3).toString());
                            sb.delete(0, sb.length());
                        }
                    } catch (Throwable th) {
                        logger.error("Exception during commit offsets to ZooKeeper", th);
                        throw new OffsetStoreException(th);
                    }
                }
            }
        }
    }

    private String normalize(String str) {
        return str.startsWith("/") ? removeLastSlash(str) : "/" + removeLastSlash(str);
    }

    private String removeLastSlash(String str) {
        return str.endsWith("/") ? str.substring(0, str.lastIndexOf("/")) : str;
    }

    static {
        if (Thread.getDefaultUncaughtExceptionHandler() == null) {
            Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.tubemq.server.common.offsetstorage.ZkOffsetStorage.1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread, Throwable th) {
                    if ((th instanceof IllegalStateException) && th.getMessage().contains("Shutdown in progress")) {
                        return;
                    }
                    ZkOffsetStorage.logger.warn("Thread terminated with exception: " + thread.getName(), th);
                }
            });
        }
    }
}
