package org.apache.flink.streaming.api.invokable.operator;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowInvokable.class */
public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
    private static final long serialVersionUID = -8038984294071650730L;
    private LinkedList<TriggerPolicy<IN>> triggerPolicies;
    private LinkedList<EvictionPolicy<IN>> evictionPolicies;
    private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
    private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
    private LinkedList<Thread> activePolicyTreads;
    protected LinkedList<IN> buffer;
    private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowInvokable$WindowingCallback.class */
    private class WindowingCallback implements ActiveTriggerCallback {
        private ActiveTriggerPolicy<IN> policy;

        public WindowingCallback(ActiveTriggerPolicy<IN> activeTriggerPolicy) {
            this.policy = activeTriggerPolicy;
        }

        @Override // org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback
        public void sendFakeElement(Object obj) {
            WindowInvokable.this.processFakeElement(obj, this.policy);
        }
    }

    public WindowInvokable(Function function, LinkedList<TriggerPolicy<IN>> linkedList, LinkedList<EvictionPolicy<IN>> linkedList2) {
        super(function);
        this.triggerPolicies = linkedList;
        this.evictionPolicies = linkedList2;
        this.activeTriggerPolicies = new LinkedList<>();
        Iterator<TriggerPolicy<IN>> it = linkedList.iterator();
        while (it.hasNext()) {
            TriggerPolicy<IN> next = it.next();
            if (next instanceof ActiveTriggerPolicy) {
                this.activeTriggerPolicies.add((ActiveTriggerPolicy) next);
            }
        }
        this.activeEvictionPolicies = new LinkedList<>();
        Iterator<EvictionPolicy<IN>> it2 = linkedList2.iterator();
        while (it2.hasNext()) {
            EvictionPolicy<IN> next2 = it2.next();
            if (next2 instanceof ActiveEvictionPolicy) {
                this.activeEvictionPolicies.add((ActiveEvictionPolicy) next2);
            }
        }
        this.activePolicyTreads = new LinkedList<>();
        this.buffer = new LinkedList<>();
        this.currentTriggerPolicies = new LinkedList<>();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        Iterator<ActiveTriggerPolicy<IN>> it = this.activeTriggerPolicies.iterator();
        while (it.hasNext()) {
            ActiveTriggerPolicy<IN> next = it.next();
            Runnable createActiveTriggerRunnable = next.createActiveTriggerRunnable(new WindowingCallback(next));
            if (createActiveTriggerRunnable != null) {
                Thread thread = new Thread(createActiveTriggerRunnable);
                this.activePolicyTreads.add(thread);
                thread.start();
            }
        }
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    public void invoke() throws Exception {
        if (readNext() == null) {
            throw new RuntimeException("DataStream must not be empty");
        }
        while (this.nextRecord != null) {
            processRealElement(this.nextRecord.getObject());
            readNext();
        }
        Iterator<Thread> it = this.activePolicyTreads.iterator();
        while (it.hasNext()) {
            it.next().interrupt();
        }
        emitFinalWindow(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void externalTriggerFakeElement(IN in, List<TriggerPolicy<IN>> list) {
        this.currentTriggerPolicies.addAll(list);
        callUserFunctionAndLogException();
        this.currentTriggerPolicies.clear();
        int i = 0;
        Iterator<ActiveEvictionPolicy<IN>> it = this.activeEvictionPolicies.iterator();
        while (it.hasNext()) {
            int notifyEvictionWithFakeElement = it.next().notifyEvictionWithFakeElement(in, this.buffer.size());
            if (notifyEvictionWithFakeElement > i) {
                i = notifyEvictionWithFakeElement;
            }
        }
        for (int i2 = 0; i2 < i; i2++) {
            try {
                this.buffer.removeFirst();
            } catch (NoSuchElementException e) {
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void processFakeElement(Object obj, TriggerPolicy<IN> triggerPolicy) {
        int i = 0;
        Iterator<ActiveEvictionPolicy<IN>> it = this.activeEvictionPolicies.iterator();
        while (it.hasNext()) {
            int notifyEvictionWithFakeElement = it.next().notifyEvictionWithFakeElement(obj, this.buffer.size());
            if (notifyEvictionWithFakeElement > i) {
                i = notifyEvictionWithFakeElement;
            }
        }
        for (int i2 = 0; i2 < i; i2++) {
            try {
                this.buffer.removeFirst();
            } catch (NoSuchElementException e) {
            }
        }
        this.currentTriggerPolicies.add(triggerPolicy);
        callUserFunctionAndLogException();
        this.currentTriggerPolicies.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void processRealElement(IN in, List<TriggerPolicy<IN>> list) {
        this.currentTriggerPolicies.addAll(list);
        processRealElement(in);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void processRealElement(IN in) {
        Iterator<ActiveTriggerPolicy<IN>> it = this.activeTriggerPolicies.iterator();
        while (it.hasNext()) {
            ActiveTriggerPolicy<IN> next = it.next();
            for (Object obj : next.preNotifyTrigger(in)) {
                processFakeElement(obj, next);
            }
        }
        boolean z = false;
        Iterator<TriggerPolicy<IN>> it2 = this.triggerPolicies.iterator();
        while (it2.hasNext()) {
            TriggerPolicy<IN> next2 = it2.next();
            if (next2.notifyTrigger(in)) {
                this.currentTriggerPolicies.add(next2);
            }
        }
        if (!this.currentTriggerPolicies.isEmpty()) {
            callUserFunctionAndLogException();
            this.currentTriggerPolicies.clear();
            z = true;
        }
        int i = 0;
        Iterator<EvictionPolicy<IN>> it3 = this.evictionPolicies.iterator();
        while (it3.hasNext()) {
            int notifyEviction = it3.next().notifyEviction(in, z, this.buffer.size());
            if (notifyEviction > i) {
                i = notifyEviction;
            }
        }
        for (int i2 = 0; i2 < i; i2++) {
            try {
                this.buffer.removeFirst();
            } catch (NoSuchElementException e) {
            }
        }
        this.buffer.add(in);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void evictFirst() {
        try {
            this.buffer.removeFirst();
        } catch (NoSuchElementException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isBufferEmpty() {
        return this.buffer.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitFinalWindow(List<TriggerPolicy<IN>> list) {
        if (this.buffer.isEmpty()) {
            return;
        }
        this.currentTriggerPolicies.clear();
        if (list != null) {
            this.currentTriggerPolicies.addAll(list);
        }
        Iterator<TriggerPolicy<IN>> it = this.triggerPolicies.iterator();
        while (it.hasNext()) {
            this.currentTriggerPolicies.add(it.next());
        }
        callUserFunctionAndLogException();
    }
}
