package co.cask.cdap.etl.common;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.plugin.PluginConfigurer;
import co.cask.cdap.api.plugin.PluginProperties;
import co.cask.cdap.etl.api.PipelineConfigurable;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.guice.TypeResolver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:lib/cdap-etl-core-3.3.5.jar:co/cask/cdap/etl/common/PipelineRegisterer.class */
public class PipelineRegisterer {
    private final PluginConfigurer configurer;
    private final String sourcePluginType;
    private final String sinkPluginType;

    /* loaded from: input_file:lib/cdap-etl-core-3.3.5.jar:co/cask/cdap/etl/common/PipelineRegisterer$PipelineConfigureDetail.class */
    private class PipelineConfigureDetail {
        PipelineConfigurable pipelineConfigurable;
        PipelineConfigurer pipelineConfigurer;

        PipelineConfigureDetail(PipelineConfigurable pipelineConfigurable, PipelineConfigurer pipelineConfigurer) {
            this.pipelineConfigurable = pipelineConfigurable;
            this.pipelineConfigurer = pipelineConfigurer;
        }

        PipelineConfigurable getPipelineConfigurable() {
            return this.pipelineConfigurable;
        }

        PipelineConfigurer getPipelineConfigurer() {
            return this.pipelineConfigurer;
        }
    }

    public PipelineRegisterer(PluginConfigurer pluginConfigurer, String str) {
        this.configurer = pluginConfigurer;
        this.sourcePluginType = str + Constants.Source.PLUGINTYPE;
        this.sinkPluginType = str + Constants.Sink.PLUGINTYPE;
    }

    public Pipeline registerPlugins(ETLConfig eTLConfig, Class cls, DatasetProperties datasetProperties, boolean z) {
        ETLConfig compatibleConfig = eTLConfig.getCompatibleConfig();
        ETLStage source = compatibleConfig.getSource();
        List<ETLStage> transforms = compatibleConfig.getTransforms();
        List<ETLStage> sinks = compatibleConfig.getSinks();
        if (sinks == null || sinks.isEmpty()) {
            throw new IllegalArgumentException("At least one sink must be specified.");
        }
        if (source == null) {
            throw new IllegalArgumentException("A source must be specified.");
        }
        validateStageNames(source, compatibleConfig.getTransforms(), compatibleConfig.getSinks());
        Map<String, List<String>> validateConnections = validateConnections(compatibleConfig);
        List<String> stagesAfterTopologicalSorting = getStagesAfterTopologicalSorting(validateConnections, source.getName());
        HashMap hashMap = new HashMap();
        String name = source.getName();
        String name2 = source.getPlugin().getName();
        PipelineConfigurable pipelineConfigurable = (PipelineConfigurable) this.configurer.usePlugin(this.sourcePluginType, name2, name, getPluginProperties(source), source.getPlugin().getPluginSelector(this.sourcePluginType, name2));
        if (pipelineConfigurable == null) {
            throw new IllegalArgumentException(String.format("No Plugin of type '%s' named '%s' was found.", Constants.Source.PLUGINTYPE, source.getPlugin().getName()));
        }
        hashMap.put(name, new PipelineConfigureDetail(pipelineConfigurable, new DefaultPipelineConfigurer(this.configurer, name)));
        ArrayList arrayList = new ArrayList(transforms.size());
        ArrayList arrayList2 = new ArrayList(transforms.size());
        for (ETLStage eTLStage : transforms) {
            String name3 = eTLStage.getName();
            PluginProperties pluginProperties = getPluginProperties(eTLStage);
            String name4 = eTLStage.getPlugin().getName();
            Transform transform = (Transform) this.configurer.usePlugin("transform", name4, name3, pluginProperties, eTLStage.getPlugin().getPluginSelector("transform", name4));
            if (transform == null) {
                throw new IllegalArgumentException(String.format("No Plugin of type '%s' named '%s' was found", "transform", eTLStage.getPlugin().getName()));
            }
            if (eTLStage.getErrorDatasetName() != null) {
                this.configurer.createDataset(eTLStage.getErrorDatasetName(), cls, datasetProperties);
            }
            hashMap.put(name3, new PipelineConfigureDetail(transform, new DefaultPipelineConfigurer(this.configurer, name3)));
            arrayList.add(new TransformInfo(name3, eTLStage.getErrorDatasetName()));
            arrayList2.add(transform);
        }
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        for (ETLStage eTLStage2 : sinks) {
            String name5 = eTLStage2.getName();
            if (z && eTLStage2.getErrorDatasetName() != null) {
                this.configurer.createDataset(eTLStage2.getErrorDatasetName(), cls, datasetProperties);
            }
            arrayList3.add(new SinkInfo(name5, eTLStage2.getErrorDatasetName()));
            String name6 = eTLStage2.getPlugin().getName();
            PipelineConfigurable pipelineConfigurable2 = (PipelineConfigurable) this.configurer.usePlugin(this.sinkPluginType, name6, name5, getPluginProperties(eTLStage2), eTLStage2.getPlugin().getPluginSelector(this.sinkPluginType, name6));
            if (pipelineConfigurable2 == null) {
                throw new IllegalArgumentException(String.format("No Plugin of type '%s' named '%s' was foundPlease check that an artifact containing the plugin exists, and that it extends the etl application.", Constants.Sink.PLUGINTYPE, eTLStage2.getPlugin().getName()));
            }
            hashMap.put(name5, new PipelineConfigureDetail(pipelineConfigurable2, new DefaultPipelineConfigurer(this.configurer, name5)));
            arrayList4.add(pipelineConfigurable2);
        }
        for (String str : stagesAfterTopologicalSorting) {
            PipelineConfigureDetail pipelineConfigureDetail = (PipelineConfigureDetail) hashMap.get(str);
            try {
                pipelineConfigureDetail.getPipelineConfigurable().configurePipeline(pipelineConfigureDetail.getPipelineConfigurer());
                Schema outputSchema = ((DefaultStageConfigurer) pipelineConfigureDetail.getPipelineConfigurer().getStageConfigurer()).getOutputSchema();
                if (validateConnections.containsKey(str)) {
                    Iterator<String> it = validateConnections.get(str).iterator();
                    while (it.hasNext()) {
                        ((DefaultStageConfigurer) ((PipelineConfigureDetail) hashMap.get(it.next())).getPipelineConfigurer().getStageConfigurer()).setInputSchema(outputSchema);
                    }
                }
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException(String.format("Exception in stage %s : %s", str, e.getMessage()), e.getCause());
            }
        }
        return new Pipeline(name, arrayList3, arrayList, validateConnections);
    }

    @VisibleForTesting
    static List<String> getStagesAfterTopologicalSorting(Map<String, List<String>> map, String str) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            for (String str2 : entry.getValue()) {
                if (!hashMap.containsKey(str2)) {
                    hashMap.put(str2, new ArrayList());
                }
                ((List) hashMap.get(str2)).add(entry.getKey());
            }
        }
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        hashSet.add(str);
        while (!hashSet.isEmpty()) {
            String str3 = (String) hashSet.iterator().next();
            hashSet.remove(str3);
            arrayList.add(str3);
            if (map.containsKey(str3)) {
                for (String str4 : map.get(str3)) {
                    ((List) hashMap.get(str4)).remove(str3);
                    if (((List) hashMap.get(str4)).isEmpty()) {
                        hashSet.add(str4);
                    }
                }
            }
        }
        Iterator it = hashMap.values().iterator();
        while (it.hasNext()) {
            if (!((List) it.next()).isEmpty()) {
                throw new IllegalArgumentException("Cycle exists in the graph.");
            }
        }
        return arrayList;
    }

    @VisibleForTesting
    static Map<String, List<String>> validateConnections(ETLConfig eTLConfig) {
        HashMap hashMap = new HashMap();
        if (eTLConfig.getConnections().size() < eTLConfig.getTransforms().size() + eTLConfig.getSinks().size()) {
            throw new IllegalArgumentException("Number of edges connecting the pipeline is less than the number of vertices, please check the connections");
        }
        HashSet hashSet = new HashSet();
        hashSet.add(eTLConfig.getSource().getName());
        Iterator<ETLStage> it = eTLConfig.getTransforms().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getName());
        }
        Iterator<ETLStage> it2 = eTLConfig.getSinks().iterator();
        while (it2.hasNext()) {
            hashSet.add(it2.next().getName());
        }
        for (Connection connection : eTLConfig.getConnections()) {
            Preconditions.checkArgument(hashSet.contains(connection.getFrom()), String.format(" The from name %s in connection %s : %s does not belong to an actual stage name, please check the config", connection.getFrom(), connection.getFrom(), connection.getTo()));
            Preconditions.checkArgument(hashSet.contains(connection.getTo()), String.format("The to name : %s in connection %s : %s does not belong to an actual stage name, please check the config", connection.getTo(), connection.getFrom(), connection.getTo()));
            if (hashMap.containsKey(connection.getFrom())) {
                ((List) hashMap.get(connection.getFrom())).add(connection.getTo());
            } else {
                ArrayList arrayList = new ArrayList();
                arrayList.add(connection.getTo());
                hashMap.put(connection.getFrom(), arrayList);
            }
        }
        HashSet hashSet2 = new HashSet();
        hashSet2.add(eTLConfig.getSource().getName());
        HashSet hashSet3 = new HashSet();
        Iterator<ETLStage> it3 = eTLConfig.getSinks().iterator();
        while (it3.hasNext()) {
            hashSet3.add(it3.next().getName());
        }
        HashSet hashSet4 = new HashSet();
        connectionsReachabilityValidation(hashMap, eTLConfig, eTLConfig.getSource().getName(), hashSet2, hashSet4, hashSet3);
        for (ETLStage eTLStage : eTLConfig.getSinks()) {
            if (!hashSet4.contains(eTLStage.getName())) {
                throw new IllegalArgumentException(String.format("Sink %s is not connected, please check the connections", eTLStage.getName()));
            }
        }
        return hashMap;
    }

    private static void connectionsReachabilityValidation(Map<String, List<String>> map, ETLConfig eTLConfig, String str, Set<String> set, Set<String> set2, Set<String> set3) {
        if (map.get(str) == null) {
            if (!set3.contains(str)) {
                throw new IllegalArgumentException(String.format("Stage : %s is not connected to any transform or sink, please check the config", str));
            }
            set2.add(str);
            return;
        }
        for (String str2 : map.get(str)) {
            if (set.contains(str2)) {
                throw new IllegalArgumentException(String.format("Connection %s --> %s causes a cycle, Graph has to be a DAG, please check the graph connections", str, str2));
            }
            HashSet hashSet = new HashSet(set);
            hashSet.add(str2);
            connectionsReachabilityValidation(map, eTLConfig, str2, hashSet, set2, set3);
        }
    }

    @VisibleForTesting
    static void validateStageNames(ETLStage eTLStage, List<ETLStage> list, List<ETLStage> list2) {
        HashSet hashSet = new HashSet();
        hashSet.add(eTLStage.getName());
        validateUniqueStageName(hashSet, list);
        validateUniqueStageName(hashSet, list2);
    }

    private static void validateUniqueStageName(Set<String> set, List<ETLStage> list) {
        for (ETLStage eTLStage : list) {
            if (!set.add(eTLStage.getName())) {
                throw new IllegalArgumentException(String.format("Stage name : %s is not unique, its used for more than one stage in the pipeline, check the pipeline config", eTLStage.getName()));
            }
        }
    }

    private PluginProperties getPluginProperties(ETLStage eTLStage) {
        PluginProperties.Builder builder = PluginProperties.builder();
        if (eTLStage.getPlugin().getProperties() != null) {
            builder.addAll(eTLStage.getPlugin().getProperties());
        }
        return builder.build();
    }

    @VisibleForTesting
    static void validateTypes(List<Type> list) {
        Preconditions.checkArgument(list.size() % 2 == 0, "ETL Stages validation expects even number of types");
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(list.size());
        newArrayListWithCapacity.add(list.get(0));
        try {
            newArrayListWithCapacity.add(new TypeResolver().where(list.get(1), (Type) newArrayListWithCapacity.get(0)).resolveType(list.get(1)));
        } catch (IllegalArgumentException e) {
            newArrayListWithCapacity.add(list.get(1));
        }
        for (int i = 2; i < list.size(); i++) {
            Type type = (Type) newArrayListWithCapacity.get(i - 1);
            Type type2 = list.get(i - 1);
            Type type3 = list.get(i);
            try {
                newArrayListWithCapacity.add(((type3 instanceof TypeVariable) || (type3 instanceof GenericArrayType)) ? new TypeResolver().where(type3, type).resolveType(type3) : new TypeResolver().where(type2, type).resolveType(type3));
            } catch (IllegalArgumentException e2) {
                newArrayListWithCapacity.add(type3);
            }
        }
        for (int i2 = 0; i2 < newArrayListWithCapacity.size(); i2 += 2) {
            Type type4 = (Type) newArrayListWithCapacity.get(i2);
            Type type5 = (Type) newArrayListWithCapacity.get(i2 + 1);
            Preconditions.checkArgument(TypeToken.of(type5).isAssignableFrom(type4), "Types between stages didn't match. Mismatch between {} -> {}", new Object[]{type4, type5});
        }
    }
}
