package org.apache.samoa.topology.impl;

import java.util.ArrayList;
import java.util.List;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.Processor;
import org.apache.samoa.topology.AbstractProcessingItem;
import org.apache.samoa.topology.ProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.utils.PartitioningScheme;
import org.apache.samoa.utils.StreamDestination;

/* loaded from: input_file:org/apache/samoa/topology/impl/ThreadsProcessingItem.class */
/**
 * Processing item used by the threads-based engine.
 *
 * <p>It keeps a list of {@link ThreadsProcessingItemInstance} replicas (one per unit of
 * parallelism, created by {@link #setupInstances()}) and hands each incoming event to the
 * thread associated with the selected replica.
 */
public class ThreadsProcessingItem extends AbstractProcessingItem {
    /** Replicas of this processing item; {@code null} until {@link #setupInstances()} runs. */
    private List<ThreadsProcessingItemInstance> piInstances;

    /**
     * Random starting value in {@code [0, ThreadsEngine.getNumberOfThreads())}, chosen once at
     * construction. Replica {@code k} is created with thread index {@code offset + k}.
     * NOTE(review): how an index beyond the number of threads is mapped back to a real thread
     * is decided by ThreadsEngine, not here — confirm it wraps around.
     */
    private final int offset;

    /**
     * Creates the processing item and picks its random thread offset.
     *
     * @param processor the processor forwarded to the superclass constructor
     * @param i         forwarded to the superclass constructor (presumably the parallelism
     *                  hint — confirm against AbstractProcessingItem)
     */
    public ThreadsProcessingItem(Processor processor, int i) {
        super(processor, i);
        this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads());
    }

    /**
     * Returns the internal list of replicas.
     *
     * @return the live internal list (not a copy), or {@code null} if
     *         {@link #setupInstances()} has not been called yet
     */
    public List<ThreadsProcessingItemInstance> getProcessingItemInstances() {
        return this.piInstances;
    }

    /**
     * Registers this processing item as a destination of the given stream.
     *
     * @param stream             the input stream; it is cast to {@link ThreadsStream}, so any
     *                           other implementation causes a {@link ClassCastException}
     * @param partitioningScheme the partitioning scheme stored in the new destination
     * @return this processing item
     */
    protected ProcessingItem addInputStream(Stream stream, PartitioningScheme partitioningScheme) {
        StreamDestination destination = new StreamDestination(this, getParallelism(), partitioningScheme);
        ((ThreadsStream) stream).addDestination(destination);
        return this;
    }

    /**
     * Submits the event for processing by the replica at the given index, on the thread that
     * replica is associated with.
     *
     * @param contentEvent the event to process
     * @param i            index of the replica in the instance list
     * @throws IllegalStateException     if the replicas have not been set up, or fewer than
     *                                   {@code getParallelism()} of them exist
     * @throws IndexOutOfBoundsException if {@code i} is not a valid index in the instance list
     */
    public void processEvent(ContentEvent contentEvent, int i) {
        if (this.piInstances == null || this.piInstances.size() < getParallelism()) {
            throw new IllegalStateException("ThreadsWorkerProcessingItem(s) need to be setup before process any event (i.e. in ThreadsTopology.start()).");
        }
        ThreadsProcessingItemInstance instance = this.piInstances.get(i);
        ThreadsEventRunnable task = new ThreadsEventRunnable(instance, contentEvent);
        ThreadsEngine.getThreadWithIndex(instance.getThreadIndex()).submit(task);
    }

    /**
     * (Re)creates the replicas of this processing item, replacing any existing list.
     *
     * <p>For each replica a new processor is derived from this item's processor via
     * {@code newProcessor}, initialised with {@code onCreate(k + 1)} (ids are 1-based), and
     * wrapped in a {@link ThreadsProcessingItemInstance} with thread index {@code offset + k}.
     * Must be called before {@link #processEvent(ContentEvent, int)}.
     */
    public void setupInstances() {
        final int parallelism = getParallelism();
        final Processor prototype = getProcessor();
        // Typed (non-raw) list, presized to the number of replicas.
        this.piInstances = new ArrayList<>(parallelism);
        for (int k = 0; k < parallelism; k++) {
            Processor replica = prototype.newProcessor(prototype);
            replica.onCreate(k + 1);
            this.piInstances.add(new ThreadsProcessingItemInstance(replica, this.offset + k));
        }
    }
}
