package org.apache.rocketmq.broker.filtersrv;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-broker-4.9.0.jar:org/apache/rocketmq/broker/filtersrv/FilterServerManager.class */
public class FilterServerManager {
    public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable = new ConcurrentHashMap(16);
    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-broker-4.9.0.jar:org/apache/rocketmq/broker/filtersrv/FilterServerManager$FilterServerInfo.class */
    public static class FilterServerInfo {
        private String filterServerAddr;
        private long lastUpdateTimestamp;

        FilterServerInfo() {
        }

        public String getFilterServerAddr() {
            return this.filterServerAddr;
        }

        public void setFilterServerAddr(String str) {
            this.filterServerAddr = str;
        }

        public long getLastUpdateTimestamp() {
            return this.lastUpdateTimestamp;
        }

        public void setLastUpdateTimestamp(long j) {
            this.lastUpdateTimestamp = j;
        }
    }

    public FilterServerManager(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.filtersrv.FilterServerManager.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    FilterServerManager.this.createFilterServer();
                } catch (Exception e) {
                    FilterServerManager.log.error("", (Throwable) e);
                }
            }
        }, 5000L, 30000L, TimeUnit.MILLISECONDS);
    }

    public void createFilterServer() {
        int filterServerNums = this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
        String buildStartCommand = buildStartCommand();
        for (int i = 0; i < filterServerNums; i++) {
            FilterServerUtil.callShell(buildStartCommand, log);
        }
    }

    private String buildStartCommand() {
        String format = BrokerStartup.configFile != null ? String.format("-c %s", BrokerStartup.configFile) : "";
        if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
            format = format + String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
        }
        return RemotingUtil.isWindowsPlatform() ? String.format("start /b %s\\bin\\mqfiltersrv.exe %s", this.brokerController.getBrokerConfig().getRocketmqHome(), format) : String.format("sh %s/bin/startfsrv.sh %s", this.brokerController.getBrokerConfig().getRocketmqHome(), format);
    }

    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }

    public void registerFilterServer(Channel channel, String str) {
        FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
        if (filterServerInfo != null) {
            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
            return;
        }
        FilterServerInfo filterServerInfo2 = new FilterServerInfo();
        filterServerInfo2.setFilterServerAddr(str);
        filterServerInfo2.setLastUpdateTimestamp(System.currentTimeMillis());
        this.filterServerTable.put(channel, filterServerInfo2);
        log.info("Receive a New Filter Server<{}>", str);
    }

    public void scanNotActiveChannel() {
        Iterator<Map.Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Channel, FilterServerInfo> next = it.next();
            long lastUpdateTimestamp = next.getValue().getLastUpdateTimestamp();
            Channel key = next.getKey();
            if (System.currentTimeMillis() - lastUpdateTimestamp > 30000) {
                log.info("The Filter Server<{}> expired, remove it", next.getKey());
                it.remove();
                RemotingUtil.closeChannel(key);
            }
        }
    }

    public void doChannelCloseEvent(String str, Channel channel) {
        FilterServerInfo remove = this.filterServerTable.remove(channel);
        if (remove != null) {
            log.warn("The Filter Server<{}> connection<{}> closed, remove it", remove.getFilterServerAddr(), str);
        }
    }

    public List<String> buildNewFilterServerList() {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getValue().getFilterServerAddr());
        }
        return arrayList;
    }
}
