package org.apache.asterix.runtime.transaction;

import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.class */
/**
 * Resource id factory that serves ids from a locally cached pool and refills that pool by
 * requesting blocks of {@link #RESOURCE_ID_BLOCK_SIZE} ids from the primary CC through the
 * node's message broker.
 *
 * <p>Responses to block requests are delivered via {@link #addNewIds} and parked in a blocking
 * queue until {@link #createId()} consumes them. The local id pool is wrapped in fastutil's
 * synchronized view, so each individual queue operation is synchronized; compound sequences
 * (e.g. dequeue followed by isEmpty) are not atomic as a whole.
 */
public class GlobalResourceIdFactory implements IResourceIdFactory {
    private static final Logger LOGGER = LogManager.getLogger();
    /** Number of ids asked for in every block request. */
    private static final int RESOURCE_ID_BLOCK_SIZE = 25;
    private final INCServiceContext serviceCtx;
    /** Locally cached ids that are ready to be handed out. */
    private final LongPriorityQueue resourceIds = LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
    /** Block responses that have been received but not yet unpacked into {@link #resourceIds}. */
    private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ = new LinkedBlockingQueue<>();
    private final String nodeId;

    /**
     * Creates a factory bound to the given NC service context.
     *
     * @param iNCServiceContext context used to obtain the node id, the executor and the message broker
     */
    public GlobalResourceIdFactory(INCServiceContext iNCServiceContext) {
        this.serviceCtx = iNCServiceContext;
        this.nodeId = iNCServiceContext.getNodeId();
    }

    /**
     * Queues a block response so that a subsequent {@link #createId()} call can consume it.
     *
     * @param resourceIdRequestResponseMessage the response carrying a block of ids (or an exception)
     * @throws InterruptedException if interrupted while inserting into the response queue
     */
    public void addNewIds(ResourceIdRequestResponseMessage resourceIdRequestResponseMessage) throws InterruptedException {
        LOGGER.debug("rec'd block of ids: {}", resourceIdRequestResponseMessage);
        this.resourceIdResponseQ.put(resourceIdRequestResponseMessage);
    }

    /**
     * Returns the next resource id.
     *
     * <p>Fast path: take an id from the local pool; if that drained the pool, asynchronously
     * request the next block so it is (hopefully) available before the next caller needs it.
     * Slow path: when the pool is already empty, consume a pending block response, requesting
     * and waiting for one if none is pending.
     *
     * @return a resource id
     * @throws HyracksDataException if the block response carries an exception, the request could
     *         not be sent, or the calling thread was interrupted while waiting for a response
     */
    public long createId() throws HyracksDataException {
        try {
            long id = this.resourceIds.dequeueLong();
            if (this.resourceIds.isEmpty()) {
                // Pool just ran dry: ask for the next block in the background (best effort).
                this.serviceCtx.getControllerService().getExecutor().submit(() -> {
                    try {
                        requestNewBlock();
                    } catch (Exception e) {
                        LOGGER.warn("failed on preemptive block request", e);
                    }
                });
            }
            return id;
        } catch (NoSuchElementException ignored) {
            // The local pool is empty; fall through to obtain a fresh block.
        }
        return takeFirstIdOfNextBlock();
    }

    /**
     * Consumes one block response, returns its first id and caches the remaining ids locally.
     */
    private long takeFirstIdOfNextBlock() throws HyracksDataException {
        try {
            // Use a response that has already arrived, if any; otherwise request one and wait.
            ResourceIdRequestResponseMessage response = this.resourceIdResponseQ.poll();
            if (response == null) {
                requestNewBlock();
                response = this.resourceIdResponseQ.take();
            }
            if (response.getException() != null) {
                throw HyracksDataException.create(response.getException());
            }
            // The first id of the block goes to the caller, the rest go into the local pool.
            long firstId = response.getResourceId();
            for (int offset = 1; offset < response.getBlockSize(); offset++) {
                this.resourceIds.enqueue(firstId + offset);
            }
            return firstId;
        } catch (InterruptedException ie) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw HyracksDataException.create(ie);
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Sends a request for a new block of {@link #RESOURCE_ID_BLOCK_SIZE} ids to the primary CC.
     * The response is expected to come back through {@link #addNewIds}.
     *
     * @throws Exception if the message broker fails to send the request
     */
    protected void requestNewBlock() throws Exception {
        this.serviceCtx.getMessageBroker().sendMessageToPrimaryCC(new ResourceIdRequestMessage(this.nodeId, RESOURCE_ID_BLOCK_SIZE));
    }
}
