/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
extends AbstractUdfStreamOperator<OUT, F>
implements OneInputStreamOperator<IN, OUT>,
Triggerable<KEY, VoidNamespace> {
    private static final long serialVersionUID = -4166778210774160757L;
    private final boolean isProcessingTime;
    private final TypeSerializer<IN> inputSerializer;
    private static final String NFA_STATE_NAME = "nfaStateName";
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient ValueState<NFAState> computationStates;
    private transient MapState<Long, List<IN>> elementQueueState;
    private transient SharedBuffer<IN> partialMatches;
    private transient InternalTimerService<VoidNamespace> timerService;
    private transient NFA<IN> nfa;
    private long lastWatermark;
    private final EventComparator<IN> comparator;
    protected final OutputTag<IN> lateDataOutputTag;
    protected final AfterMatchSkipStrategy afterMatchSkipStrategy;

    public AbstractKeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory, EventComparator<IN> comparator, AfterMatchSkipStrategy afterMatchSkipStrategy, F function, OutputTag<IN> lateDataOutputTag) {
        super(function);
        this.inputSerializer = (TypeSerializer)Preconditions.checkNotNull(inputSerializer);
        this.isProcessingTime = (Boolean)Preconditions.checkNotNull((Object)isProcessingTime);
        this.nfaFactory = (NFACompiler.NFAFactory)Preconditions.checkNotNull(nfaFactory);
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;
        this.afterMatchSkipStrategy = afterMatchSkipStrategy == null ? AfterMatchSkipStrategy.noSkip() : afterMatchSkipStrategy;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.computationStates = context.getKeyedStateStore().getState(new ValueStateDescriptor(NFA_STATE_NAME, (TypeSerializer)NFAStateSerializer.INSTANCE));
        this.partialMatches = new SharedBuffer<IN>(context.getKeyedStateStore(), this.inputSerializer);
        this.elementQueueState = context.getKeyedStateStore().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)new ListSerializer(this.inputSerializer)));
        this.migrateOldState();
    }

    private void migrateOldState() throws Exception {
        this.getKeyedStateBackend().applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("nfaOperatorStateName", new NFA.NFASerializer<IN>(this.inputSerializer)), new KeyedStateFunction<Object, ValueState<NFA.MigratedNFA<IN>>>(){

            public void process(Object key, ValueState<NFA.MigratedNFA<IN>> state) throws Exception {
                NFA.MigratedNFA oldState = (NFA.MigratedNFA)state.value();
                AbstractKeyedCEPPatternOperator.this.computationStates.update((Object)new NFAState(oldState.getComputationStates()));
                org.apache.flink.cep.nfa.SharedBuffer sharedBuffer = oldState.getSharedBuffer();
                AbstractKeyedCEPPatternOperator.this.partialMatches.init(sharedBuffer.getEventsBuffer(), sharedBuffer.getPages());
                state.clear();
            }
        });
    }

    public void open() throws Exception {
        super.open();
        this.timerService = this.getInternalTimerService("watermark-callbacks", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        this.nfa = this.nfaFactory.createNFA();
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (this.isProcessingTime) {
            if (this.comparator == null) {
                NFAState nfaState = this.getNFAState();
                long timestamp = this.getProcessingTimeService().getCurrentProcessingTime();
                this.advanceTime(nfaState, timestamp);
                this.processEvent(nfaState, element.getValue(), timestamp);
                this.updateNFA(nfaState);
            } else {
                long currentTime2 = this.timerService.currentProcessingTime();
                this.bufferEvent(element.getValue(), currentTime2);
                this.timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, currentTime2 + 1L);
            }
        } else {
            long timestamp = element.getTimestamp();
            Object value = element.getValue();
            if (timestamp > this.lastWatermark) {
                this.saveRegisterWatermarkTimer();
                this.bufferEvent(value, timestamp);
            } else if (this.lateDataOutputTag != null) {
                this.output.collect(this.lateDataOutputTag, element);
            }
        }
    }

    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.timerService.currentWatermark();
        if (currentWatermark + 1L > currentWatermark) {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, currentWatermark + 1L);
        }
    }

    private void bufferEvent(IN event, long currentTime2) throws Exception {
        ArrayList<Object> elementsForTimestamp = (ArrayList<Object>)this.elementQueueState.get((Object)currentTime2);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<Object>();
        }
        if (this.getExecutionConfig().isObjectReuseEnabled()) {
            elementsForTimestamp.add(this.inputSerializer.copy(event));
        } else {
            elementsForTimestamp.add(event);
        }
        this.elementQueueState.put((Object)currentTime2, elementsForTimestamp);
    }

    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();
        NFAState nfaState = this.getNFAState();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= this.timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            this.advanceTime(nfaState, timestamp);
            try (Stream<IN> elements = this.sort((Collection)this.elementQueueState.get((Object)timestamp));){
                elements.forEachOrdered(event -> {
                    try {
                        this.processEvent(nfaState, event, timestamp);
                    }
                    catch (Exception e2) {
                        throw new RuntimeException(e2);
                    }
                });
            }
            this.elementQueueState.remove((Object)timestamp);
        }
        this.advanceTime(nfaState, this.timerService.currentWatermark());
        this.updateNFA(nfaState);
        if (!sortedTimestamps.isEmpty() || !this.partialMatches.isEmpty()) {
            this.saveRegisterWatermarkTimer();
        }
        this.updateLastSeenWatermark(this.timerService.currentWatermark());
    }

    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();
        NFAState nfa = this.getNFAState();
        while (!sortedTimestamps.isEmpty()) {
            long timestamp = sortedTimestamps.poll();
            this.advanceTime(nfa, timestamp);
            try (Stream<IN> elements = this.sort((Collection)this.elementQueueState.get((Object)timestamp));){
                elements.forEachOrdered(event -> {
                    try {
                        this.processEvent(nfa, event, timestamp);
                    }
                    catch (Exception e2) {
                        throw new RuntimeException(e2);
                    }
                });
            }
            this.elementQueueState.remove((Object)timestamp);
        }
        this.updateNFA(nfa);
    }

    private Stream<IN> sort(Collection<IN> elements) {
        Stream<IN> stream = elements.stream();
        return this.comparator == null ? stream : stream.sorted(this.comparator);
    }

    private void updateLastSeenWatermark(long timestamp) {
        this.lastWatermark = timestamp;
    }

    private NFAState getNFAState() throws IOException {
        NFAState nfaState = (NFAState)this.computationStates.value();
        return nfaState != null ? nfaState : this.nfa.createInitialNFAState();
    }

    private void updateNFA(NFAState nfaState) throws IOException {
        if (nfaState.isStateChanged()) {
            nfaState.resetStateChanged();
            this.computationStates.update((Object)nfaState);
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<Long>();
        for (Long timestamp : this.elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        return sortedTimestamps;
    }

    private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = this.partialMatches.getAccessor();){
            Collection<Map<String, List<IN>>> patterns = this.nfa.process(sharedBufferAccessor, nfaState, event, timestamp, this.afterMatchSkipStrategy);
            this.processMatchedSequences(patterns, timestamp);
        }
    }

    private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = this.partialMatches.getAccessor();){
            Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut = this.nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
            this.processTimedOutSequences(timedOut, timestamp);
        }
    }

    protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> var1, long var2) throws Exception;

    protected void processTimedOutSequences(Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
    }

    @VisibleForTesting
    public boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
        this.setCurrentKey(key);
        return !this.partialMatches.isEmpty();
    }

    @VisibleForTesting
    public boolean hasNonEmptyPQ(KEY key) throws Exception {
        this.setCurrentKey(key);
        return this.elementQueueState.keys().iterator().hasNext();
    }

    @VisibleForTesting
    public int getPQSize(KEY key) throws Exception {
        this.setCurrentKey(key);
        int counter = 0;
        for (List elements : this.elementQueueState.values()) {
            counter += elements.size();
        }
        return counter;
    }
}

