package org.apache.eventmesh.runtime.boot;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.transformer.Transformer;
import org.apache.eventmesh.transformer.TransformerBuilder;
import org.apache.eventmesh.transformer.TransformerParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/TransformerEngine.class */
/**
 * Maintains an in-memory table of {@link Transformer}s built from metadata stored under keys of the
 * form {@code "transformer-" + group}.
 *
 * <p>Table keys have the form {@code group + "-" + topic}. The table is filled once in
 * {@link #start()} and refreshed whenever the registered {@link MetaServiceListener} is invoked.
 * A single-threaded scheduler periodically makes sure a listener is registered for every producer
 * and consumer group that has no transformer in the table yet.
 */
public class TransformerEngine {
    private static final Logger log = LoggerFactory.getLogger(TransformerEngine.class);

    /** Prefix of every metadata key handled by this engine. */
    private static final String TRANSFORMER_PREFIX = "transformer-";
    /** JSON field of a metadata entry that holds the serialized {@link TransformerParam}. */
    private static final String TRANSFORMER_PARAM_FIELD = "transformerParam";
    /** Delay before the first listener scan, in milliseconds. */
    private static final long LISTENER_SCAN_INITIAL_DELAY_MS = 10000L;
    /** Interval between listener scans, in milliseconds. */
    private static final long LISTENER_SCAN_PERIOD_MS = 5000L;

    private final MetaStorage metaStorage;
    private MetaServiceListener metaServiceListener;
    private final ProducerManager producerManager;
    private final ConsumerManager consumerManager;
    /**
     * key: {@code group + "-" + topic}. Concurrent because the scheduler thread iterates the keys
     * while listener callbacks may insert entries and callers read through {@link #getTransformer}.
     */
    private final Map<String, Transformer> transformerMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /**
     * @param metaStorage     source of transformer metadata and of change notifications
     * @param producerManager supplies the producer groups scanned by the scheduler
     * @param consumerManager supplies the consumer groups scanned by the scheduler
     */
    public TransformerEngine(MetaStorage metaStorage, ProducerManager producerManager, ConsumerManager consumerManager) {
        this.metaStorage = metaStorage;
        this.producerManager = producerManager;
        this.consumerManager = consumerManager;
    }

    /**
     * Loads all stored transformer metadata and starts the periodic listener scan.
     */
    public void start() {
        // The listener must exist before the initial load: updateTransformerMap registers it with
        // the meta storage, and registering it while still null would make the registration useless.
        this.metaServiceListener = this::updateTransformerMap;

        Map<String, String> stored = this.metaStorage.getMetaData(TRANSFORMER_PREFIX, true);
        if (stored != null) {
            for (Map.Entry<String, String> entry : stored.entrySet()) {
                updateTransformerMap(entry.getKey(), entry.getValue());
            }
        }

        this.scheduledExecutorService.scheduleAtFixedRate(
            this::registerMissingListeners,
            LISTENER_SCAN_INITIAL_DELAY_MS,
            LISTENER_SCAN_PERIOD_MS,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Scheduler task: registers listeners for producer and consumer groups without a transformer.
     * Never lets an exception escape, because an exception thrown from a fixed-rate task suppresses
     * all subsequent executions.
     */
    private void registerMissingListeners() {
        try {
            registerListenersFor(this.producerManager.getProducerTable().keySet(), "producer");
            registerListenersFor(this.consumerManager.getClientTable().keySet(), "consumer");
        } catch (Exception e) {
            log.error("transformer listener scan failed", e);
        }
    }

    /**
     * Registers one listener for each group in {@code groups} that has no transformer yet.
     *
     * @param groups    group names to check
     * @param groupKind {@code "producer"} or {@code "consumer"}; only used in log messages
     */
    private void registerListenersFor(Iterable<?> groups, String groupKind) {
        for (Object element : groups) {
            String group = (String) element;
            if (hasTransformerFor(group)) {
                continue;
            }
            try {
                addTransformerListener(group);
                LogUtils.info(log, "addTransformerListener for " + groupKind + " group: " + group);
            } catch (RuntimeException e) {
                // Keep going so that one failing group does not block the others.
                log.error("addTransformerListener failed for " + groupKind + " group: " + group, e);
            }
        }
    }

    /**
     * Returns whether at least one table key belongs to {@code group}, i.e. starts with
     * {@code group + "-"}, which is how {@link #updateTransformerMap} builds the keys.
     */
    private boolean hasTransformerFor(String group) {
        if (group == null) {
            return false;
        }
        String keyPrefix = group + "-";
        for (String key : this.transformerMap.keySet()) {
            if (key.startsWith(keyPrefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Rebuilds the transformers described by one metadata entry and (re-)registers the listener for
     * its group. Also used as the {@link MetaServiceListener} callback.
     *
     * @param str  metadata key; the group is the part after {@code "transformer-"}
     * @param str2 metadata value, expected to be a JSON collection whose elements carry a topic
     *             field and a {@code transformerParam} field
     */
    private void updateTransformerMap(String str, String str2) {
        String group = StringUtils.substringAfter(str, TRANSFORMER_PREFIX);
        JsonNode root = JsonUtils.getJsonNode(str2);
        if (root != null) {
            for (JsonNode item : root) {
                JsonNode topicNode = item.get(EventMeshConstants.MANAGE_TOPIC);
                JsonNode paramNode = item.get(TRANSFORMER_PARAM_FIELD);
                if (topicNode == null || paramNode == null) {
                    // Malformed entry: skip it instead of failing the whole update with an NPE.
                    log.warn("skip transformer entry without topic or transformerParam, group: {}", group);
                    continue;
                }
                TransformerParam param = (TransformerParam) JsonUtils.parseObject(paramNode.toString(), TransformerParam.class);
                Transformer transformer = param == null ? null : TransformerBuilder.buildTransformer(param);
                if (transformer == null) {
                    // ConcurrentHashMap rejects null values, so an unbuildable entry is skipped.
                    log.warn("skip transformer entry that could not be built, group: {}, topic: {}", group, topicNode.asText());
                    continue;
                }
                this.transformerMap.put(group + "-" + topicNode.asText(), transformer);
            }
        }
        addTransformerListener(group);
    }

    /**
     * Registers this engine's listener with the meta storage for {@code "transformer-" + str}.
     *
     * @param str group name
     * @throws RuntimeException wrapping any exception raised by the meta storage
     */
    public void addTransformerListener(String str) {
        try {
            this.metaStorage.getMetaDataWithListener(this.metaServiceListener, TRANSFORMER_PREFIX + str);
        } catch (Exception e) {
            throw new RuntimeException("addTransformerListener exception", e);
        }
    }

    /**
     * Stops the periodic listener scan. Already running scans are allowed to finish.
     */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }

    /**
     * Looks up a transformer.
     *
     * @param str table key of the form {@code group + "-" + topic}
     * @return the transformer, or {@code null} if none is known (or {@code str} is {@code null})
     */
    public Transformer getTransformer(String str) {
        // ConcurrentHashMap throws on a null key; keep the null-tolerant behaviour callers had.
        return str == null ? null : this.transformerMap.get(str);
    }
}
