package co.cask.cdap.etl.spec;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.plugin.PluginConfigurer;
import co.cask.cdap.etl.api.PipelineConfigurable;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.common.DefaultPipelineConfigurer;
import co.cask.cdap.etl.planner.Dag;
import co.cask.cdap.etl.proto.Connection;
import co.cask.cdap.etl.proto.v2.ETLConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.v2.ETLStage;
import co.cask.cdap.etl.spec.PipelineSpec;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:lib/cdap-etl-core-3.4.3.jar:co/cask/cdap/etl/spec/PipelineSpecGenerator.class */
/**
 * Base class for generating a pipeline specification from an {@link ETLConfig}.
 *
 * <p>It validates the stages and connections of a config, configures the plugin of every stage
 * through the shared {@link PluginConfigurer}, and records the resulting stage specs on a
 * {@link PipelineSpec.Builder}. Subclasses supply {@link #generateSpec} to assemble the final spec.
 *
 * @param <C> the concrete config type accepted by {@link #generateSpec}
 * @param <P> the concrete spec type produced by {@link #generateSpec}
 */
public abstract class PipelineSpecGenerator<C extends ETLConfig, P extends PipelineSpec> {
    protected final PluginConfigurer configurer;
    private final Class<? extends Dataset> errorDatasetClass;
    private final DatasetProperties errorDatasetProperties;
    private final Set<String> sourcePluginTypes;
    private final Set<String> sinkPluginTypes;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a generator.
     *
     * @param pluginConfigurer configurer used to register plugins and create error datasets
     * @param set plugin types that are treated as sources during validation
     * @param set2 plugin types that are treated as sinks during validation
     * @param cls dataset class used when creating the error dataset of a stage
     * @param datasetProperties dataset properties used when creating the error dataset of a stage
     */
    public PipelineSpecGenerator(PluginConfigurer pluginConfigurer, Set<String> set, Set<String> set2, Class<? extends Dataset> cls, DatasetProperties datasetProperties) {
        this.configurer = pluginConfigurer;
        // The sets are stored by reference, not copied.
        this.sourcePluginTypes = set;
        this.sinkPluginTypes = set2;
        this.errorDatasetClass = cls;
        this.errorDatasetProperties = datasetProperties;
    }

    /**
     * Generates the pipeline specification for the given config.
     *
     * @param c the pipeline config
     * @return the generated specification
     */
    public abstract P generateSpec(C c);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates the config, configures every stage and records the results on {@code builder}.
     *
     * <p>Stages are processed in the order returned by {@link #validateConfig}, which follows the
     * topological order of the connection graph. After a stage is configured, its output schema is
     * set as the input schema of each stage it connects to, so a downstream stage sees that schema
     * when it is configured later. Finally the config's connections, resources and stage-logging
     * flag are copied to the builder.
     *
     * @param eTLConfig the pipeline config to validate and configure
     * @param builder the spec builder that receives stages, connections and settings
     * @throws IllegalArgumentException if the config fails validation or a plugin cannot be found
     * @throws RuntimeException if a plugin throws while being configured
     */
    public void configureStages(ETLConfig eTLConfig, PipelineSpec.Builder builder) {
        List<StageConnections> orderedStages = validateConfig(eTLConfig);

        // One configurer per stage, created up front so that an upstream stage can push its output
        // schema into a downstream configurer before that downstream stage is configured.
        Map<String, DefaultPipelineConfigurer> configurersByStage =
            new HashMap<String, DefaultPipelineConfigurer>(orderedStages.size());
        for (StageConnections stageConnections : orderedStages) {
            String stageName = stageConnections.getStage().getName();
            configurersByStage.put(stageName, new DefaultPipelineConfigurer(this.configurer, stageName));
        }

        for (StageConnections stageConnections : orderedStages) {
            DefaultPipelineConfigurer stageConfigurer = configurersByStage.get(stageConnections.getStage().getName());
            StageSpec stageSpec = configureStage(stageConnections, stageConfigurer);
            Schema outputSchema = stageSpec.getOutputSchema();
            for (String outputStage : stageConnections.getOutputs()) {
                configurersByStage.get(outputStage).getStageConfigurer().setInputSchema(outputSchema);
            }
            builder.addStage(stageSpec);
        }

        builder.addConnections(eTLConfig.getConnections()).setResources(eTLConfig.getResources()).setStageLoggingEnabled(eTLConfig.isStageLoggingEnabled());
    }

    /**
     * Configures a single stage and builds its spec.
     *
     * <p>If the stage names an error dataset, that dataset is created first. The input and output
     * schemas recorded on the spec are read from the stage configurer after the plugin has been
     * configured.
     *
     * @param stageConnections the stage together with its input and output stage names
     * @param pipelineConfigurer the configurer dedicated to this stage
     * @return the spec of the configured stage
     */
    private StageSpec configureStage(StageConnections stageConnections, DefaultPipelineConfigurer pipelineConfigurer) {
        ETLStage stage = stageConnections.getStage();
        String stageName = stage.getName();
        String errorDatasetName = stage.getErrorDatasetName();

        if (!Strings.isNullOrEmpty(errorDatasetName)) {
            this.configurer.createDataset(errorDatasetName, this.errorDatasetClass, this.errorDatasetProperties);
        }

        // The plugin must be configured before the schemas are read from the stage configurer.
        PluginSpec pluginSpec = configurePlugin(stageName, stage.getPlugin(), pipelineConfigurer);
        Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
        Schema outputSchema = pipelineConfigurer.getStageConfigurer().getOutputSchema();

        return StageSpec.builder(stageName, pluginSpec)
            .setErrorDatasetName(errorDatasetName)
            .setInputSchema(inputSchema)
            .setOutputSchema(outputSchema)
            .addInputs(stageConnections.getInputs())
            .addOutputs(stageConnections.getOutputs())
            .build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers the plugin of a stage and lets it configure the pipeline.
     *
     * @param str the stage name, also used as the plugin id when registering the plugin
     * @param eTLPlugin the plugin declared for the stage
     * @param pipelineConfigurer the configurer handed to the plugin's {@code configurePipeline}
     * @return a spec holding the plugin type, name, properties and the artifact that was selected
     * @throws IllegalArgumentException if no matching plugin is found
     * @throws RuntimeException if configuring the plugin or building its spec throws; the original
     *     exception is kept as the cause
     */
    public PluginSpec configurePlugin(String str, ETLPlugin eTLPlugin, PipelineConfigurer pipelineConfigurer) {
        // The tracking selector remembers which artifact was picked so it can be stored in the spec.
        TrackedPluginSelector trackedPluginSelector = new TrackedPluginSelector(eTLPlugin.getPluginSelector());
        PipelineConfigurable pipelineConfigurable = (PipelineConfigurable) this.configurer.usePlugin(eTLPlugin.getType(), eTLPlugin.getName(), str, eTLPlugin.getPluginProperties(), trackedPluginSelector);
        if (pipelineConfigurable == null) {
            throw new IllegalArgumentException(String.format("No plugin of type %s and name %s could be found for stage %s.", eTLPlugin.getType(), eTLPlugin.getName(), str));
        }
        try {
            pipelineConfigurable.configurePipeline(pipelineConfigurer);
            return new PluginSpec(eTLPlugin.getType(), eTLPlugin.getName(), eTLPlugin.getProperties(), trackedPluginSelector.getSelectedArtifact());
        } catch (Exception e) {
            throw new RuntimeException(String.format("Exception while configuring plugin of type %s and name %s for stage %s: %s", eTLPlugin.getType(), eTLPlugin.getName(), str, e.getMessage()), e);
        }
    }

    /**
     * Validates the pipeline config and returns its stages in topological order.
     *
     * <p>Checks performed, in order:
     * <ul>
     *   <li>the config's own {@code validate()};</li>
     *   <li>there is at least one stage;</li>
     *   <li>stage names are unique;</li>
     *   <li>both ends of every connection name an existing stage;</li>
     *   <li>sources have no incoming connections, sinks have no outgoing connections, and every
     *       other stage has at least one of each.</li>
     * </ul>
     *
     * @param eTLConfig the pipeline config to validate
     * @return each stage paired with its inputs and outputs, ordered by the topological order
     *     reported by the {@link Dag} built from the config's connections
     * @throws IllegalArgumentException if any of the checks above fails
     */
    private List<StageConnections> validateConfig(ETLConfig eTLConfig) {
        eTLConfig.validate();
        if (eTLConfig.getStages().isEmpty()) {
            throw new IllegalArgumentException("A pipeline must contain at least one stage.");
        }

        Set<String> stageNames = new HashSet<String>();
        for (ETLStage stage : eTLConfig.getStages()) {
            if (!stageNames.add(stage.getName())) {
                throw new IllegalArgumentException(String.format("Invalid pipeline. Multiple stages are named %s. Please ensure all stage names are unique", stage.getName()));
            }
        }

        for (Connection connection : eTLConfig.getConnections()) {
            if (!stageNames.contains(connection.getFrom())) {
                throw new IllegalArgumentException(String.format("Invalid connection %s. %s is not a stage.", connection, connection.getFrom()));
            }
            if (!stageNames.contains(connection.getTo())) {
                throw new IllegalArgumentException(String.format("Invalid connection %s. %s is not a stage.", connection, connection.getTo()));
            }
        }

        Dag dag = new Dag(eTLConfig.getConnections());
        Map<String, StageConnections> connectionsByStage = new HashMap<String, StageConnections>();
        for (ETLStage stage : eTLConfig.getStages()) {
            String stageName = stage.getName();
            String pluginType = stage.getPlugin().getType();
            Set<String> stageInputs = dag.getNodeInputs(stageName);
            Set<String> stageOutputs = dag.getNodeOutputs(stageName);

            if (isSource(pluginType)) {
                if (!stageInputs.isEmpty()) {
                    throw new IllegalArgumentException(String.format("Source %s has incoming connections from %s. Sources cannot have any incoming connections.", stageName, Joiner.on(',').join(stageInputs)));
                }
            } else if (isSink(pluginType)) {
                if (!stageOutputs.isEmpty()) {
                    throw new IllegalArgumentException(String.format("Sink %s has outgoing connections to %s. Sinks cannot have any outgoing connections.", stageName, Joiner.on(',').join(stageOutputs)));
                }
            } else {
                if (stageInputs.isEmpty()) {
                    throw new IllegalArgumentException(String.format("Stage %s is unreachable, it has no incoming connections.", stageName));
                }
                if (stageOutputs.isEmpty()) {
                    throw new IllegalArgumentException(String.format("Stage %s is a dead end, it has no outgoing connections.", stageName));
                }
            }
            connectionsByStage.put(stageName, new StageConnections(stage, stageInputs, stageOutputs));
        }

        // Emit the stages in the order the DAG reports, so upstream stages come before downstream ones.
        List<StageConnections> orderedStages = new ArrayList<StageConnections>(connectionsByStage.size());
        for (String stageName : dag.getTopologicalOrder()) {
            orderedStages.add(connectionsByStage.get(stageName));
        }
        return orderedStages;
    }

    /** Returns whether the given plugin type is one of the configured source types. */
    private boolean isSource(String pluginType) {
        return this.sourcePluginTypes.contains(pluginType);
    }

    /** Returns whether the given plugin type is one of the configured sink types. */
    private boolean isSink(String pluginType) {
        return this.sinkPluginTypes.contains(pluginType);
    }
}
