package org.apache.heron.instance;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.heron.api.Config;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.utils.metrics.ComponentMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/instance/AbstractOutputCollector.class */
/**
 * Shared plumbing for output collectors: serializes emitted values, hands the resulting
 * tuples to an {@link OutgoingTupleCollection}, and keeps running totals of the tuples
 * and bytes emitted through {@link #sendTuple}.
 *
 * <p>Thread-safety is not established by this class itself; the {@link #lock} is created
 * here and handed to the {@link OutgoingTupleCollection}, which is presumably where it is
 * used (confirm against that class).
 */
public class AbstractOutputCollector {
    protected final IPluggableSerializer serializer;
    protected final OutgoingTupleCollection outputter;
    protected final ComponentMetrics metrics;
    /** Whether acking is enabled, as resolved from the topology config at construction. */
    protected final boolean ackEnabled;
    private PhysicalPlanHelper helper;
    /** Lock shared with the {@link OutgoingTupleCollection} created in the constructor. */
    public final ReentrantLock lock = new ReentrantLock();
    /** Number of tuples passed through {@link #sendTuple} since construction. */
    private long totalTuplesEmitted = 0;
    /** Sum of serialized value sizes passed through {@link #sendTuple} since construction. */
    private long totalBytesEmitted = 0;

    /**
     * Creates a collector bound to the given physical plan and outgoing queue.
     *
     * @param iPluggableSerializer serializer applied to every emitted value
     * @param physicalPlanHelper plan helper; also the source of the topology config used to
     *     resolve {@link #ackEnabled}
     * @param communicator queue passed to the underlying {@link OutgoingTupleCollection}
     * @param componentMetrics metrics sink for serialization time and emit counts
     */
    public AbstractOutputCollector(IPluggableSerializer iPluggableSerializer, PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, ComponentMetrics componentMetrics) {
        this.serializer = iPluggableSerializer;
        this.metrics = componentMetrics;
        this.helper = physicalPlanHelper;
        this.ackEnabled = resolveAckEnabled(physicalPlanHelper.getTopologyContext().getTopologyConfig());
        this.outputter = new OutgoingTupleCollection(physicalPlanHelper, communicator, this.lock, componentMetrics);
    }

    /**
     * Determines whether acking is enabled from the topology config.
     *
     * <p>A non-null {@code Config.TOPOLOGY_RELIABILITY_MODE} entry takes precedence: acking is
     * on only when it names {@code ATLEAST_ONCE}. Otherwise a non-null
     * {@code Config.TOPOLOGY_ENABLE_ACKING} entry is parsed as a boolean. If neither entry is
     * present, acking is off.
     *
     * @throws IllegalArgumentException if the reliability mode value is not a valid
     *     {@link Config.TopologyReliabilityMode} name (thrown by {@code valueOf})
     */
    private static boolean resolveAckEnabled(Map<String, Object> topologyConfig) {
        // Map.get returns null for absent keys, so a single null check covers both
        // "key missing" and "key mapped to null".
        Object reliabilityMode = topologyConfig.get(Config.TOPOLOGY_RELIABILITY_MODE);
        if (reliabilityMode != null) {
            return Config.TopologyReliabilityMode.valueOf(reliabilityMode.toString())
                    == Config.TopologyReliabilityMode.ATLEAST_ONCE;
        }
        Object enableAcking = topologyConfig.get(Config.TOPOLOGY_ENABLE_ACKING);
        if (enableAcking != null) {
            return Boolean.parseBoolean(enableAcking.toString());
        }
        return false;
    }

    /**
     * Replaces the plan helper used by this collector and by the underlying outputter.
     * Note that {@link #ackEnabled} is not re-evaluated against the new helper's config.
     */
    public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
        this.helper = physicalPlanHelper;
        this.outputter.updatePhysicalPlanHelper(physicalPlanHelper);
    }

    /** Returns the plan helper currently in use. */
    public PhysicalPlanHelper getPhysicalPlanHelper() {
        return this.helper;
    }

    /** Delegates to {@link OutgoingTupleCollection#isOutQueuesAvailable()}. */
    public boolean isOutQueuesAvailable() {
        return this.outputter.isOutQueuesAvailable();
    }

    /**
     * Delegates to {@link OutgoingTupleCollection#getTotalDataEmittedInBytes()}. This is the
     * outputter's own counter and is tracked separately from {@link #getTotalBytesEmitted()}.
     */
    public long getTotalDataEmittedInBytes() {
        return this.outputter.getTotalDataEmittedInBytes();
    }

    /** Delegates to {@link OutgoingTupleCollection#sendOutTuples()}. */
    public void sendOutTuples() {
        this.outputter.sendOutTuples();
    }

    /**
     * Forwards a state snapshot to the outputter unchanged.
     *
     * <p>NOTE(review): parameter names are decompiler-generated; their meaning is defined by
     * {@code OutgoingTupleCollection.sendOutState} and should be confirmed there.
     *
     * @param state state object to send
     * @param str first string argument, passed through as-is
     * @param z boolean flag, passed through as-is
     * @param str2 second string argument, passed through as-is
     */
    public void sendOutState(State<Serializable, Serializable> state, String str, boolean z, String str2) {
        this.outputter.sendOutState(state, str, z, str2);
    }

    /** Delegates to {@link OutgoingTupleCollection#clear()}; local totals are not reset. */
    public void clear() {
        this.outputter.clear();
    }

    /** Returns the number of tuples sent via {@link #sendTuple} since construction. */
    public long getTotalTuplesEmitted() {
        return this.totalTuplesEmitted;
    }

    /** Returns the total serialized size of values sent via {@link #sendTuple}. */
    public long getTotalBytesEmitted() {
        return this.totalBytesEmitted;
    }

    /**
     * Creates a data-tuple builder with key {@code 0}, fills in destination task ids when
     * applicable, and invokes the topology context's emit hook.
     *
     * <p>Destination ids are: the single {@code num} when it is non-null; otherwise the result
     * of the helper's custom stream grouping when custom grouping is configured; otherwise
     * none (and {@code null} is passed to the emit hook).
     *
     * <p>Declared {@code public} because the decompiler widened it from {@code protected};
     * kept public so existing callers continue to compile.
     *
     * @param str stream id the tuple is emitted on
     * @param list values being emitted
     * @param num explicit destination task id, or {@code null} for none
     * @return the partially populated builder (values are added later by {@link #sendTuple})
     */
    public HeronTuples.HeronDataTuple.Builder initTupleBuilder(String str, List<Object> list, Integer num) {
        HeronTuples.HeronDataTuple.Builder newBuilder = HeronTuples.HeronDataTuple.newBuilder();
        newBuilder.setKey(0L);
        List<Integer> destTaskIds = null;
        if (num != null) {
            destTaskIds = new ArrayList<>();
            destTaskIds.add(num);
        } else if (!this.helper.isCustomGroupingEmpty()) {
            destTaskIds = this.helper.chooseTasksForCustomStreamGrouping(str, list);
        }
        if (destTaskIds != null) {
            newBuilder.addAllDestTaskIds(destTaskIds);
        }
        this.helper.getTopologyContext().invokeHookEmit(list, str, destTaskIds);
        return newBuilder;
    }

    /**
     * Serializes every value into the builder, queues the tuple on the outputter, and updates
     * metrics and local totals.
     *
     * <p>Declared {@code public} because the decompiler widened it from {@code protected};
     * kept public so existing callers continue to compile.
     *
     * @param builder tuple builder, typically obtained from {@link #initTupleBuilder}
     * @param str stream id the tuple is emitted on
     * @param list values to serialize, in order
     */
    public void sendTuple(HeronTuples.HeronDataTuple.Builder builder, String str, List<Object> list) {
        long tupleSizeInBytes = 0;
        long startNanos = System.nanoTime();
        for (Object value : list) {
            // Keep the serialized bytes in a local so the same array feeds both the
            // builder and the size accounting.
            byte[] serialized = this.serializer.serialize(value);
            builder.addValues(ByteString.copyFrom(serialized));
            tupleSizeInBytes += serialized.length;
        }
        // Only the serialization loop is timed, not the enqueue below.
        this.metrics.serializeDataTuple(str, System.nanoTime() - startNanos);
        this.outputter.addDataTuple(str, builder, tupleSizeInBytes);
        this.totalTuplesEmitted++;
        this.totalBytesEmitted += tupleSizeInBytes;
        this.metrics.emittedTuple(str);
    }
}
