package org.apache.iotdb.commons.pipe.agent.plugin;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskTemporaryRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.class */
public abstract class PipePluginAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginAgent.class);
    protected final PipePluginMetaKeeper pipePluginMetaKeeper;
    private final PipeExtractorConstructor pipeExtractorConstructor;
    private final PipeProcessorConstructor pipeProcessorConstructor;
    private final PipeConnectorConstructor pipeConnectorConstructor;

    protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
        this.pipePluginMetaKeeper = pipePluginMetaKeeper;
        this.pipeExtractorConstructor = createPipeExtractorConstructor(pipePluginMetaKeeper);
        this.pipeProcessorConstructor = createPipeProcessorConstructor(pipePluginMetaKeeper);
        this.pipeConnectorConstructor = createPipeConnectorConstructor(pipePluginMetaKeeper);
    }

    protected abstract PipeExtractorConstructor createPipeExtractorConstructor(PipePluginMetaKeeper pipePluginMetaKeeper);

    protected abstract PipeProcessorConstructor createPipeProcessorConstructor(PipePluginMetaKeeper pipePluginMetaKeeper);

    protected abstract PipeConnectorConstructor createPipeConnectorConstructor(PipePluginMetaKeeper pipePluginMetaKeeper);

    public final PipeExtractor reflectExtractor(PipeParameters pipeParameters) {
        return this.pipeExtractorConstructor.mo66reflectPlugin(pipeParameters);
    }

    public final PipeProcessor reflectProcessor(PipeParameters pipeParameters) {
        return this.pipeProcessorConstructor.mo66reflectPlugin(pipeParameters);
    }

    public final PipeConnector reflectConnector(PipeParameters pipeParameters) {
        return this.pipeConnectorConstructor.mo66reflectPlugin(pipeParameters);
    }

    public void validate(String str, Map<String, String> map, Map<String, String> map2, Map<String, String> map3) throws Exception {
        validateExtractor(map);
        validateProcessor(map2);
        validateConnector(str, map3);
    }

    public void validateExtractor(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        PipeExtractor reflectExtractor = reflectExtractor(pipeParameters);
        try {
            reflectExtractor.validate(new PipeParameterValidator(pipeParameters));
        } finally {
            try {
                reflectExtractor.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(), e);
            }
        }
    }

    public void validateProcessor(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        PipeProcessor reflectProcessor = reflectProcessor(pipeParameters);
        try {
            reflectProcessor.validate(new PipeParameterValidator(pipeParameters));
        } finally {
            try {
                reflectProcessor.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close temporary processor: {}", e.getMessage(), e);
            }
        }
    }

    public void validateConnector(String str, Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        PipeConnector reflectConnector = reflectConnector(pipeParameters);
        try {
            reflectConnector.validate(new PipeParameterValidator(pipeParameters));
            reflectConnector.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskTemporaryRuntimeEnvironment(str)));
            reflectConnector.handshake();
        } finally {
            try {
                reflectConnector.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(), e);
            }
        }
    }

    public final List<String> getSubProcessorNamesWithSpecifiedParent(Class<? extends PipeProcessor> cls) throws PipeException {
        return (List) Arrays.stream(this.pipePluginMetaKeeper.getAllPipePluginMeta()).map(pipePluginMeta -> {
            return pipePluginMeta.getPluginName().toLowerCase();
        }).filter(str -> {
            try {
                PipeProcessor reflectPluginByKey = this.pipeProcessorConstructor.reflectPluginByKey(str);
                try {
                    boolean z = reflectPluginByKey.getClass().getSuperclass() == cls;
                    if (reflectPluginByKey != null) {
                        reflectPluginByKey.close();
                    }
                    return z;
                } catch (Throwable th) {
                    if (reflectPluginByKey != null) {
                        try {
                            reflectPluginByKey.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Exception e) {
                return false;
            }
        }).collect(Collectors.toList());
    }

    public final PipeProcessor getConfiguredProcessor(String str, PipeParameters pipeParameters, PipeProcessorRuntimeConfiguration pipeProcessorRuntimeConfiguration) {
        HashMap hashMap = new HashMap();
        if (Objects.nonNull(str)) {
            hashMap.put(PipeProcessorConstant.PROCESSOR_KEY, str);
        }
        PipeParameters addOrReplaceEquivalentAttributesWithClone = pipeParameters.addOrReplaceEquivalentAttributesWithClone(new PipeParameters(hashMap));
        PipeProcessor reflectProcessor = reflectProcessor(addOrReplaceEquivalentAttributesWithClone);
        try {
            reflectProcessor.validate(new PipeParameterValidator(addOrReplaceEquivalentAttributesWithClone));
            reflectProcessor.customize(addOrReplaceEquivalentAttributesWithClone, pipeProcessorRuntimeConfiguration);
            return reflectProcessor;
        } catch (Exception e) {
            try {
                reflectProcessor.close();
            } catch (Exception e2) {
                LOGGER.warn("Failed to close processor after failed to initialize processor. Ignore this exception.", e2);
            }
            throw new PipeException(e.getMessage(), e);
        }
    }
}
