package org.apache.pinot.broker.broker.helix;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.common.messages.TimeboundaryRefreshMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/broker/helix/TimeboundaryRefreshMessageHandlerFactory.class */
public class TimeboundaryRefreshMessageHandlerFactory implements MessageHandlerFactory {
    private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
    private boolean shuttingDown;
    private static final Logger LOGGER = LoggerFactory.getLogger(TimeboundaryRefreshMessageHandlerFactory.class);
    private static ConcurrentHashMap<String, Boolean> _tablesToRefreshmap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/pinot/broker/broker/helix/TimeboundaryRefreshMessageHandlerFactory$TimeboundaryRefreshMessageExecutor.class */
    private class TimeboundaryRefreshMessageExecutor implements Runnable {
        private long _sleepTimeInMilliseconds;
        private final Logger _logger = LoggerFactory.getLogger(TimeboundaryRefreshMessageExecutor.class);

        public TimeboundaryRefreshMessageExecutor(long j) {
            this._sleepTimeInMilliseconds = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!TimeboundaryRefreshMessageHandlerFactory.this.shuttingDown) {
                try {
                    Iterator it = TimeboundaryRefreshMessageHandlerFactory._tablesToRefreshmap.keySet().iterator();
                    while (it.hasNext()) {
                        String str = (String) it.next();
                        this._logger.info("Update time boundary info for table {} ", str);
                        TimeboundaryRefreshMessageHandlerFactory.this._helixExternalViewBasedRouting.updateTimeBoundary(str);
                        it.remove();
                    }
                    Thread.sleep(this._sleepTimeInMilliseconds);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            this._logger.info("TimeboundaryRefreshMessageExecutor thread has been shutdown.");
        }
    }

    /* loaded from: input_file:org/apache/pinot/broker/broker/helix/TimeboundaryRefreshMessageHandlerFactory$TimeboundaryRefreshMessageHandler.class */
    private class TimeboundaryRefreshMessageHandler extends MessageHandler {
        private final String _tableNameWithType;
        private final Logger _logger;

        public TimeboundaryRefreshMessageHandler(TimeboundaryRefreshMessage timeboundaryRefreshMessage, NotificationContext notificationContext) {
            super(timeboundaryRefreshMessage, notificationContext);
            this._tableNameWithType = timeboundaryRefreshMessage.getPartitionName();
            this._logger = LoggerFactory.getLogger(this._tableNameWithType + "-" + TimeboundaryRefreshMessageHandler.class);
        }

        public HelixTaskResult handleMessage() {
            HelixTaskResult helixTaskResult = new HelixTaskResult();
            TimeboundaryRefreshMessageHandlerFactory._tablesToRefreshmap.put(this._tableNameWithType, Boolean.TRUE);
            helixTaskResult.setSuccess(true);
            return helixTaskResult;
        }

        public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            this._logger.error("onError: {}, {}", new Object[]{errorType, errorCode, exc});
        }
    }

    public TimeboundaryRefreshMessageHandlerFactory(HelixExternalViewBasedRouting helixExternalViewBasedRouting, long j) {
        this._helixExternalViewBasedRouting = helixExternalViewBasedRouting;
        new Thread(new TimeboundaryRefreshMessageExecutor(j)).start();
        this.shuttingDown = false;
    }

    public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
        String msgSubType = message.getMsgSubType();
        boolean z = -1;
        switch (msgSubType.hashCode()) {
            case 1832362712:
                if (msgSubType.equals("REFRESH_TIME_BOUNDARY")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                LOGGER.info("time refresh msg received {} for table {}", message.getPartitionName());
                return new TimeboundaryRefreshMessageHandler(new TimeboundaryRefreshMessage(message), notificationContext);
            default:
                throw new UnsupportedOperationException("Unsupported user defined message sub type: " + msgSubType);
        }
    }

    public String getMessageType() {
        return Message.MessageType.USER_DEFINE_MSG.toString();
    }

    public void reset() {
        LOGGER.info("Reset called");
    }

    public void shutdown() {
        this.shuttingDown = true;
    }
}
