package cloud.orbit.actors.concurrent;

import cloud.orbit.actors.runtime.InternalUtils;
import cloud.orbit.concurrent.Task;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cloud/orbit/actors/concurrent/WaitFreeExecutionSerializer.class */
/**
 * Runs queued actions one at a time, in queue (FIFO) order, without blocking the submitting thread.
 *
 * <p>Submitted suppliers are placed on a {@link ConcurrentLinkedQueue}. A single int flag
 * ({@code lock}), flipped with compare-and-set, decides which thread is currently responsible
 * for draining the queue. A thread that fails to take the flag simply returns: the current
 * holder (or the completion callback of the action in flight) will pick up the queued work.
 *
 * <p>The flag stays set while an action's {@link Task} is still incomplete; it is released by
 * {@link #whenCompleteAsync(Object, Throwable)} once that task finishes, so the next action does
 * not start before the previous one has completed.
 */
public class WaitFreeExecutionSerializer implements ExecutionSerializer, Executor {
    private static final Logger logger = LoggerFactory.getLogger(WaitFreeExecutionSerializer.class);
    /** Debug level is sampled once at class initialisation; later level changes are not observed. */
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
    private static final AtomicIntegerFieldUpdater<WaitFreeExecutionSerializer> lockUpdater =
            AtomicIntegerFieldUpdater.newUpdater(WaitFreeExecutionSerializer.class, "lock");
    private static final AtomicIntegerFieldUpdater<WaitFreeExecutionSerializer> sizeUpdater =
            AtomicIntegerFieldUpdater.newUpdater(WaitFreeExecutionSerializer.class, "size");

    private final ExecutorService executorService;
    private final ConcurrentLinkedQueue<Supplier<Task<?>>> queue = new ConcurrentLinkedQueue<>();
    /** 0 = free, 1 = held. Only touched through {@link #lock()} and {@link #unlock()}. */
    private volatile int lock;
    /** Hand-maintained element count, kept so the hot path never calls the O(n) {@code queue.size()}. */
    private volatile int size;
    /** Opaque label used only in log and error messages; may be {@code null}. */
    private final Object key;

    /**
     * Creates a serializer with no identifying key.
     *
     * @param executorService executor used to start actions and to run completion callbacks
     */
    public WaitFreeExecutionSerializer(ExecutorService executorService) {
        this(executorService, null);
    }

    /**
     * Creates a serializer.
     *
     * @param executorService executor used to start actions and to run completion callbacks
     * @param key             label included in log and error messages; may be {@code null}
     */
    public WaitFreeExecutionSerializer(ExecutorService executorService, Object key) {
        this.executorService = executorService;
        this.key = key;
    }

    /**
     * Queues {@code supplier} for serialized execution.
     *
     * <p>The queue bound is checked against a snapshot of the size counter, so it is a soft
     * limit: concurrent submitters can overshoot it.
     *
     * @param supplier     produces the task to run; invoked through {@code InternalUtils.safeInvoke}
     * @param maxQueueSize number of already queued actions at which new submissions are refused
     * @param <R>          result type of the supplied task
     * @return a task linked (via {@code InternalUtils.linkFutures}) to the task produced by
     *         {@code supplier}, or a task already failed with {@link IllegalStateException} when
     *         the queue is full
     */
    public <R> Task<R> executeSerialized(Supplier<Task<R>> supplier, int maxQueueSize) {
        final Task<R> completion = new Task<>();
        final int queueSize = size;
        if (DEBUG_ENABLED && queueSize >= maxQueueSize / 10) {
            logger.debug("Queued {} / {} for {}", queueSize, maxQueueSize, key);
        }
        if (queueSize >= maxQueueSize || !queue.add(() -> {
            final Task<R> source = InternalUtils.safeInvoke(supplier);
            InternalUtils.linkFutures(source, completion);
            return source;
        })) {
            // Report the snapshot that was actually compared instead of walking the whole queue
            // with ConcurrentLinkedQueue.size(), which is O(n) and only approximate anyway.
            return Task.fromException(new IllegalStateException(
                    String.format("Queue full for %s (%d > %d)", key, queueSize, maxQueueSize)));
        }
        sizeUpdater.incrementAndGet(this);
        tryExecute(false);
        return completion;
    }

    /**
     * @return {@code true} while the lock flag is set or the queue still holds actions
     */
    public boolean isBusy() {
        return lock == 1 || !queue.isEmpty();
    }

    /**
     * Attempts to become the draining thread and run queued actions.
     *
     * @param local {@code true} to invoke actions on the calling thread; {@code false} to hand
     *              the next action to {@link #executorService} and return immediately
     */
    private void tryExecute(boolean local) {
        // If the flag is already held, its holder is responsible for the queue; just leave.
        while (lock()) {
            final Supplier<Task<?>> action = queue.poll();
            if (action != null) {
                sizeUpdater.decrementAndGet(this);
                if (!local) {
                    final Task<Object> tracker = new Task<>();
                    try {
                        executorService.execute(() -> {
                            wrapExecution(action, tracker);
                            if (!tracker.isDone()) {
                                // Keep the lock; the callback releases it once the action finishes.
                                tracker.whenCompleteAsync(this::whenCompleteAsync, executorService);
                            } else {
                                unlock();
                                tryExecute(true);
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        // The executor refused the hand-off, so nothing will ever release the
                        // flag on our behalf. Release it here, otherwise the serializer stays
                        // locked forever. The dequeued action is dropped; the failure is
                        // propagated to the caller exactly as before.
                        unlock();
                        throw e;
                    }
                    // Lock ownership has moved to the submitted runnable.
                    return;
                }
                try {
                    final Task<?> running = action.get();
                    if (running != null && !running.isDone()) {
                        // Returning without unlocking: the completion callback unlocks.
                        running.whenCompleteAsync(this::whenCompleteAsync, executorService);
                        return;
                    }
                } catch (Throwable th) {
                    try {
                        logger.error("Error executing action", th);
                    } catch (Throwable th2) {
                        // Last resort when even logging fails.
                        th2.printStackTrace();
                        th.printStackTrace();
                    }
                }
            }
            unlock();
            // Re-check after unlocking: an action may have been queued by a thread that
            // failed to take the flag while we were holding it.
            if (queue.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Invokes {@code supplier} and mirrors its outcome onto {@code task}, discarding the value.
     *
     * <p>{@code task} is completed with {@code null} when the supplier returns {@code null} or
     * an already successfully completed task; otherwise it completes (normally with
     * {@code null}, or exceptionally) when the supplied task does. Anything thrown by the
     * supplier, including {@link Error}s, completes {@code task} exceptionally so that the
     * caller can always release the lock.
     */
    private void wrapExecution(Supplier<Task<?>> supplier, Task<?> task) {
        try {
            final Task<?> produced = supplier.get();
            if (produced == null || (produced.isDone() && !produced.isCompletedExceptionally())) {
                task.complete(null);
            } else {
                produced.whenComplete((ignored, th) -> {
                    if (th != null) {
                        task.completeExceptionally(th);
                    } else {
                        task.complete(null);
                    }
                });
            }
        } catch (Throwable th) {
            // Throwable, not Exception: matches the inline path in tryExecute and guarantees
            // the tracking task is completed even when the action fails with an Error.
            task.completeExceptionally(th);
        }
    }

    /** Clears the lock flag; logs an error if the flag was not set. */
    private void unlock() {
        if (!lockUpdater.compareAndSet(this, 1, 0)) {
            logger.error("Unlocking without having the lock");
        }
    }

    /**
     * @return {@code true} if this call switched the lock flag from free to held
     */
    private boolean lock() {
        return lockUpdater.compareAndSet(this, 0, 1);
    }

    /**
     * Completion callback for an in-flight action: releases the lock and continues draining
     * the queue on the calling thread. Both arguments are ignored.
     */
    private <T> void whenCompleteAsync(T t, Throwable th) {
        unlock();
        tryExecute(true);
    }

    /**
     * Queues {@code runnable} for serialized execution with an effectively unbounded queue
     * ({@link Integer#MAX_VALUE}). The resulting task is discarded.
     *
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    @Override // java.util.concurrent.Executor
    public void execute(Runnable runnable) {
        if (runnable == null) {
            // Fail at submission time, as Executor.execute specifies, instead of inside the
            // queued action later on.
            throw new NullPointerException("runnable");
        }
        executeSerialized(() -> {
            runnable.run();
            return Task.done();
        }, Integer.MAX_VALUE);
    }
}
