/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.metadata.cachesync;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Broadcaster {
    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
    // Reserved entity name for a "clear everything" notification; also used by the
    // announce loop as the default target meaning "every rest server".
    public static final String SYNC_ALL = "all";
    // Reserved entity name dispatched to Listener.onProjectSchemaChange.
    public static final String SYNC_PRJ_SCHEMA = "project_schema";
    // Reserved entity name dispatched to Listener.onProjectDataChange.
    public static final String SYNC_PRJ_DATA = "project_data";
    // Reserved entity name dispatched to Listener.onProjectQueryACLChange.
    public static final String SYNC_PRJ_ACL = "project_acl";
    // Listeners registered through registerStaticListener, keyed by entity name.
    // The field is static, so it is shared by every Broadcaster instance.
    static final Map<String, List<Listener>> staticListenerMap = Maps.newConcurrentMap();
    // Configuration this broadcaster was created with; source of the rest server list.
    private KylinConfig config;
    // Single-threaded executor that runs the loop draining broadcastEvents.
    private ExecutorService announceMainLoop;
    // Pool that performs the per-node wipeCache calls.
    private ExecutorService announceThreadPool;
    // Invoked when a wipeCache call to a node fails with an IOException.
    private SyncErrorHandler syncErrorHandler;
    // Queue of events waiting to be announced to the rest servers.
    private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<BroadcastEvent>();
    // Listeners registered through registerListener on this instance, keyed by entity name.
    private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
    // Number of events queued by announce() since the last getCounterAndClear().
    private AtomicLong counter = new AtomicLong();

    /**
     * Looks up the broadcaster associated with the given configuration.
     *
     * @param config configuration whose {@code getManager} is asked for the broadcaster
     * @return whatever {@code config.getManager(Broadcaster.class)} returns
     */
    public static Broadcaster getInstance(KylinConfig config) {
        final Broadcaster managed = config.getManager(Broadcaster.class);
        return managed;
    }

    /**
     * Package-private factory that builds a fresh broadcaster for the given configuration.
     * NOTE(review): presumably the hook used by {@code KylinConfig.getManager} - confirm.
     *
     * @param config configuration the new broadcaster is bound to
     * @return a newly constructed broadcaster
     */
    static Broadcaster newInstance(KylinConfig config) {
        final Broadcaster created = new Broadcaster(config);
        return created;
    }

    /**
     * Creates the broadcaster, initialises the sync error handler and starts the
     * background loop that announces queued events to the rest servers.
     *
     * @param config configuration providing the rest server list and the error handler class
     */
    private Broadcaster(final KylinConfig config) {
        this.config = config;
        this.syncErrorHandler = this.getSyncErrorHandler(config);
        this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory());
        // NOTE(review): with an unbounded work queue a ThreadPoolExecutor never grows past its
        // core size, so the maximum of 10 is effectively unused - confirm this is intended.
        this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
        // The handler must be initialised before any delivery task can fail; otherwise
        // DefaultSyncErrorHandler keeps maxRetryTimes == 0 and never retries.
        this.syncErrorHandler.init(this);

        final String[] nodes = config.getRestServers();
        final int nodeCount = nodes == null ? 0 : nodes.length;
        if (nodeCount < 1) {
            logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
        }
        // nodeCount is used instead of nodes.length so a null server list cannot throw here.
        logger.debug(nodeCount + " nodes in the cluster: " + Arrays.toString(nodes));

        this.announceMainLoop.execute(new Runnable() {
            @Override
            public void run() {
                Broadcaster.this.runAnnounceLoop();
            }
        });
    }

    /**
     * Drains the event queue until the announce pool is shut down or this thread is
     * interrupted, fanning each event out to the targeted rest servers.
     */
    private void runAnnounceLoop() {
        // Only touched by this loop thread; delivery tasks receive their client directly.
        final Map<String, RestClient> restClientMap = new HashMap<String, RestClient>();
        while (!this.announceThreadPool.isShutdown()) {
            try {
                final BroadcastEvent broadcastEvent = this.broadcastEvents.takeFirst();
                final String[] restServers = this.config.getRestServers();
                logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                if (restServers == null) {
                    logger.warn("No rest servers configured, dropping broadcast event " + broadcastEvent);
                    continue;
                }
                // Create missing clients up front, before anything is dispatched.
                for (String node : restServers) {
                    if (!restClientMap.containsKey(node)) {
                        restClientMap.put(node, new RestClient(node));
                    }
                }
                String toWhere = broadcastEvent.getTargetNode();
                if (toWhere == null) {
                    toWhere = SYNC_ALL;
                }
                logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent);
                for (String node : restServers) {
                    if (!toWhere.equals(SYNC_ALL) && !toWhere.equals(node)) {
                        continue;
                    }
                    this.announceThreadPool.execute(this.newAnnounceTask(node, restClientMap.get(node), broadcastEvent));
                }
            }
            catch (InterruptedException e) {
                // Interrupted while waiting for an event: keep the flag set and stop the loop.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                logger.error("error running wiping", (Throwable)e);
            }
        }
    }

    /**
     * Builds the task that delivers one event to one node and hands failures to the
     * sync error handler.
     */
    private Runnable newAnnounceTask(final String node, final RestClient restClient, final BroadcastEvent broadcastEvent) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                }
                catch (IOException e) {
                    logger.error("Announce broadcast event failed, targetNode {} broadcastEvent {}, error msg: {}", new Object[]{node, broadcastEvent, e});
                    Broadcaster.this.syncErrorHandler.handleAnnounceError(node, restClient, broadcastEvent);
                }
            }
        };
    }

    /**
     * Instantiates the sync error handler named in the configuration, falling back to
     * {@link DefaultSyncErrorHandler} when no class name is configured.
     *
     * @param config configuration queried via {@code getCacheSyncErrorHandler()}
     * @return a new handler instance created through {@code ClassUtil.newInstance}
     */
    private SyncErrorHandler getSyncErrorHandler(KylinConfig config) {
        final String configured = config.getCacheSyncErrorHandler();
        final String handlerClass = StringUtils.isEmpty(configured)
                ? DefaultSyncErrorHandler.class.getName()
                : configured;
        return (SyncErrorHandler) ClassUtil.newInstance(handlerClass);
    }

    /**
     * @return the configuration this broadcaster was constructed with
     */
    public KylinConfig getConfig() {
        return config;
    }

    /**
     * Stops announcing: no further delivery tasks are accepted and the announce loop
     * thread is released.
     */
    public void stopAnnounce() {
        // Shut the pool down first so the loop's isShutdown() check is already true
        // by the time the loop thread wakes up.
        this.announceThreadPool.shutdown();
        // The loop blocks in takeFirst() on the event deque; a plain shutdown() never
        // interrupts a running task, so the thread would stay parked. shutdownNow()
        // interrupts it and lets the loop terminate.
        this.announceMainLoop.shutdownNow();
    }

    /**
     * Registers a listener in the static (class-wide) listener map.
     *
     * @param listener listener to register
     * @param entities entity names the listener should additionally be registered under
     */
    public void registerStaticListener(Listener listener, String ... entities) {
        doRegisterListener(staticListenerMap, listener, entities);
    }

    /**
     * Registers a listener in this instance's own listener map.
     *
     * @param listener listener to register
     * @param entities entity names the listener should additionally be registered under
     */
    public void registerListener(Listener listener, String ... entities) {
        doRegisterListener(listenerMap, listener, entities);
    }

    /**
     * Adds {@code listener} to {@code lmap} under every non-blank entity name given and
     * under the four reserved keys. Does nothing when the listener is already present in
     * the {@code SYNC_ALL} list. The whole operation holds the monitor of {@code lmap}.
     *
     * @param lmap     listener map to update (also used as the lock)
     * @param listener listener to register
     * @param entities entity names supplied by the caller; blank names are skipped
     */
    private static void doRegisterListener(Map<String, List<Listener>> lmap, Listener listener, String ... entities) {
        synchronized (lmap) {
            final List<Listener> clearAllListeners = lmap.get(SYNC_ALL);
            final boolean alreadyRegistered = clearAllListeners != null && clearAllListeners.contains(listener);
            if (alreadyRegistered) {
                return;
            }
            // Caller-supplied entities first, in the order given.
            for (int i = 0; i < entities.length; i++) {
                final String entity = entities[i];
                if (!StringUtils.isBlank(entity)) {
                    addListener(lmap, entity, listener);
                }
            }
            // Every listener is also registered under the reserved keys.
            final String[] reservedKeys = {SYNC_ALL, SYNC_PRJ_SCHEMA, SYNC_PRJ_DATA, SYNC_PRJ_ACL};
            for (String reserved : reservedKeys) {
                addListener(lmap, reserved, listener);
            }
        }
    }

    /**
     * Appends {@code listener} to the list stored under {@code entity}, creating the list
     * on first use. A listener already present in that list is not added again, so a
     * caller that passes a duplicate or reserved entity name does not cause the listener
     * to be notified twice for the same event.
     *
     * @param lmap     listener map to update
     * @param entity   key the listener is registered under
     * @param listener listener to add
     */
    private static void addListener(Map<String, List<Listener>> lmap, String entity, Listener listener) {
        List<Listener> list = lmap.get(entity);
        if (list == null) {
            list = new ArrayList<Listener>();
            lmap.put(entity, list);
        }
        if (!list.contains(listener)) {
            list.add(listener);
        }
    }

    /**
     * Notifies listeners of a clear-all: entity and cache key are both {@code SYNC_ALL}.
     *
     * @throws IOException if a listener callback fails
     */
    public void notifyClearAll() throws IOException {
        notifyListener(SYNC_ALL, Event.UPDATE, SYNC_ALL);
    }

    /**
     * Notifies listeners of a {@code SYNC_PRJ_SCHEMA} update for the given project.
     *
     * @param project project name, passed on as the cache key
     * @throws IOException if a listener callback fails
     */
    public void notifyProjectSchemaUpdate(String project) throws IOException {
        notifyListener(SYNC_PRJ_SCHEMA, Event.UPDATE, project);
    }

    /**
     * Notifies listeners of a {@code SYNC_PRJ_DATA} update for the given project.
     *
     * @param project project name, passed on as the cache key
     * @throws IOException if a listener callback fails
     */
    public void notifyProjectDataUpdate(String project) throws IOException {
        notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project);
    }

    /**
     * Notifies listeners of a {@code SYNC_PRJ_ACL} update for the given project.
     *
     * @param project project name, passed on as the cache key
     * @throws IOException if a listener callback fails
     */
    public void notifyProjectACLUpdate(String project) throws IOException {
        notifyListener(SYNC_PRJ_ACL, Event.UPDATE, project);
    }

    /**
     * Notifies both instance and static listeners registered for {@code entity}.
     *
     * @param entity   entity name the event concerns
     * @param event    kind of change
     * @param cacheKey key of the affected cache entry
     * @throws IOException if a listener callback fails
     */
    public void notifyListener(String entity, Event event, String cacheKey) throws IOException {
        final boolean includeStatic = true;
        notifyListener(entity, event, cacheKey, includeStatic);
    }

    /**
     * Notifies only this instance's listeners registered for {@code entity}; listeners
     * in the static map are left out.
     *
     * @param entity   entity name the event concerns
     * @param event    kind of change
     * @param cacheKey key of the affected cache entry
     * @throws IOException if a listener callback fails
     */
    public void notifyNonStaticListener(String entity, Event event, String cacheKey) throws IOException {
        final boolean includeStatic = false;
        notifyListener(entity, event, cacheKey, includeStatic);
    }

    /**
     * Dispatches an event to the listeners registered for {@code entity}.
     * Returns without doing anything when no listener is registered for it.
     *
     * @param entity        entity name; the reserved names select the dedicated callbacks,
     *                      any other name goes to {@code onEntityChange}
     * @param event         kind of change, forwarded to {@code onEntityChange}
     * @param cacheKey      cache key (the project name for the project-level entities)
     * @param includeStatic whether listeners from the static map are notified as well
     * @throws IOException if a listener callback fails
     */
    private void notifyListener(String entity, Event event, String cacheKey, boolean includeStatic) throws IOException {
        final List<Listener> list = new ArrayList<Listener>();
        // Registration mutates the per-entity lists while holding the owning map's monitor;
        // take the same monitor while snapshotting so a concurrent registration cannot
        // corrupt the copy. Callbacks run on the snapshot, outside any lock.
        synchronized (this.listenerMap) {
            final List<Listener> instanceListeners = this.listenerMap.get(entity);
            if (instanceListeners != null) {
                list.addAll(instanceListeners);
            }
        }
        if (includeStatic) {
            synchronized (staticListenerMap) {
                final List<Listener> staticListeners = staticListenerMap.get(entity);
                if (staticListeners != null) {
                    list.addAll(staticListeners);
                }
            }
        }
        if (list.isEmpty()) {
            return;
        }
        logger.debug("Broadcasting " + event + ", " + entity + ", " + cacheKey);
        switch (entity) {
            case SYNC_ALL: {
                for (Listener l : list) {
                    l.onClearAll(this);
                }
                // Managers are cleared only after every listener has seen the clear-all.
                this.config.clearManagers();
                break;
            }
            case SYNC_PRJ_SCHEMA: {
                ProjectManager.getInstance(this.config).clearL2Cache(cacheKey);
                for (Listener l : list) {
                    l.onProjectSchemaChange(this, cacheKey);
                }
                break;
            }
            case SYNC_PRJ_DATA: {
                ProjectManager.getInstance(this.config).clearL2Cache(cacheKey);
                for (Listener l : list) {
                    l.onProjectDataChange(this, cacheKey);
                }
                break;
            }
            case SYNC_PRJ_ACL: {
                ProjectManager.getInstance(this.config).clearL2Cache(cacheKey);
                for (Listener l : list) {
                    l.onProjectQueryACLChange(this, cacheKey);
                }
                break;
            }
            default: {
                for (Listener l : list) {
                    l.onEntityChange(this, entity, event, cacheKey);
                }
            }
        }
        logger.debug("Done broadcasting " + event + ", " + entity + ", " + cacheKey);
    }

    /**
     * Wraps the arguments in a {@link BroadcastEvent} and queues it for announcement.
     *
     * @param entity entity name the event concerns
     * @param event  event type as text
     * @param key    cache key of the affected entry
     */
    public void announce(String entity, String event, String key) {
        final BroadcastEvent wrapped = new BroadcastEvent(entity, event, key);
        announce(wrapped);
    }

    /**
     * Appends an event to the queue drained by the announce loop and counts it.
     * Failures are logged rather than propagated; the counter is rolled back so it
     * only reflects events that were actually queued.
     *
     * @param event event to queue
     */
    public void announce(BroadcastEvent event) {
        if (this.broadcastEvents == null) {
            return;
        }
        try {
            this.counter.incrementAndGet();
            this.broadcastEvents.putLast(event);
        }
        catch (InterruptedException e) {
            this.counter.decrementAndGet();
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            logger.error("error putting BroadcastEvent", (Throwable)e);
        }
        catch (Exception e) {
            this.counter.decrementAndGet();
            logger.error("error putting BroadcastEvent", (Throwable)e);
        }
    }

    /**
     * Atomically reads the announce counter and resets it to zero.
     *
     * @return the counter value before the reset
     */
    public long getCounterAndClear() {
        final long announcedSinceLastCall = counter.getAndSet(0L);
        return announcedSinceLastCall;
    }

    /**
     * One cache-sync message: an entity, an event type and a cache key, plus delivery
     * bookkeeping (target node and retry count). Equality, hash code and string form
     * consider only entity, event and cache key.
     */
    public static class BroadcastEvent {
        private int retryTime;
        private String targetNode;
        private String entity;
        private String event;
        private String cacheKey;

        public BroadcastEvent(String entity, String event, String cacheKey) {
            this.entity = entity;
            this.event = event;
            this.cacheKey = cacheKey;
        }

        /** @return the retry count stored on this event */
        public int getRetryTime() {
            return retryTime;
        }

        public void setRetryTime(int retryTime) {
            this.retryTime = retryTime;
        }

        /** @return the node this event is addressed to, or {@code null} if none was set */
        public String getTargetNode() {
            return targetNode;
        }

        public void setTargetNode(String targetNode) {
            this.targetNode = targetNode;
        }

        public String getEntity() {
            return entity;
        }

        public String getEvent() {
            return event;
        }

        public String getCacheKey() {
            return cacheKey;
        }

        @Override
        public int hashCode() {
            // Same 31-based fold as before, over event, cacheKey, entity in that order.
            int hash = 1;
            for (String part : new String[]{event, cacheKey, entity}) {
                hash = 31 * hash + (part == null ? 0 : part.hashCode());
            }
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final BroadcastEvent that = (BroadcastEvent) obj;
            return StringUtils.equals(event, that.event)
                    && StringUtils.equals(cacheKey, that.cacheKey)
                    && StringUtils.equals(entity, that.entity);
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                    .add("entity", entity)
                    .add("event", event)
                    .add("cacheKey", cacheKey)
                    .toString();
        }
    }

    /**
     * Base class for broadcast listeners. Every callback is a no-op here; subclasses
     * override the ones they need.
     */
    public abstract static class Listener {
        /** Called for the {@code SYNC_ALL} entity. */
        public void onClearAll(Broadcaster broadcaster) throws IOException {
            // no-op by default
        }

        /** Called for the {@code SYNC_PRJ_SCHEMA} entity with the cache key as project. */
        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
            // no-op by default
        }

        /** Called for the {@code SYNC_PRJ_DATA} entity with the cache key as project. */
        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
            // no-op by default
        }

        /** Called for the {@code SYNC_PRJ_ACL} entity with the cache key as project. */
        public void onProjectQueryACLChange(Broadcaster broadcaster, String project) throws IOException {
            // no-op by default
        }

        /** Called for any entity name that is not one of the reserved ones. */
        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
            // no-op by default
        }
    }

    /**
     * Kind of change carried by a broadcast, each with a lower-case text form.
     */
    public enum Event {
        CREATE("create"),
        UPDATE("update"),
        DROP("drop");

        private String text;

        Event(String text) {
            this.text = text;
        }

        /** @return the text form of this event */
        public String getType() {
            return text;
        }

        /**
         * Finds the event whose text form matches {@code event}, ignoring case.
         *
         * @param event text to look up; may be {@code null}
         * @return the matching constant, or {@code null} when nothing matches
         */
        public static Event getEvent(String event) {
            final Event[] candidates = values();
            Event match = null;
            for (int i = 0; i < candidates.length && match == null; i++) {
                if (candidates[i].getType().equalsIgnoreCase(event)) {
                    match = candidates[i];
                }
            }
            return match;
        }
    }

    /**
     * Strategy invoked when announcing an event to a node fails.
     */
    public interface SyncErrorHandler {
        /**
         * Gives the handler access to the broadcaster it serves.
         *
         * @param broadcaster the owning broadcaster
         */
        void init(Broadcaster broadcaster);

        /**
         * Called after a failed delivery of {@code event} to {@code targetNode}.
         *
         * @param targetNode node the delivery was addressed to
         * @param restClient client that was used for the failed call
         * @param event      event that could not be delivered
         */
        void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event);
    }

    /**
     * Default error handler: re-queues a failed delivery for the failing node only,
     * until the configured retry limit is reached, then logs and gives up.
     */
    public static class DefaultSyncErrorHandler
    implements SyncErrorHandler {
        Broadcaster broadcaster;
        int maxRetryTimes;

        /**
         * Reads the retry limit from the broadcaster's configuration and remembers the
         * broadcaster for re-queuing.
         */
        @Override
        public void init(Broadcaster broadcaster) {
            this.maxRetryTimes = broadcaster.getConfig().getCacheSyncRetrys();
            this.broadcaster = broadcaster;
        }

        /**
         * Re-announces the event to {@code targetNode} while retries remain; otherwise
         * logs that the event is abandoned. A handler whose {@code init} was never
         * called has no broadcaster and therefore always abandons.
         *
         * @param targetNode node whose delivery failed
         * @param restClient client used for the failed call (unused here)
         * @param event      event that could not be delivered
         */
        @Override
        public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) {
            int retry = event.getRetryTime() + 1;
            if (this.broadcaster != null && retry < this.maxRetryTimes) {
                // The same event instance is handed to the delivery task of every targeted
                // node, so setting targetNode/retryTime on it would let concurrent failures
                // overwrite each other. Re-queue a per-node copy instead.
                BroadcastEvent retryEvent = new BroadcastEvent(event.getEntity(), event.getEvent(), event.getCacheKey());
                retryEvent.setRetryTime(retry);
                retryEvent.setTargetNode(targetNode);
                this.broadcaster.announce(retryEvent);
            } else {
                logger.error("Announce broadcast event exceeds retry limit, abandon targetNode {} broadcastEvent {}", (Object)targetNode, (Object)event);
            }
        }
    }
}

