package org.apache.shardingsphere.data.pipeline.core.metadata.node;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.class */
/**
 * Singleton that watches the data pipeline root node of the governance repository
 * and forwards each {@link DataChangedEvent} to a {@link PipelineMetaDataChangedEventHandler}
 * whose key pattern matches the event key.
 *
 * <p>Handlers are discovered once, at construction time, through
 * {@link ShardingSphereServiceLoader}. Events are dispatched asynchronously on the
 * executor returned by {@link PipelineContext#getEventListenerExecutor()}.</p>
 */
public final class PipelineMetaDataNodeWatcher {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PipelineMetaDataNodeWatcher.class);

    private static final PipelineMetaDataNodeWatcher INSTANCE = new PipelineMetaDataNodeWatcher();

    /** Registered handlers keyed by the pattern that an event key must fully match. */
    private final Map<Pattern, PipelineMetaDataChangedEventHandler> listenerMap = new ConcurrentHashMap<>();

    /**
     * Loads every available handler, indexes it by its key pattern, and then starts
     * watching {@link DataPipelineConstants#DATA_PIPELINE_ROOT}.
     *
     * <p>When two handlers report the same key pattern, the one encountered later replaces the earlier one.</p>
     */
    private PipelineMetaDataNodeWatcher() {
        // The registry is filled before the watch is registered, so the callback never sees an empty map.
        this.listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class).stream()
                .collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, handler -> handler, (previous, latest) -> latest)));
        PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
    }

    /**
     * Schedules the event for handling on the event listener executor and logs any failure
     * raised while it is being handled.
     *
     * @param dataChangedEvent event received from the watched node
     */
    private void dispatchEvent(DataChangedEvent dataChangedEvent) {
        CompletableFuture.runAsync(() -> dispatchEvent0(dataChangedEvent), PipelineContext.getEventListenerExecutor())
                .whenComplete((ignored, ex) -> {
                    if (null != ex) {
                        log.error("dispatch event failed", ex);
                    }
                });
    }

    /**
     * Hands the event to the first handler whose pattern matches the whole event key.
     * At most one handler is invoked; events that match no pattern are dropped silently.
     *
     * <p>NOTE(review): the registry is a {@link ConcurrentHashMap}, whose iteration order is
     * unspecified, so if several patterns can match the same key the chosen handler is not
     * deterministic - confirm that handler patterns are mutually exclusive.</p>
     *
     * @param dataChangedEvent event to dispatch
     */
    private void dispatchEvent0(DataChangedEvent dataChangedEvent) {
        for (Map.Entry<Pattern, PipelineMetaDataChangedEventHandler> entry : this.listenerMap.entrySet()) {
            if (entry.getKey().matcher(dataChangedEvent.getKey()).matches()) {
                entry.getValue().handle(dataChangedEvent);
                return;
            }
        }
    }

    /**
     * Returns the shared watcher instance.
     *
     * @return the singleton {@code PipelineMetaDataNodeWatcher}
     */
    public static PipelineMetaDataNodeWatcher getInstance() {
        return INSTANCE;
    }
}
