package org.apache.kylin.metadata.cachesync;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.6.1.jar:org/apache/kylin/metadata/cachesync/Broadcaster.class */
public class Broadcaster {
    public static final String SYNC_ALL = "all";
    public static final String SYNC_PRJ_SCHEMA = "project_schema";
    public static final String SYNC_PRJ_DATA = "project_data";
    public static final String SYNC_PRJ_ACL = "project_acl";
    private KylinConfig config;
    private ExecutorService announceThreadPool;
    private SyncErrorHandler syncErrorHandler;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) Broadcaster.class);
    static final Map<String, List<Listener>> staticListenerMap = Maps.newConcurrentMap();
    private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque();
    private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
    private AtomicLong counter = new AtomicLong();
    private ExecutorService announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory());

    /* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.6.1.jar:org/apache/kylin/metadata/cachesync/Broadcaster$BroadcastEvent.class */
    public static class BroadcastEvent {
        private int retryTime;
        private String targetNode;
        private String entity;
        private String event;
        private String cacheKey;

        public BroadcastEvent(String str, String str2, String str3) {
            this.entity = str;
            this.event = str2;
            this.cacheKey = str3;
        }

        public int getRetryTime() {
            return this.retryTime;
        }

        public void setRetryTime(int i) {
            this.retryTime = i;
        }

        public String getTargetNode() {
            return this.targetNode;
        }

        public void setTargetNode(String str) {
            this.targetNode = str;
        }

        public String getEntity() {
            return this.entity;
        }

        public String getEvent() {
            return this.event;
        }

        public String getCacheKey() {
            return this.cacheKey;
        }

        public int hashCode() {
            return (31 * ((31 * ((31 * 1) + (this.event == null ? 0 : this.event.hashCode()))) + (this.cacheKey == null ? 0 : this.cacheKey.hashCode()))) + (this.entity == null ? 0 : this.entity.hashCode());
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (this == obj) {
                return true;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            BroadcastEvent broadcastEvent = (BroadcastEvent) obj;
            if (StringUtils.equals(this.event, broadcastEvent.event) && StringUtils.equals(this.cacheKey, broadcastEvent.cacheKey)) {
                return StringUtils.equals(this.entity, broadcastEvent.entity);
            }
            return false;
        }

        public String toString() {
            return Objects.toStringHelper(this).add("entity", this.entity).add("event", this.event).add("cacheKey", this.cacheKey).toString();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.6.1.jar:org/apache/kylin/metadata/cachesync/Broadcaster$DefaultSyncErrorHandler.class */
    public static class DefaultSyncErrorHandler implements SyncErrorHandler {
        Broadcaster broadcaster;
        int maxRetryTimes;

        @Override // org.apache.kylin.metadata.cachesync.Broadcaster.SyncErrorHandler
        public void init(Broadcaster broadcaster) {
            this.maxRetryTimes = broadcaster.getConfig().getCacheSyncRetrys();
            this.broadcaster = broadcaster;
        }

        @Override // org.apache.kylin.metadata.cachesync.Broadcaster.SyncErrorHandler
        public void handleAnnounceError(String str, RestClient restClient, BroadcastEvent broadcastEvent) {
            int retryTime = broadcastEvent.getRetryTime() + 1;
            if (retryTime >= this.maxRetryTimes) {
                Broadcaster.logger.error("Announce broadcast event exceeds retry limit, abandon targetNode {} broadcastEvent {}", str, broadcastEvent);
                return;
            }
            broadcastEvent.setRetryTime(retryTime);
            broadcastEvent.setTargetNode(str);
            this.broadcaster.announce(broadcastEvent);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.6.1.jar:org/apache/kylin/metadata/cachesync/Broadcaster$Event.class */
    public enum Event {
        CREATE("create"),
        UPDATE("update"),
        DROP("drop");

        private String text;

        Event(String str) {
            this.text = str;
        }

        public String getType() {
            return this.text;
        }

        public static Event getEvent(String str) {
            for (Event event : values()) {
                if (event.getType().equalsIgnoreCase(str)) {
                    return event;
                }
            }
            return null;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.6.1.jar:org/apache/kylin/metadata/cachesync/Broadcaster$Listener.class */
    public static abstract class Listener {
        public void onClearAll(Broadcaster broadcaster) throws IOException {
        }

        public void onProjectSchemaChange(Broadcaster broadcaster, String str) throws IOException {
        }

        public void onProjectDataChange(Broadcaster broadcaster, String str) throws IOException {
        }

        public void onProjectQueryACLChange(Broadcaster broadcaster, String str) throws IOException {
        }

        public void onEntityChange(Broadcaster broadcaster, String str, Event event, String str2) throws IOException {
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.6.1.jar:org/apache/kylin/metadata/cachesync/Broadcaster$SyncErrorHandler.class */
    public interface SyncErrorHandler {
        void init(Broadcaster broadcaster);

        void handleAnnounceError(String str, RestClient restClient, BroadcastEvent broadcastEvent);
    }

    public static Broadcaster getInstance(KylinConfig kylinConfig) {
        return (Broadcaster) kylinConfig.getManager(Broadcaster.class);
    }

    static Broadcaster newInstance(KylinConfig kylinConfig) {
        return new Broadcaster(kylinConfig);
    }

    private Broadcaster(final KylinConfig kylinConfig) {
        this.config = kylinConfig;
        this.syncErrorHandler = getSyncErrorHandler(kylinConfig);
        String[] restServers = kylinConfig.getRestServers();
        if (restServers == null || restServers.length < 1) {
            logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
        }
        logger.debug("{} nodes in the cluster: {}", Integer.valueOf(restServers == null ? 0 : restServers.length), Arrays.toString(restServers));
        this.announceThreadPool = new ThreadPoolExecutor((restServers == null || restServers.length < 1) ? 1 : restServers.length, (restServers == null || restServers.length < 1) ? 10 : restServers.length * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DaemonThreadFactory());
        this.announceMainLoop.execute(new Runnable() { // from class: org.apache.kylin.metadata.cachesync.Broadcaster.1
            @Override // java.lang.Runnable
            public void run() {
                final HashMap newHashMap = Maps.newHashMap();
                while (!Broadcaster.this.announceThreadPool.isShutdown()) {
                    try {
                        final BroadcastEvent broadcastEvent = (BroadcastEvent) Broadcaster.this.broadcastEvents.takeFirst();
                        String[] restServers2 = kylinConfig.getRestServers();
                        Broadcaster.logger.debug("Servers in the cluster: {}", Arrays.toString(restServers2));
                        for (String str : restServers2) {
                            if (!newHashMap.containsKey(str)) {
                                newHashMap.put(str, new RestClient(str));
                            }
                        }
                        String targetNode = broadcastEvent.getTargetNode();
                        if (targetNode == null) {
                            targetNode = "all";
                        }
                        Broadcaster.logger.debug("Announcing new broadcast to {}: {}", targetNode, broadcastEvent);
                        for (final String str2 : restServers2) {
                            if (targetNode.equals("all") || targetNode.equals(str2)) {
                                Broadcaster.this.announceThreadPool.execute(new Runnable() { // from class: org.apache.kylin.metadata.cachesync.Broadcaster.1.1
                                    @Override // java.lang.Runnable
                                    public void run() {
                                        RestClient restClient = (RestClient) newHashMap.get(str2);
                                        try {
                                            restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                        } catch (IOException e) {
                                            Broadcaster.logger.error("Announce broadcast event failed, targetNode {} broadcastEvent {}, error msg: {}", str2, broadcastEvent, e);
                                            Broadcaster.this.syncErrorHandler.handleAnnounceError(str2, restClient, broadcastEvent);
                                        }
                                    }
                                });
                            }
                        }
                    } catch (Exception e) {
                        Broadcaster.logger.error("error running wiping", (Throwable) e);
                    }
                }
            }
        });
    }

    private SyncErrorHandler getSyncErrorHandler(KylinConfig kylinConfig) {
        String cacheSyncErrorHandler = kylinConfig.getCacheSyncErrorHandler();
        if (StringUtils.isEmpty(cacheSyncErrorHandler)) {
            cacheSyncErrorHandler = DefaultSyncErrorHandler.class.getName();
        }
        return (SyncErrorHandler) ClassUtil.newInstance(cacheSyncErrorHandler);
    }

    public KylinConfig getConfig() {
        return this.config;
    }

    public void stopAnnounce() {
        this.announceThreadPool.shutdown();
        this.announceMainLoop.shutdown();
    }

    public void registerStaticListener(Listener listener, String... strArr) {
        doRegisterListener(staticListenerMap, listener, strArr);
    }

    public void registerListener(Listener listener, String... strArr) {
        doRegisterListener(this.listenerMap, listener, strArr);
    }

    private static void doRegisterListener(Map<String, List<Listener>> map, Listener listener, String... strArr) {
        synchronized (map) {
            List<Listener> list = map.get("all");
            if (list == null || !list.contains(listener)) {
                for (String str : strArr) {
                    if (!StringUtils.isBlank(str)) {
                        addListener(map, str, listener);
                    }
                }
                addListener(map, "all", listener);
                addListener(map, SYNC_PRJ_SCHEMA, listener);
                addListener(map, SYNC_PRJ_DATA, listener);
                addListener(map, SYNC_PRJ_ACL, listener);
            }
        }
    }

    private static void addListener(Map<String, List<Listener>> map, String str, Listener listener) {
        map.computeIfAbsent(str, str2 -> {
            return new ArrayList();
        }).add(listener);
    }

    public void notifyClearAll() throws IOException {
        notifyListener("all", Event.UPDATE, "all");
    }

    public void notifyProjectSchemaUpdate(String str) throws IOException {
        notifyListener(SYNC_PRJ_SCHEMA, Event.UPDATE, str);
    }

    public void notifyProjectDataUpdate(String str) throws IOException {
        notifyListener(SYNC_PRJ_DATA, Event.UPDATE, str);
    }

    public void notifyProjectACLUpdate(String str) throws IOException {
        notifyListener(SYNC_PRJ_ACL, Event.UPDATE, str);
    }

    public void notifyListener(String str, Event event, String str2) throws IOException {
        notifyListener(str, event, str2, true);
    }

    public void notifyNonStaticListener(String str, Event event, String str2) throws IOException {
        notifyListener(str, event, str2, false);
    }

    private void notifyListener(String str, Event event, String str2, boolean z) throws IOException {
        List<Listener> list;
        ArrayList newArrayList = Lists.newArrayList();
        List<Listener> list2 = this.listenerMap.get(str);
        if (list2 != null) {
            newArrayList.addAll(list2);
        }
        if (z && (list = staticListenerMap.get(str)) != null) {
            newArrayList.addAll(list);
        }
        if (newArrayList.isEmpty()) {
            return;
        }
        logger.debug("Broadcasting {}, {}, {}", event, str, str2);
        boolean z2 = -1;
        switch (str.hashCode()) {
            case -939540892:
                if (str.equals(SYNC_PRJ_ACL)) {
                    z2 = 3;
                    break;
                }
                break;
            case 96673:
                if (str.equals("all")) {
                    z2 = false;
                    break;
                }
                break;
            case 939091216:
                if (str.equals(SYNC_PRJ_DATA)) {
                    z2 = 2;
                    break;
                }
                break;
            case 954460551:
                if (str.equals(SYNC_PRJ_SCHEMA)) {
                    z2 = true;
                    break;
                }
                break;
        }
        switch (z2) {
            case false:
                Iterator it = newArrayList.iterator();
                while (it.hasNext()) {
                    ((Listener) it.next()).onClearAll(this);
                }
                this.config.clearManagers();
                break;
            case true:
                ProjectManager.getInstance(this.config).clearL2Cache(str2);
                Iterator it2 = newArrayList.iterator();
                while (it2.hasNext()) {
                    ((Listener) it2.next()).onProjectSchemaChange(this, str2);
                }
                break;
            case true:
                ProjectManager.getInstance(this.config).clearL2Cache(str2);
                Iterator it3 = newArrayList.iterator();
                while (it3.hasNext()) {
                    ((Listener) it3.next()).onProjectDataChange(this, str2);
                }
                break;
            case true:
                ProjectManager.getInstance(this.config).clearL2Cache(str2);
                Iterator it4 = newArrayList.iterator();
                while (it4.hasNext()) {
                    ((Listener) it4.next()).onProjectQueryACLChange(this, str2);
                }
                break;
            default:
                Iterator it5 = newArrayList.iterator();
                while (it5.hasNext()) {
                    ((Listener) it5.next()).onEntityChange(this, str, event, str2);
                }
                break;
        }
        logger.debug("Done broadcasting {}, {}, {}", event, str, str2);
    }

    public void announce(String str, String str2, String str3) {
        announce(new BroadcastEvent(str, str2, str3));
    }

    public void announce(BroadcastEvent broadcastEvent) {
        if (this.broadcastEvents == null) {
            return;
        }
        try {
            this.counter.incrementAndGet();
            this.broadcastEvents.putLast(broadcastEvent);
        } catch (Exception e) {
            this.counter.decrementAndGet();
            logger.error("error putting BroadcastEvent", (Throwable) e);
        }
    }

    public long getCounterAndClear() {
        return this.counter.getAndSet(0L);
    }
}
