package org.apache.giraph.comm.messages.queue;

import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.class */
public final class AsyncMessageStoreWrapper<I extends WritableComparable, M extends Writable> implements MessageStore<I, M> {
    private static final Logger LOG = Logger.getLogger(AsyncMessageStoreWrapper.class);
    private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE = new PartitionMessage(-1, null);
    private static final PartitionMessage CLEAR_QUEUE_MESSAGE = new PartitionMessage(-1, null);
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
    private final int threadsCount;
    private final BlockingQueue<PartitionMessage<I, M>>[] queues;
    private final Int2IntMap partition2Queue = new Int2IntArrayMap();
    private Semaphore completionSemaphore;
    private final MessageStore<I, M> store;

    /* loaded from: input_file:org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper$MessageStoreQueueWorker.class */
    private class MessageStoreQueueWorker implements Runnable {
        private final BlockingQueue<PartitionMessage<I, M>> queue;

        private MessageStoreQueueWorker(BlockingQueue<PartitionMessage<I, M>> blockingQueue) {
            this.queue = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            PartitionMessage<I, M> partitionMessage = null;
            while (true) {
                try {
                    partitionMessage = this.queue.take();
                    if (partitionMessage.getMessage() != null) {
                        AsyncMessageStoreWrapper.this.store.addPartitionMessages(partitionMessage.getPartitionId(), partitionMessage.getMessage());
                    } else {
                        AsyncMessageStoreWrapper.this.completionSemaphore.release();
                        if (partitionMessage == AsyncMessageStoreWrapper.SHUTDOWN_QUEUE_MESSAGE) {
                            return;
                        }
                    }
                } catch (InterruptedException e) {
                    AsyncMessageStoreWrapper.LOG.error("MessageStoreQueueWorker.run: " + partitionMessage, e);
                    return;
                }
            }
        }
    }

    public AsyncMessageStoreWrapper(MessageStore<I, M> messageStore, Iterable<Integer> iterable, int i) {
        this.store = messageStore;
        this.threadsCount = i;
        this.completionSemaphore = new Semaphore(1 - this.threadsCount);
        this.queues = new BlockingQueue[this.threadsCount];
        LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + this.threadsCount);
        for (int i2 = 0; i2 < this.threadsCount; i2++) {
            this.queues[i2] = new LinkedBlockingQueue();
            EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(this.queues[i2]));
        }
        int i3 = 0;
        Iterator<Integer> it = iterable.iterator();
        while (it.hasNext()) {
            int i4 = i3;
            i3++;
            this.partition2Queue.put(it.next().intValue(), i4 % this.threadsCount);
        }
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean isPointerListEncoding() {
        return this.store.isPointerListEncoding();
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public Iterable<M> getVertexMessages(I i) {
        return this.store.getVertexMessages(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void clearVertexMessages(I i) {
        this.store.clearVertexMessages(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void clearAll() {
        try {
            for (BlockingQueue<PartitionMessage<I, M>> blockingQueue : this.queues) {
                blockingQueue.put(SHUTDOWN_QUEUE_MESSAGE);
            }
            this.completionSemaphore.acquire();
            this.store.clearAll();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean hasMessagesForVertex(I i) {
        return this.store.hasMessagesForVertex(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean hasMessagesForPartition(int i) {
        return this.store.hasMessagesForPartition(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void addPartitionMessages(int i, VertexIdMessages<I, M> vertexIdMessages) {
        try {
            this.queues[this.partition2Queue.get(i)].put(new PartitionMessage<>(i, vertexIdMessages));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void addMessage(I i, M m) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void finalizeStore() {
        this.store.finalizeStore();
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public Iterable<I> getPartitionDestinationVertices(int i) {
        return this.store.getPartitionDestinationVertices(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void clearPartition(int i) {
        this.store.clearPartition(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void writePartition(DataOutput dataOutput, int i) throws IOException {
        this.store.writePartition(dataOutput, i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public void readFieldsForPartition(DataInput dataInput, int i) throws IOException {
        this.store.readFieldsForPartition(dataInput, i);
    }

    public void waitToComplete() {
        try {
            for (BlockingQueue<PartitionMessage<I, M>> blockingQueue : this.queues) {
                blockingQueue.put(CLEAR_QUEUE_MESSAGE);
            }
            this.completionSemaphore.acquire();
            this.completionSemaphore = new Semaphore(1 - this.threadsCount);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
