package org.apache.kylin.metadata.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-core-metadata-2.0.0.jar:org/apache/kylin/metadata/streaming/StreamingManager.class */
/**
 * Manages {@link StreamingConfig} metadata for a given {@link KylinConfig}.
 *
 * <p>Configs are read from and written to the {@link ResourceStore} obtained from the
 * config, and are mirrored in a {@link CaseInsensitiveStringCache} registered under the
 * {@code "streaming"} name. One manager is kept per {@link KylinConfig} in a static map;
 * use {@link #getInstance(KylinConfig)} to obtain it.
 */
public class StreamingManager {
    private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);

    /** Manager instances keyed by the config they were created for. */
    private static final ConcurrentMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<>();

    /** JSON serializer used when reading and updating {@link StreamingConfig} resources. */
    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<>(StreamingConfig.class);

    private final KylinConfig config;
    private final CaseInsensitiveStringCache<StreamingConfig> streamingMap;

    /**
     * Broadcaster listener that keeps this manager's local cache in step with
     * change notifications.
     */
    private class StreamingSyncListener extends Broadcaster.Listener {
        private StreamingSyncListener() {
        }

        /** Drops every cached manager instance. */
        @Override
        public void onClearAll(Broadcaster broadcaster) throws IOException {
            StreamingManager.clearCache();
        }

        /**
         * Removes the named entry from the local cache on {@code DROP}; for any other
         * event reloads it from the resource store.
         */
        @Override
        public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) throws IOException {
            if (event == Broadcaster.Event.DROP) {
                StreamingManager.this.removeStreamingLocal(cacheKey);
            } else {
                StreamingManager.this.reloadStreamingConfigLocal(cacheKey);
            }
        }
    }

    /** Removes all manager instances held in the static per-config map. */
    public static void clearCache() {
        CACHE.clear();
    }

    /**
     * Creates a manager for the given config, loads every streaming config from the
     * store and registers a sync listener for the {@code "streaming"} entity.
     *
     * @throws IOException if the initial load from the resource store fails
     */
    private StreamingManager(KylinConfig kylinConfig) throws IOException {
        this.config = kylinConfig;
        this.streamingMap = new CaseInsensitiveStringCache<>(kylinConfig, "streaming");
        reloadAllStreaming();
        Broadcaster.getInstance(kylinConfig).registerListener(new StreamingSyncListener(), "streaming");
    }

    private ResourceStore getStore() {
        return ResourceStore.getStore(this.config);
    }

    /**
     * Returns the manager associated with {@code kylinConfig}, creating it on first use.
     *
     * <p>The map is checked once without locking and once more while holding the class
     * lock, so at most one manager is constructed per config.
     *
     * @param kylinConfig the configuration whose manager is requested
     * @return the existing or newly created manager
     * @throws IllegalStateException if the manager cannot be initialised from the store
     */
    public static StreamingManager getInstance(KylinConfig kylinConfig) {
        StreamingManager existing = CACHE.get(kylinConfig);
        if (existing != null) {
            return existing;
        }
        synchronized (StreamingManager.class) {
            existing = CACHE.get(kylinConfig);
            if (existing != null) {
                return existing;
            }
            try {
                StreamingManager created = new StreamingManager(kylinConfig);
                CACHE.put(kylinConfig, created);
                if (CACHE.size() > 1) {
                    logger.warn("More than one singleton exist");
                }
                return created;
            } catch (IOException e) {
                throw new IllegalStateException("Failed to init StreamingManager from " + kylinConfig, e);
            }
        }
    }

    /** Builds the resource path {@code /streaming/<name><suffix>}. */
    private static String formatStreamingConfigPath(String name) {
        return "/streaming/" + name + MetadataConstants.FILE_SURFIX;
    }

    /** Builds the resource path {@code /streaming_output/<streaming>_<partition><suffix>}. */
    private static String formatStreamingOutputPath(String streaming, int partition) {
        return "/streaming_output/" + streaming + "_" + partition + MetadataConstants.FILE_SURFIX;
    }

    /** Builds the output resource path with all partition ids joined by {@code "_"}. */
    private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
        return "/streaming_output/" + streaming + "_" + StringUtils.join(partitions, "_") + MetadataConstants.FILE_SURFIX;
    }

    /**
     * Looks up a streaming config in the cache.
     *
     * @param name the streaming config name
     * @return whatever the cache holds for {@code name}
     */
    public StreamingConfig getStreamingConfig(String name) {
        return this.streamingMap.get(name);
    }

    /**
     * @return a new list containing every cached streaming config
     */
    public List<StreamingConfig> listAllStreaming() {
        return new ArrayList<>(this.streamingMap.values());
    }

    /**
     * Reloads one streaming config from the resource store and stores it in the local
     * cache via {@code putLocal}.
     *
     * @param name the streaming config name
     * @return the freshly loaded config
     * @throws IOException if reading from the store fails
     * @throws IllegalStateException if no resource exists for {@code name} or the loaded
     *         config has a blank name
     */
    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
        StreamingConfig loaded = loadStreamingConfigAt(StreamingConfig.concatResourcePath(name));
        this.streamingMap.putLocal(loaded.getName(), loaded);
        return loaded;
    }

    /**
     * Deletes the config's resource from the store, then removes it from the cache.
     *
     * @param streamingConfig the config to delete
     * @throws IOException if deleting the resource fails
     */
    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
        getStore().deleteResource(streamingConfig.getResourcePath());
        this.streamingMap.remove(streamingConfig.getName());
    }

    /**
     * Looks up a streaming config after upper-casing the given name.
     *
     * @param name the streaming config name; must not be {@code null}
     * @return whatever the cache holds for the upper-cased name
     */
    public StreamingConfig getConfig(String name) {
        // NOTE(review): toUpperCase() uses the default locale; kept as-is because the
        // cache's own key normalisation is not visible here and must stay in agreement.
        return this.streamingMap.get(name.toUpperCase());
    }

    /**
     * Removes an entry from the local cache via {@code removeLocal}; the resource
     * store is not touched.
     *
     * @param name the streaming config name
     */
    public void removeStreamingLocal(String name) {
        this.streamingMap.removeLocal(name);
    }

    /**
     * Persists changes to an already-known streaming config and refreshes the cache.
     *
     * @param streamingConfig the config to store; needs a non-null uuid and name
     * @return the config as re-read from the store after the write
     * @throws IllegalArgumentException if the config is {@code null}, lacks a uuid or
     *         name, or is not present in the cache
     * @throws IOException if writing to or reading from the store fails
     */
    public StreamingConfig updateStreamingConfig(StreamingConfig streamingConfig) throws IOException {
        // Null is rejected with the same exception type as the other argument checks
        // rather than surfacing as a NullPointerException.
        if (streamingConfig == null || streamingConfig.getUuid() == null || streamingConfig.getName() == null) {
            throw new IllegalArgumentException("SteamingConfig Illegal.");
        }
        String name = streamingConfig.getName();
        if (!this.streamingMap.containsKey(name)) {
            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
        }

        String resourcePath = streamingConfig.getResourcePath();
        getStore().putResource(resourcePath, streamingConfig, STREAMING_SERIALIZER);

        StreamingConfig reloaded = loadStreamingConfigAt(resourcePath);
        // NOTE(review): the cache keeps the caller's instance while the reloaded copy is
        // returned; preserved from the original behaviour - confirm this is intended.
        this.streamingMap.put(reloaded.getName(), streamingConfig);
        return reloaded;
    }

    /**
     * Persists a new streaming config and adds it to the cache.
     *
     * @param streamingConfig the config to store; must have a non-empty name
     * @return the same {@code streamingConfig} instance
     * @throws IllegalArgumentException if the config is {@code null}, has an empty name,
     *         or a config with the same name is already cached
     * @throws IOException if writing to the store fails
     */
    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
        if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
            throw new IllegalArgumentException("StreamingConfig and its name must not be empty");
        }
        String name = streamingConfig.getName();
        if (this.streamingMap.containsKey(name)) {
            throw new IllegalArgumentException("StreamingConfig '" + name + "' already exists");
        }

        String resourcePath = StreamingConfig.concatResourcePath(name);
        // NOTE(review): this write uses StreamingConfig.SERIALIZER while updates use
        // STREAMING_SERIALIZER; left unchanged - confirm the two are equivalent.
        getStore().putResource(resourcePath, streamingConfig, StreamingConfig.SERIALIZER);
        this.streamingMap.put(name, streamingConfig);
        return streamingConfig;
    }

    /**
     * Reads a streaming config from the given resource path.
     *
     * @param path the resource path to read
     * @return the loaded config, never {@code null}
     * @throws IOException if reading from the store fails
     * @throws IllegalStateException if the store returns nothing for {@code path} or the
     *         loaded config has a blank name
     */
    private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
        StreamingConfig loaded = getStore().getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
        // Guard against a null result (e.g. the resource no longer exists) so the
        // failure is reported with the offending path instead of a bare NPE.
        if (loaded == null) {
            throw new IllegalStateException("No StreamingConfig found at " + path);
        }
        if (StringUtils.isBlank(loaded.getName())) {
            throw new IllegalStateException("StreamingConfig name must not be blank");
        }
        return loaded;
    }

    /**
     * Clears the cache and repopulates it from every resource under
     * {@link ResourceStore#STREAMING_RESOURCE_ROOT} with the metadata file suffix.
     *
     * <p>Entries whose stored path disagrees with their own resource path, or whose name
     * is already cached, are logged and skipped. A failure to load a single entry is
     * logged and does not stop the remaining entries from loading.
     *
     * @throws IOException if listing resources in the store fails
     */
    private void reloadAllStreaming() throws IOException {
        ResourceStore store = getStore();
        logger.info("Reloading Streaming Metadata from folder {}", store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
        this.streamingMap.clear();

        for (String path : store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX)) {
            try {
                StreamingConfig loaded = loadStreamingConfigAt(path);
                if (!path.equals(loaded.getResourcePath())) {
                    logger.error("Skip suspicious desc at {}, {} should be at {}", path, loaded, loaded.getResourcePath());
                } else if (this.streamingMap.containsKey(loaded.getName())) {
                    logger.error("Dup StreamingConfig name '{}' on path {}", loaded.getName(), path);
                } else {
                    this.streamingMap.putLocal(loaded.getName(), loaded);
                }
            } catch (Exception e) {
                // Best effort: one broken entry must not prevent the others from loading.
                logger.error("Error loading streaming desc {}", path, e);
            }
        }
        logger.debug("Loaded {} StreamingConfig(s)", this.streamingMap.size());
    }
}
