package org.apache.eagle.policy.siddhi.extension;

import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* loaded from: input_file:org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.class */
public class ExternalTimeBatchWindowProcessor extends WindowProcessor implements FindableProcessor {
    private long timeToKeep;
    static final Logger log = LoggerFactory.getLogger(ExternalTimeBatchWindowProcessor.class);
    private VariableExpressionExecutor timeStampVariableExpressionExecutor;
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<>();
    private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk<>();
    private long lastSendTime = -1;

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ExecutionPlanContext executionPlanContext) {
        this.expiredEventChunk = new ComplexEventChunk<>();
        if (expressionExecutorArr.length != 2) {
            throw new ExecutionPlanValidationException("ExternalTime window should only have two parameter (<long> timeStamp, <int|long|time> windowTime), but found " + expressionExecutorArr.length + " input attributes");
        }
        if (expressionExecutorArr[1].getReturnType() == Attribute.Type.INT) {
            this.timeToKeep = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()));
        } else {
            this.timeToKeep = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()));
        }
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be a type long stream attribute but found " + expressionExecutorArr[0].getClass());
        }
        this.timeStampVariableExpressionExecutor = (VariableExpressionExecutor) expressionExecutorArr[0];
        if (this.timeStampVariableExpressionExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be type long, but found " + this.timeStampVariableExpressionExecutor.getReturnType());
        }
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected synchronized void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        if (complexEventChunk.hasNext()) {
            if (this.currentEventChunk.getFirst() == null && this.lastSendTime < 0) {
                this.lastSendTime = ((Long) complexEventChunk.getFirst().getAttribute(this.timeStampVariableExpressionExecutor.getPosition())).longValue();
            }
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                if (next.getType() == ComplexEvent.Type.CURRENT) {
                    long longValue = ((Long) next.getAttribute(this.timeStampVariableExpressionExecutor.getPosition())).longValue();
                    if (longValue < this.lastSendTime + this.timeToKeep) {
                        cloneAppend(streamEventCloner, next);
                    } else if (longValue >= this.lastSendTime + this.timeToKeep) {
                        flushCurentChunk(processor, streamEventCloner, longValue);
                        cloneAppend(streamEventCloner, next);
                    }
                }
            }
        }
    }

    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent streamEvent) {
        this.currentEventChunk.add(streamEventCloner.copyStreamEvent(streamEvent));
    }

    private void flushCurentChunk(Processor processor, StreamEventCloner streamEventCloner, long j) {
        this.currentEventChunk.reset();
        ComplexEventChunk complexEventChunk = new ComplexEventChunk();
        while (this.expiredEventChunk.hasNext()) {
            this.expiredEventChunk.next().setTimestamp(j);
        }
        if (this.expiredEventChunk.getFirst() != null) {
            complexEventChunk.add(this.expiredEventChunk.getFirst());
        }
        this.expiredEventChunk.clear();
        while (this.currentEventChunk.hasNext()) {
            StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(this.currentEventChunk.next());
            copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
            this.expiredEventChunk.add(copyStreamEvent);
        }
        if (this.currentEventChunk.getFirst() != null) {
            complexEventChunk.add(this.currentEventChunk.getFirst());
        }
        this.currentEventChunk.clear();
        this.lastSendTime = j;
        if (complexEventChunk.getFirst() != null) {
            processor.process(complexEventChunk);
        }
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Object[] currentState() {
        return new Object[]{this.currentEventChunk, this.expiredEventChunk};
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Object[] objArr) {
        this.currentEventChunk = (ComplexEventChunk) objArr[0];
        this.expiredEventChunk = (ComplexEventChunk) objArr[1];
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(ComplexEvent complexEvent, Finder finder) {
        return finder.find(complexEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> list, Map<String, EventTable> map, int i, long j) {
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, list, map, i, this.inputDefinition, j);
    }
}
