package org.apache.storm.trident.windowing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.Aggregator;
import org.apache.storm.trident.planner.ProcessorContext;
import org.apache.storm.trident.planner.TridentProcessor;
import org.apache.storm.trident.planner.processor.FreshCollector;
import org.apache.storm.trident.planner.processor.TridentContext;
import org.apache.storm.trident.spout.IBatchID;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.tuple.TridentTupleView;
import org.apache.storm.trident.windowing.AbstractTridentWindowManager;
import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/trident/windowing/WindowTridentProcessor.class */
public class WindowTridentProcessor implements TridentProcessor {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) WindowTridentProcessor.class);
    public static final String TRIGGER_INPROCESS_PREFIX = "tip|";
    public static final String TRIGGER_PREFIX = "tr|";
    public static final String TRIGGER_COUNT_PREFIX = "tc|";
    public static final String TRIGGER_FIELD_NAME = "_task_info";
    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100;
    private final String windowId;
    private final Fields inputFields;
    private final Aggregator aggregator;
    private final boolean storeTuplesInStore;
    private String windowTriggerInprocessId;
    private WindowConfig windowConfig;
    private WindowsStoreFactory windowStoreFactory;
    private WindowsStore windowStore;
    private TopologyContext topologyContext;
    private FreshCollector collector;
    private TridentTupleView.ProjectionFactory projection;
    private TridentContext tridentContext;
    private ITridentWindowManager tridentWindowManager;
    private String windowTaskId;

    /* loaded from: input_file:org/apache/storm/trident/windowing/WindowTridentProcessor$TriggerInfo.class */
    public static class TriggerInfo implements Serializable {
        public final String windowTaskId;
        public final int triggerId;

        public TriggerInfo(String str, int i) {
            this.windowTaskId = str;
            this.triggerId = i;
        }

        public String generateTriggerKey() {
            return WindowTridentProcessor.generateWindowTriggerKey(this.windowTaskId, this.triggerId);
        }

        public String toString() {
            return "TriggerInfo{windowTaskId='" + this.windowTaskId + "', triggerId=" + this.triggerId + '}';
        }
    }

    public WindowTridentProcessor(WindowConfig windowConfig, String str, WindowsStoreFactory windowsStoreFactory, Fields fields, Aggregator aggregator, boolean z) {
        this.windowConfig = windowConfig;
        this.windowId = str;
        this.windowStoreFactory = windowsStoreFactory;
        this.inputFields = fields;
        this.aggregator = aggregator;
        this.storeTuplesInStore = z;
    }

    @Override // org.apache.storm.trident.planner.TridentProcessor
    public void prepare(Map map, TopologyContext topologyContext, TridentContext tridentContext) {
        this.topologyContext = topologyContext;
        List<TridentTuple.Factory> parentTupleFactories = tridentContext.getParentTupleFactories();
        if (parentTupleFactories.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }
        Long windowTuplesCacheSize = getWindowTuplesCacheSize(map);
        this.tridentContext = tridentContext;
        this.collector = new FreshCollector(tridentContext);
        this.projection = new TridentTupleView.ProjectionFactory(parentTupleFactories.get(0), this.inputFields);
        this.windowStore = this.windowStoreFactory.create(map);
        this.windowTaskId = this.windowId + WindowsStore.KEY_SEPARATOR + this.topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        this.windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(this.windowTaskId);
        this.tridentWindowManager = this.storeTuplesInStore ? new StoreBasedTridentWindowManager(this.windowConfig, this.windowTaskId, this.windowStore, this.aggregator, tridentContext.getDelegateCollector(), windowTuplesCacheSize, this.inputFields) : new InMemoryTridentWindowManager(this.windowConfig, this.windowTaskId, this.windowStore, this.aggregator, tridentContext.getDelegateCollector());
        this.tridentWindowManager.prepare();
    }

    public static String getWindowTriggerInprocessIdPrefix(String str) {
        return TRIGGER_INPROCESS_PREFIX + str;
    }

    public static String getWindowTriggerTaskPrefix(String str) {
        return TRIGGER_PREFIX + str;
    }

    private Long getWindowTuplesCacheSize(Map map) {
        if (map.containsKey(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)) {
            return Long.valueOf(((Number) map.get(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)).longValue());
        }
        return 100L;
    }

    @Override // org.apache.storm.trident.planner.TridentProcessor
    public void cleanup() {
        LOG.info("shutting down window manager");
        try {
            this.tridentWindowManager.shutdown();
        } catch (Exception e) {
            LOG.error("Error occurred while cleaning up window processor", (Throwable) e);
            throw e;
        }
    }

    @Override // org.apache.storm.trident.planner.TridentProcessor
    public void startBatch(ProcessorContext processorContext) {
        processorContext.state[this.tridentContext.getStateIndex()] = new ArrayList();
    }

    @Override // org.apache.storm.trident.planner.TupleReceiver
    public void execute(ProcessorContext processorContext, String str, TridentTuple tridentTuple) {
        ((List) processorContext.state[this.tridentContext.getStateIndex()]).add(this.projection.create(tridentTuple));
    }

    @Override // org.apache.storm.trident.planner.TridentProcessor
    public void finishBatch(ProcessorContext processorContext) {
        Object obj = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(obj);
        LOG.debug("Received finishBatch of : [{}] ", obj);
        this.tridentWindowManager.addTuplesBatch(obj, (List) processorContext.state[this.tridentContext.getStateIndex()]);
        List list = null;
        ArrayList arrayList = new ArrayList();
        Iterable<Object> iterable = null;
        if (retriedAttempt(obj)) {
            list = (List) this.windowStore.get(inprocessTriggerKey(batchTxnId));
            if (list != null) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    arrayList.add(triggerKey(((Integer) it.next()).intValue()));
                }
                iterable = this.windowStore.get(arrayList);
            }
        }
        if (iterable == null) {
            list = new ArrayList();
            Queue<AbstractTridentWindowManager.TriggerResult> pendingTriggers = this.tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", obj, Integer.valueOf(pendingTriggers.size()));
            try {
                Iterator<AbstractTridentWindowManager.TriggerResult> it2 = pendingTriggers.iterator();
                ArrayList arrayList2 = new ArrayList();
                while (it2.hasNext()) {
                    AbstractTridentWindowManager.TriggerResult next = it2.next();
                    for (List<Object> list2 : next.result) {
                        arrayList.add(triggerKey(next.id));
                        arrayList2.add(list2);
                        list.add(Integer.valueOf(next.id));
                    }
                    it2.remove();
                }
                iterable = arrayList2;
            } finally {
                if (!list.isEmpty()) {
                    this.windowStore.put(inprocessTriggerKey(batchTxnId), list);
                }
            }
        }
        this.collector.setContext(processorContext);
        int i = 0;
        Iterator<Object> it3 = iterable.iterator();
        while (it3.hasNext()) {
            int i2 = i;
            i++;
            this.collector.emit(new ConsList(new TriggerInfo(this.windowTaskId, ((Integer) list.get(i2)).intValue()), (List) it3.next()));
        }
        this.collector.setContext(null);
    }

    private String inprocessTriggerKey(Object obj) {
        return this.windowTriggerInprocessId + obj;
    }

    public static Object getBatchTxnId(Object obj) {
        if (obj instanceof IBatchID) {
            return ((IBatchID) obj).getId();
        }
        return null;
    }

    static boolean retriedAttempt(Object obj) {
        return (obj instanceof IBatchID) && ((IBatchID) obj).getAttemptId() > 0;
    }

    @Override // org.apache.storm.trident.planner.TridentProcessor
    public TridentTuple.Factory getOutputFactory() {
        return this.collector.getOutputFactory();
    }

    public String triggerKey(int i) {
        return generateWindowTriggerKey(this.windowTaskId, i);
    }

    public static String generateWindowTriggerKey(String str, int i) {
        return TRIGGER_PREFIX + str + i;
    }
}
