package org.apache.samoa.topology;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.samoa.core.EntranceProcessor;
import org.apache.samoa.core.Processor;

/* loaded from: input_file:org/apache/samoa/topology/TopologyBuilder.class */
public class TopologyBuilder {
    private ComponentFactory componentFactory;
    private Topology topology;
    private Map<Processor, IProcessingItem> mapProcessorToProcessingItem;

    public TopologyBuilder() {
    }

    public TopologyBuilder(ComponentFactory componentFactory) {
        this.componentFactory = componentFactory;
    }

    public void initTopology(String str) {
        initTopology(str, 0);
    }

    public void initTopology(String str, int i) {
        if (this.topology != null) {
            System.out.println("Topology has been initialized before!");
        } else {
            this.topology = this.componentFactory.createTopology(str);
        }
    }

    public Topology build() {
        return this.topology;
    }

    public ProcessingItem addProcessor(Processor processor, int i) {
        ProcessingItem createPi = createPi(processor, i);
        if (this.mapProcessorToProcessingItem == null) {
            this.mapProcessorToProcessingItem = new HashMap();
        }
        this.mapProcessorToProcessingItem.put(processor, createPi);
        return createPi;
    }

    public ProcessingItem addProcessor(Processor processor) {
        return addProcessor(processor, 1);
    }

    public ProcessingItem connectInputShuffleStream(Stream stream, Processor processor) {
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to connect to null PI");
        return processingItem.connectInputShuffleStream(stream);
    }

    public ProcessingItem connectInputKeyStream(Stream stream, Processor processor) {
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to connect to null PI");
        return processingItem.connectInputKeyStream(stream);
    }

    public ProcessingItem connectInputAllStream(Stream stream, Processor processor) {
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to connect to null PI");
        return processingItem.connectInputAllStream(stream);
    }

    public Stream createInputShuffleStream(Processor processor, Processor processor2) {
        Stream createStream = createStream(processor2);
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to connect to null PI");
        processingItem.connectInputShuffleStream(createStream);
        return createStream;
    }

    public Stream createInputKeyStream(Processor processor, Processor processor2) {
        Stream createStream = createStream(processor2);
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to connect to null PI");
        processingItem.connectInputKeyStream(createStream);
        return createStream;
    }

    public Stream createInputAllStream(Processor processor, Processor processor2) {
        Stream createStream = createStream(processor2);
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to connect to null PI");
        processingItem.connectInputAllStream(createStream);
        return createStream;
    }

    public Stream createStream(Processor processor) {
        IProcessingItem iProcessingItem = this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(iProcessingItem, "Trying to create stream from null PI");
        Stream createStream = createStream(iProcessingItem);
        if (iProcessingItem instanceof EntranceProcessingItem) {
            ((EntranceProcessingItem) iProcessingItem).setOutputStream(createStream);
        }
        return createStream;
    }

    public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) {
        EntranceProcessingItem createEntrancePi = createEntrancePi(entranceProcessor);
        if (this.mapProcessorToProcessingItem == null) {
            this.mapProcessorToProcessingItem = new HashMap();
        }
        this.mapProcessorToProcessingItem.put(entranceProcessor, createEntrancePi);
        return createEntrancePi;
    }

    public ProcessingItem getProcessingItem(Processor processor) {
        ProcessingItem processingItem = (ProcessingItem) this.mapProcessorToProcessingItem.get(processor);
        Preconditions.checkNotNull(processingItem, "Trying to retrieve null PI");
        return processingItem;
    }

    private ProcessingItem createPi(Processor processor) {
        return createPi(processor, 1);
    }

    private ProcessingItem createPi(Processor processor, int i) {
        ProcessingItem createPi = this.componentFactory.createPi(processor, i);
        this.topology.addProcessingItem(createPi, i);
        return createPi;
    }

    private EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
        EntranceProcessingItem createEntrancePi = this.componentFactory.createEntrancePi(entranceProcessor);
        this.topology.addEntranceProcessingItem(createEntrancePi);
        if (this.mapProcessorToProcessingItem == null) {
            this.mapProcessorToProcessingItem = new HashMap();
        }
        this.mapProcessorToProcessingItem.put(entranceProcessor, createEntrancePi);
        return createEntrancePi;
    }

    private Stream createStream(IProcessingItem iProcessingItem) {
        Stream createStream = this.componentFactory.createStream(iProcessingItem);
        this.topology.addStream(createStream);
        return createStream;
    }
}
