package org.apache.samoa.streams;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.EntranceProcessor;
import org.apache.samoa.core.Processor;
import org.apache.samoa.instances.Instance;
import org.apache.samoa.instances.Instances;
import org.apache.samoa.learners.InstanceContentEvent;
import org.apache.samoa.moa.options.AbstractOptionHandler;
import org.apache.samoa.moa.streams.InstanceStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samoa/streams/PrequentialSourceProcessor.class */
/**
 * Entrance processor that pulls instances from an {@link InstanceStream} and hands them out one
 * at a time as {@link InstanceContentEvent}s.
 *
 * <p>Emission can optionally be throttled: when a positive delay is configured, a single-threaded
 * timer periodically raises the number of events that may be emitted ({@code readyEventIndex}) by
 * the configured batch size, and {@link #hasNext()} only reports {@code true} while fewer events
 * than that have been sent.
 *
 * <p>Threading: {@link #nextEvent()} / {@link #hasNext()} run on the caller's thread while
 * {@link #increaseReadyEventIndex()} runs on the timer thread. The fields shared between the two
 * ({@code finished}, {@code readyEventIndex}, {@code schedule}) are {@code volatile}; each of them
 * has a single writer thread.
 */
public final class PrequentialSourceProcessor implements EntranceProcessor {
    private static final long serialVersionUID = 4169053337917578558L;
    private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class);

    /** Wrapper around {@link #sourceStream}; created by {@link #initStreamSource(InstanceStream)}. */
    private StreamSource streamSource;

    /** First instance of the stream, pre-fetched during initialisation. */
    private Instance firstInstance;

    /**
     * Maximum number of instances to emit. A negative value disables the limit; note that the
     * default of 0 makes the stream report its end immediately.
     */
    private int numberInstances;

    /** Raw stream supplied through {@link #setStreamSource(InstanceStream)}. */
    protected InstanceStream sourceStream;

    /** Timer driving the throttling; only one periodic task is ever scheduled on it. */
    private transient ScheduledExecutorService timer;

    /** Becomes true once the pre-fetched {@link #firstInstance} has been handed out. */
    private boolean isInited = false;

    /** Number of data events emitted so far. */
    private int numInstanceSent = 0;

    /** Handle of the periodic throttling task; written by the caller thread, read by the timer. */
    private transient volatile ScheduledFuture<?> schedule = null;

    /** Number of events allowed so far; starts at 1 so the first event is not delayed. */
    private volatile int readyEventIndex = 1;

    /** Delay between timer ticks in microseconds; values {@code <= 0} disable throttling. */
    private int delay = 0;

    /** Number of additional events released on every timer tick. */
    private int batchSize = 1;

    /** Set once the terminal event has been produced; read by the timer thread. */
    private volatile boolean finished = false;

    /**
     * Timer task that releases the next batch of events on the owning processor.
     */
    private static final class DelayTimeoutHandler implements Runnable {
        private final PrequentialSourceProcessor processor;

        DelayTimeoutHandler(PrequentialSourceProcessor prequentialSourceProcessor) {
            this.processor = prequentialSourceProcessor;
        }

        @Override
        public void run() {
            this.processor.increaseReadyEventIndex();
        }
    }

    /**
     * Not used by this source processor; incoming events are ignored.
     *
     * @param contentEvent ignored
     * @return always {@code false}
     */
    @Override
    public boolean process(ContentEvent contentEvent) {
        return false;
    }

    /**
     * @return {@code true} once {@link #nextEvent()} has produced the terminal event
     */
    @Override
    public boolean isFinished() {
        return this.finished;
    }

    /**
     * @return {@code true} if the processor is not finished and either throttling is disabled
     *     (delay {@code <= 0}) or fewer events than currently released have been sent
     */
    @Override
    public boolean hasNext() {
        return !isFinished() && (this.delay <= 0 || this.numInstanceSent < this.readyEventIndex);
    }

    /**
     * @return {@code true} if the underlying stream is exhausted or the non-negative instance
     *     limit has been reached
     */
    private boolean hasReachedEndOfStream() {
        return !this.streamSource.hasMoreInstances()
                || (this.numberInstances >= 0 && this.numInstanceSent >= this.numberInstances);
    }

    /**
     * Produces the next event.
     *
     * <p>At end of stream a terminal event (index -1, flagged as last) is returned, the processor
     * is marked finished and the throttling timer is stopped. Otherwise, if {@link #hasNext()}
     * holds, the next instance is wrapped in an event; the first such call with a positive delay
     * starts the periodic timer.
     *
     * @return the next event, or {@code null} when throttling currently forbids emitting one
     */
    @Override
    public ContentEvent nextEvent() {
        InstanceContentEvent event = null;
        if (hasReachedEndOfStream()) {
            event = new InstanceContentEvent(-1L, this.firstInstance, false, true);
            event.setLast(true);
            // Mark as finished only after the terminal event has been tagged.
            this.finished = true;
            // Release the timer thread; otherwise it would outlive the stream.
            stopTimer();
        } else if (hasNext()) {
            this.numInstanceSent++;
            event = new InstanceContentEvent(this.numInstanceSent, nextInstance(), true, true);
            // The first emitted event starts the throttling timer.
            if (this.schedule == null && this.delay > 0) {
                this.schedule = this.timer.scheduleWithFixedDelay(
                        new DelayTimeoutHandler(this), this.delay, this.delay, TimeUnit.MICROSECONDS);
            }
        }
        return event;
    }

    /**
     * Cancels the periodic throttling task (if any) and shuts the timer down (if created).
     * Safe to call repeatedly.
     */
    private void stopTimer() {
        ScheduledFuture<?> pending = this.schedule;
        if (pending != null) {
            pending.cancel(false);
        }
        if (this.timer != null) {
            this.timer.shutdown();
        }
    }

    /**
     * Releases another batch of events. Invoked by the timer thread; also cancels the periodic
     * task once the processor has finished.
     */
    public void increaseReadyEventIndex() {
        // Single writer (the timer thread), so the non-atomic read-modify-write is safe here.
        this.readyEventIndex += this.batchSize;
        ScheduledFuture<?> pending = this.schedule;
        if (pending != null && isFinished()) {
            pending.cancel(false);
        }
    }

    /**
     * Initialises the stream source from {@link #sourceStream} and creates the throttling timer.
     *
     * @param i identifier of this processor, used for logging only
     */
    @Override
    public void onCreate(int i) {
        initStreamSource(this.sourceStream);
        this.timer = Executors.newScheduledThreadPool(1);
        logger.debug("Creating PrequentialSourceProcessor with id {}", i);
    }

    /**
     * Creates a new processor configured like the given one: same underlying stream, instance
     * limit, delay and batch size. Runtime state (counters, timer) is not copied.
     *
     * @param processor the processor to copy; must be a {@code PrequentialSourceProcessor}
     * @return a fresh, not yet initialised processor
     */
    @Override
    public Processor newProcessor(Processor processor) {
        PrequentialSourceProcessor origin = (PrequentialSourceProcessor) processor;
        PrequentialSourceProcessor copy = new PrequentialSourceProcessor();
        StreamSource originSource = origin.getStreamSource();
        if (originSource != null) {
            copy.setStreamSource(originSource.getStream());
        } else {
            // The origin may have a stream configured that has not been initialised yet.
            copy.setStreamSource(origin.sourceStream);
        }
        // Without these the copy would use numberInstances == 0 and finish immediately.
        copy.numberInstances = origin.numberInstances;
        copy.delay = origin.delay;
        copy.batchSize = origin.batchSize;
        return copy;
    }

    /**
     * @return the initialised stream wrapper, or {@code null} if initialisation has not happened
     */
    public StreamSource getStreamSource() {
        return this.streamSource;
    }

    /**
     * Sets the raw stream to read from. The stream is only wrapped and prepared later, by
     * {@link #onCreate(int)} or {@link #getDataset()}.
     *
     * @param instanceStream the stream to read instances from
     */
    public void setStreamSource(InstanceStream instanceStream) {
        this.sourceStream = instanceStream;
    }

    /**
     * Returns the dataset of the first instance, initialising the stream source on demand.
     *
     * @return the dataset the first instance belongs to
     */
    public Instances getDataset() {
        if (this.firstInstance == null) {
            initStreamSource(this.sourceStream);
        }
        return this.firstInstance.dataset();
    }

    /**
     * @return the pre-fetched first instance on the first call, afterwards the next instance
     *     read from the stream source
     */
    private Instance nextInstance() {
        if (this.isInited) {
            return this.streamSource.nextInstance().getData();
        }
        this.isInited = true;
        return this.firstInstance;
    }

    /**
     * Prepares the given stream (when it is an {@link AbstractOptionHandler}), wraps it in a
     * {@link StreamSource} and pre-fetches the first instance.
     *
     * @param instanceStream the stream to initialise from
     */
    private void initStreamSource(InstanceStream instanceStream) {
        if (instanceStream instanceof AbstractOptionHandler) {
            ((AbstractOptionHandler) instanceStream).prepareForUse();
        }
        this.streamSource = new StreamSource(instanceStream);
        this.firstInstance = this.streamSource.nextInstance().getData();
    }

    /**
     * @param i maximum number of instances to emit; a negative value disables the limit
     */
    public void setMaxNumInstances(int i) {
        this.numberInstances = i;
    }

    /**
     * @return the configured maximum number of instances
     */
    public int getMaxNumInstances() {
        return this.numberInstances;
    }

    /**
     * @param i delay between releases of event batches, in microseconds; {@code <= 0} disables
     *     throttling
     */
    public void setSourceDelay(int i) {
        this.delay = i;
    }

    /**
     * @return the configured delay in microseconds
     */
    public int getSourceDelay() {
        return this.delay;
    }

    /**
     * @param i number of events released on every timer tick
     */
    public void setDelayBatchSize(int i) {
        this.batchSize = i;
    }
}
