package org.apache.storm.trident.windowing;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.trident.operation.Aggregator;
import org.apache.storm.trident.spout.IBatchID;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.tuple.TridentTupleView;
import org.apache.storm.trident.windowing.AbstractTridentWindowManager;
import org.apache.storm.trident.windowing.WindowsStore;
import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.windowing.WindowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.class */
public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
    private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
    private static final String TUPLE_PREFIX = "tu|";
    private final String windowTupleTaskId;
    private final TridentTupleView.FreshOutputFactory freshOutputFactory;
    private Long maxCachedTuplesSize;
    private final Fields inputFields;
    private AtomicLong currentCachedTuplesSize;

    public StoreBasedTridentWindowManager(WindowConfig windowConfig, String str, WindowsStore windowsStore, Aggregator aggregator, BatchOutputCollector batchOutputCollector, Long l, Fields fields) {
        super(windowConfig, str, windowsStore, aggregator, batchOutputCollector);
        this.currentCachedTuplesSize = new AtomicLong();
        this.maxCachedTuplesSize = l;
        this.inputFields = fields;
        this.freshOutputFactory = new TridentTupleView.FreshOutputFactory(fields);
        this.windowTupleTaskId = TUPLE_PREFIX + str;
    }

    @Override // org.apache.storm.trident.windowing.AbstractTridentWindowManager
    protected void initialize() {
        String windowTriggerInprocessIdPrefix = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(this.windowTaskId);
        String windowTriggerTaskPrefix = WindowTridentProcessor.getWindowTriggerTaskPrefix(this.windowTaskId);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (String str : this.windowStore.getAllKeys()) {
            if (str.startsWith(this.windowTupleTaskId)) {
                int lastPart = lastPart(str);
                String secondLastPart = secondLastPart(str);
                LOG.debug("Received tuple with batch [{}] and tuple index [{}]", secondLastPart, Integer.valueOf(lastPart));
                this.windowManager.add((WindowManager<T>) new TridentBatchTuple(secondLastPart, System.currentTimeMillis(), lastPart));
            } else if (str.startsWith(windowTriggerTaskPrefix)) {
                arrayList2.add(str);
                LOG.debug("Received trigger with key [{}]", str);
            } else if (str.startsWith(windowTriggerInprocessIdPrefix)) {
                arrayList.add(str);
                LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", str);
            }
        }
        HashSet hashSet = new HashSet();
        Iterator<Object> it = this.windowStore.get(arrayList).iterator();
        while (it.hasNext()) {
            hashSet.addAll((List) it.next());
        }
        int i = 0;
        for (Object obj : this.windowStore.get(arrayList2)) {
            int i2 = i;
            i++;
            int lastPart2 = lastPart((String) arrayList2.get(i2));
            if (!hashSet.contains(Integer.valueOf(lastPart2))) {
                LOG.info("Adding pending trigger value [{}]", obj);
                this.pendingTriggers.add(new AbstractTridentWindowManager.TriggerResult(lastPart2, (List) obj));
            }
        }
    }

    private int lastPart(String str) {
        int lastIndexOf = str.lastIndexOf(WindowsStore.KEY_SEPARATOR);
        if (lastIndexOf < 0) {
            throw new IllegalArgumentException("primaryKey does not have key separator '|'");
        }
        return Integer.parseInt(str.substring(lastIndexOf + 1));
    }

    private String secondLastPart(String str) {
        int lastIndexOf = str.lastIndexOf(WindowsStore.KEY_SEPARATOR);
        if (lastIndexOf < 0) {
            throw new IllegalArgumentException("key " + str + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
        }
        int lastIndexOf2 = str.substring(0, lastIndexOf).lastIndexOf(WindowsStore.KEY_SEPARATOR);
        if (lastIndexOf < 0) {
            throw new IllegalArgumentException("key " + str + " does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'");
        }
        return str.substring(lastIndexOf2 + 1, lastIndexOf);
    }

    @Override // org.apache.storm.trident.windowing.ITridentWindowManager
    public void addTuplesBatch(Object obj, List<TridentTuple> list) {
        LOG.debug("Adding tuples to window-manager for batch: [{}]", obj);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            arrayList.add(new WindowsStore.Entry(keyOf(obj) + i, list.get(i).select(this.inputFields)));
        }
        this.windowStore.putAll(arrayList);
        for (int i2 = 0; i2 < list.size(); i2++) {
            addToWindowManager(i2, keyOf(obj), list.get(i2));
        }
    }

    private void addToWindowManager(int i, String str, TridentTuple tridentTuple) {
        TridentTuple tridentTuple2 = null;
        if (this.maxCachedTuplesSize == null || this.currentCachedTuplesSize.get() < this.maxCachedTuplesSize.longValue()) {
            tridentTuple2 = tridentTuple;
        }
        this.currentCachedTuplesSize.incrementAndGet();
        this.windowManager.add((WindowManager<T>) new TridentBatchTuple(str, System.currentTimeMillis(), i, tridentTuple2));
    }

    public String getBatchTxnId(Object obj) {
        if (obj instanceof IBatchID) {
            return ((IBatchID) obj).getId().toString();
        }
        throw new IllegalArgumentException("argument should be an IBatchId instance");
    }

    public String keyOf(Object obj) {
        return this.windowTupleTaskId + getBatchTxnId(obj) + WindowsStore.KEY_SEPARATOR;
    }

    @Override // org.apache.storm.trident.windowing.AbstractTridentWindowManager
    public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> list) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Iterator<TridentBatchTuple> it = list.iterator();
        while (it.hasNext()) {
            TridentTuple collectTridentTupleOrKey = collectTridentTupleOrKey(it.next(), arrayList2);
            if (collectTridentTupleOrKey != null) {
                arrayList.add(collectTridentTupleOrKey);
            }
        }
        if (arrayList2.size() > 0) {
            Iterator<Object> it2 = this.windowStore.get(arrayList2).iterator();
            while (it2.hasNext()) {
                arrayList.add(this.freshOutputFactory.create((List) it2.next()));
            }
        }
        return arrayList;
    }

    public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> list) {
        if (tridentBatchTuple.tridentTuple != null) {
            return tridentBatchTuple.tridentTuple;
        }
        list.add(tupleKey(tridentBatchTuple));
        return null;
    }

    @Override // org.apache.storm.trident.windowing.AbstractTridentWindowManager
    public void onTuplesExpired(List<TridentBatchTuple> list) {
        if (this.maxCachedTuplesSize != null) {
            this.currentCachedTuplesSize.addAndGet(-list.size());
        }
        ArrayList arrayList = new ArrayList();
        Iterator<TridentBatchTuple> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(tupleKey(it.next()));
        }
        this.windowStore.removeAll(arrayList);
    }

    private String tupleKey(TridentBatchTuple tridentBatchTuple) {
        return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex;
    }
}
