package dev.responsive.kafka.api.async.internals.queues;

import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/queues/KeyOrderPreservingQueue.class */
/**
 * A {@link SchedulingQueue} that hands out events for processing while allowing at most one
 * event per input record key to be in flight at a time.
 *
 * <p>The first event offered for a key becomes that key's in-flight event and is immediately
 * made available through {@link #poll()}. Any further events offered for the same key are held
 * back, in arrival order, until {@link #unblockKey(Object)} is called, at which point the oldest
 * held-back event becomes the new in-flight event and is made available through {@link #poll()}.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <KIn> the type of the input record key
 */
public class KeyOrderPreservingQueue<KIn> implements SchedulingQueue<KIn> {
    private final Logger log;

    /**
     * Per-key bookkeeping. An entry exists for a key only while that key has an in-flight event;
     * it is created by {@link #offer(AsyncEvent)} and removed by {@link #unblockKey(Object)} once
     * nothing is left waiting for the key.
     */
    private final Map<KIn, KeyEventQueue> blockedEvents = new HashMap<>();

    /** Events that have been scheduled and are waiting to be handed out by {@link #poll()}. */
    private final Queue<AsyncEvent> processableEvents = new LinkedList<>();

    private final int maxQueueSizePerKey;

    /**
     * Creates an empty queue.
     *
     * @param logPrefix          prefix passed to the {@link LogContext} used to create the logger
     * @param maxQueueSizePerKey the size (in-flight event included) at which a single key's
     *                           queue is considered full
     */
    public KeyOrderPreservingQueue(String logPrefix, int maxQueueSizePerKey) {
        this.log = new LogContext(logPrefix).logger(KeyOrderPreservingQueue.class);
        this.maxQueueSizePerKey = maxQueueSizePerKey;
    }

    /**
     * @return {@code true} if there are no processable events and no key is currently tracked
     *         (i.e. no key has an in-flight event)
     */
    @Override
    public boolean isEmpty() {
        return processableEvents.isEmpty() && blockedEvents.isEmpty();
    }

    /**
     * @return the number of events waiting to be polled plus the number of events held back
     *         behind an in-flight event; events that were already polled are not counted
     */
    @Override
    public int totalEnqueuedEvents() {
        int heldBack = blockedEvents.values().stream()
            .mapToInt(keyQueue -> keyQueue.blockedEvents.size())
            .sum();
        return processableEvents.size() + heldBack;
    }

    /**
     * @return the largest number of held-back events for any single key (the in-flight event is
     *         not counted), or 0 if no key is tracked
     */
    @Override
    public int longestQueueSize() {
        return blockedEvents.values().stream()
            .mapToInt(keyQueue -> keyQueue.blockedEvents.size())
            .max()
            .orElse(0);
    }

    /**
     * Releases the given key's in-flight event. If another event is waiting for the key it
     * becomes the new in-flight event and is made processable; otherwise the key's bookkeeping
     * is dropped.
     *
     * @param key the key whose in-flight event has finished
     * @throws IllegalStateException if the key has no in-flight event
     */
    @Override
    public void unblockKey(KIn key) {
        // Plain lookup rather than get-or-create: an unknown key must not leave an empty entry
        // behind in the map when we reject the call, or isEmpty() would never be true again.
        KeyEventQueue keyQueue = blockedEvents.get(key);
        if (keyQueue == null || !keyQueue.isBlocked()) {
            throw new IllegalStateException("Attempted to unblock a key but it was not blocked");
        }

        AsyncEvent nextEvent = keyQueue.scheduleNextEvent();
        if (nextEvent == null) {
            // Nothing left for this key, so stop tracking it.
            blockedEvents.remove(key);
        } else {
            processableEvents.offer(nextEvent);
        }
    }

    /**
     * @return {@code true} if {@link #poll()} would currently return an event
     */
    @Override
    public boolean hasProcessableRecord() {
        return !processableEvents.isEmpty();
    }

    /**
     * @return the next processable event, or {@code null} if there is none
     */
    @Override
    public AsyncEvent poll() {
        return processableEvents.poll();
    }

    /**
     * Adds an event to the queue. If the event's key has no in-flight event, the event becomes
     * in-flight and immediately processable; otherwise it is held back behind the in-flight one.
     *
     * @param event the event to enqueue
     * @throws IllegalStateException if the key is blocked and its queue is already full
     */
    @Override
    public void offer(AsyncEvent event) {
        KeyEventQueue keyQueue = getOrCreateKeyQueue(event.inputRecordKey());
        if (keyQueue.isBlocked()) {
            keyQueue.addBlockedEvent(event);
            return;
        }
        keyQueue.scheduleNewEvent(event);
        processableEvents.offer(event);
    }

    /**
     * Reports whether the given key's queue has reached the per-key size limit. This is a pure
     * query: it never starts tracking a key that is not already tracked.
     *
     * @param key the key to check
     * @return {@code true} if the key's queue size (in-flight event included) is at or above
     *         the configured limit
     */
    @Override
    public boolean keyQueueIsFull(KIn key) {
        KeyEventQueue keyQueue = blockedEvents.get(key);
        if (keyQueue == null) {
            // An untracked key has size 0; same answer a freshly created queue would give.
            return maxQueueSizePerKey <= 0;
        }
        return keyQueue.isFull();
    }

    /** Returns the bookkeeping for {@code key}, creating and registering it if absent. */
    private KeyEventQueue getOrCreateKeyQueue(KIn key) {
        return blockedEvents.computeIfAbsent(
            key, ignored -> new KeyEventQueue(log, maxQueueSizePerKey));
    }

    /**
     * Bookkeeping for a single key: the current in-flight event (if any) plus the events held
     * back behind it, in arrival order.
     */
    public static class KeyEventQueue {
        private final Logger log;
        private final int maxQueueSizePerKey;
        private final Queue<AsyncEvent> blockedEvents = new LinkedList<>();

        /** The event currently scheduled for this key, or {@code null} if there is none. */
        private AsyncEvent inFlightEvent;

        /**
         * @param logger             logger used to report attempts to exceed the size limit
         * @param maxQueueSizePerKey the size at which this queue is considered full
         */
        public KeyEventQueue(Logger logger, int maxQueueSizePerKey) {
            this.log = logger;
            this.maxQueueSizePerKey = maxQueueSizePerKey;
        }

        /** @return {@code true} if this key currently has an in-flight event */
        public boolean isBlocked() {
            return inFlightEvent != null;
        }

        /** @return {@code true} if {@link #size()} has reached the configured limit */
        public boolean isFull() {
            return size() >= maxQueueSizePerKey;
        }

        /**
         * Marks {@code event} as the in-flight event for this key.
         *
         * @throws IllegalStateException if there already is an in-flight event
         */
        public void scheduleNewEvent(AsyncEvent event) {
            if (isBlocked()) {
                throw new IllegalStateException("Attempted to schedule new event while blocked by in-flight event");
            }
            inFlightEvent = event;
        }

        /**
         * Replaces the in-flight event with the oldest held-back event.
         *
         * @return the new in-flight event, or {@code null} if nothing was waiting (in which case
         *         this key is no longer blocked)
         * @throws IllegalStateException if there is no in-flight event to replace
         */
        public AsyncEvent scheduleNextEvent() {
            if (!isBlocked()) {
                throw new IllegalStateException("Attempted to schedule next event but there was no in-flight event");
            }
            inFlightEvent = blockedEvents.poll();
            return inFlightEvent;
        }

        /**
         * Holds {@code event} back behind the current in-flight event.
         *
         * @throws IllegalStateException if there is no in-flight event, or if this queue is full
         */
        public void addBlockedEvent(AsyncEvent event) {
            if (!isBlocked()) {
                throw new IllegalStateException("Attempted to add event to blocked queue, but this key is not currently blocked");
            }
            if (isFull()) {
                log.error("Tried to offer new event but the key's queue size in SchedulingQueue's is {} which is equal or greater than the size limit {}", size(), maxQueueSizePerKey);
                throw new IllegalStateException("Attempted to add event while key queue was full");
            }
            blockedEvents.add(event);
        }

        /** @return the number of held-back events, plus one if there is an in-flight event */
        public int size() {
            int waiting = blockedEvents.size();
            return isBlocked() ? waiting + 1 : waiting;
        }
    }
}
