package org.apache.storm.executor;

import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/executor/ExecutorTransfer.class */
/**
 * Hands tuples produced by an executor either to a receive queue found in the
 * local queue table or, when the destination has no entry there, to
 * {@code WorkerState.tryTransferRemote}.
 *
 * <p>{@link #initLocalRecvQueues()} must be called before any transfer or flush
 * method: the queue table and the pending-flush table are {@code null} until then.
 */
public class ExecutorTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
    private final WorkerState workerData;
    private final KryoTupleSerializer serializer;
    private final boolean isDebug;
    /** Smallest key of the worker's local receive queue map; subtracted from a destination id to get a table index. */
    private int indexingBase = 0;
    /** Local receive queues, indexed by {@code dest - indexingBase}. */
    private ArrayList<JCQueue> localReceiveQueues;
    /** Queues published to since the last {@link #flush()}, stored at the same index as in {@code localReceiveQueues}. */
    private AtomicReferenceArray<JCQueue> queuesToFlush;

    /**
     * Creates a transfer helper bound to the given worker.
     *
     * @param workerState worker whose local queues and remote transfer are used
     * @param map         topology configuration; {@code Config.TOPOLOGY_DEBUG} is read from it
     *                    (defaulting to {@code false}) and it is passed to the tuple serializer
     */
    public ExecutorTransfer(WorkerState workerState, Map<String, Object> map) {
        this.workerData = workerState;
        this.serializer = new KryoTupleSerializer(map, workerState.getWorkerTopologyContext());
        this.isDebug = ObjectReader.getBoolean(map.get(Config.TOPOLOGY_DEBUG), false);
    }

    /**
     * Builds the index-addressable table of local receive queues and a matching,
     * initially empty, table of queues awaiting a flush.
     *
     * @throws java.util.NoSuchElementException if the worker reports no local receive queues
     */
    public void initLocalRecvQueues() {
        Integer minId = this.workerData.getLocalReceiveQueues().keySet().stream()
            .min(Integer::compareTo)
            .get();
        this.localReceiveQueues = Utils.convertToArray(this.workerData.getLocalReceiveQueues(), minId.intValue());
        this.indexingBase = minId.intValue();
        this.queuesToFlush = new AtomicReferenceArray<>(this.localReceiveQueues.size());
    }

    /**
     * Attempts to deliver a tuple, choosing the local path when
     * {@link #getLocalQueue(AddressedTuple)} yields a queue and the remote path otherwise.
     * When topology debug is enabled the tuple is logged at INFO level first.
     *
     * @param addressedTuple tuple to deliver
     * @param queue          overflow queue forwarded to the local or remote transfer; may be
     *                       {@code null} on the local path
     * @return the result of the local or remote transfer attempt
     */
    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> queue) {
        if (this.isDebug) {
            LOG.info("TRANSFERRING tuple {}", addressedTuple);
        }
        JCQueue localQueue = getLocalQueue(addressedTuple);
        if (localQueue != null) {
            return tryTransferLocal(addressedTuple, localQueue, queue);
        }
        return this.workerData.tryTransferRemote(addressedTuple, queue, this.serializer);
    }

    /**
     * Flushes every local queue recorded as pending, then asks the worker to flush remotes.
     *
     * @throws InterruptedException propagated from the underlying flush calls
     */
    public void flush() throws InterruptedException {
        flushLocal();
        this.workerData.flushRemotes();
    }

    /** Flushes each queue recorded in {@code queuesToFlush} and clears its slot afterwards. */
    private void flushLocal() throws InterruptedException {
        for (int i = 0; i < this.queuesToFlush.length(); i++) {
            JCQueue pending = this.queuesToFlush.get(i);
            if (pending != null) {
                pending.flush();
                this.queuesToFlush.set(i, null);
            }
        }
    }

    /**
     * Looks up the local receive queue for the tuple's destination.
     *
     * @param addressedTuple tuple whose {@code dest} selects the queue
     * @return the queue stored for that destination, or {@code null} when the destination
     *         falls outside the local table (below the indexing base or past its end) or
     *         the table holds no queue at that position
     */
    public JCQueue getLocalQueue(AddressedTuple addressedTuple) {
        int index = addressedTuple.dest - this.indexingBase;
        // A destination below the indexing base would produce a negative index and make
        // ArrayList.get throw; treat it like any other destination that is not local.
        if (index < 0 || index >= this.localReceiveQueues.size()) {
            return null;
        }
        return this.localReceiveQueues.get(index);
    }

    /**
     * Attempts to publish a tuple to a local receive queue.
     *
     * <p>With a {@code null} overflow queue the outcome of {@code tryPublish} is returned
     * directly and nothing is recorded for flushing. With a non-null overflow queue the
     * publish is attempted only while that overflow queue is empty; on success the target
     * queue is recorded for the next {@link #flush()}, otherwise the tuple is appended to
     * the overflow queue.
     *
     * @param addressedTuple tuple to publish
     * @param jCQueue        local receive queue to publish to
     * @param queue          overflow queue, or {@code null}
     * @return {@code true} if the tuple was published to {@code jCQueue}; {@code false} if
     *         the publish failed or the tuple was placed in the overflow queue
     */
    public boolean tryTransferLocal(AddressedTuple addressedTuple, JCQueue jCQueue, Queue<AddressedTuple> queue) {
        this.workerData.checkSerialize(this.serializer, addressedTuple);
        if (queue == null) {
            return jCQueue.tryPublish(addressedTuple);
        }
        // Publish directly only when nothing is already waiting in the overflow queue.
        if (queue.isEmpty() && jCQueue.tryPublish(addressedTuple)) {
            int slot = addressedTuple.dest - this.indexingBase;
            this.queuesToFlush.set(slot, jCQueue);
            return true;
        }
        queue.add(addressedTuple);
        return false;
    }
}
