package org.apache.flink.statefun.flink.core.functions;

import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.di.Lazy;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.queue.Locks;
import org.apache.flink.statefun.flink.core.queue.MpscQueue;
import org.apache.flink.statefun.sdk.Address;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/AsyncSink.class */
/**
 * Receives the outcomes of asynchronous operations and hands them to {@link Reductions} via the
 * executor injected under the {@code "mailbox-executor"} label.
 *
 * <p>Each operation is first recorded in {@link PendingAsyncOperations}. Once its future completes,
 * the outcome is wrapped in an {@code AsyncMessageDecorator}, placed on the {@link #completed}
 * queue, and later drained in a batch by a task submitted to the mailbox executor.
 */
final class AsyncSink {
    private final PendingAsyncOperations pendingAsyncOperations;
    private final Lazy<Reductions> reductions;
    private final Executor operatorMailbox;
    private final BackPressureValve backPressureValve;

    // Completed operations waiting to be drained.
    // NOTE(review): 32768 is presumably the queue's initial capacity - confirm against MpscQueue.
    private final MpscQueue<Message> completed = new MpscQueue<>(32768, Locks.jdkReentrantLock());

    /**
     * Creates a sink wired to its collaborators.
     *
     * @param pendingAsyncOperations registry in which in-flight operations are recorded
     * @param executor executor on which the drain task is scheduled
     * @param lazy lazily resolved {@link Reductions} that receives completed messages
     * @param backPressureValve valve notified when operations are registered and completed
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    AsyncSink(PendingAsyncOperations pendingAsyncOperations, @Label("mailbox-executor") Executor executor, @Label("reductions") Lazy<Reductions> lazy, @Label("backpressure-valve") BackPressureValve backPressureValve) {
        this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations, "pendingAsyncOperations");
        this.reductions = Objects.requireNonNull(lazy, "lazy");
        this.operatorMailbox = Objects.requireNonNull(executor, "executor");
        this.backPressureValve = Objects.requireNonNull(backPressureValve, "backPressureValve");
    }

    /**
     * Registers an asynchronous operation and arranges for its outcome to be enqueued once the
     * given future completes (normally or exceptionally).
     *
     * <p>The message is recorded in {@link PendingAsyncOperations} under {@code message.source()}
     * and a randomly generated id before the completion callback is attached.
     *
     * @param message the message that triggered the asynchronous operation
     * @param completableFuture the future representing the operation
     * @param <T> the type of the operation's result
     */
    public <T> void accept(Message message, CompletableFuture<T> completableFuture) {
        // The id is drawn at random; no collision check is performed here.
        final long futureId = ThreadLocalRandom.current().nextLong();
        this.pendingAsyncOperations.add(message.source(), futureId, message);
        this.backPressureValve.notifyAsyncOperationRegistered();
        // No raw BiConsumer cast: the lambda must be typed as (T, Throwable) so that the
        // throwable can be passed on to enqueue(...).
        completableFuture.whenComplete((result, throwable) -> enqueue(message, futureId, result, throwable));
    }

    /**
     * Forwards a block request for the given address to the back-pressure valve.
     *
     * @param address the address to block
     */
    public void blockAddress(Address address) {
        this.backPressureValve.blockAddress(address);
    }

    /**
     * Wraps a finished operation and appends it to the {@link #completed} queue, scheduling a
     * drain on the mailbox executor when {@code add} reports {@code 1}.
     *
     * @param message the message that triggered the operation
     * @param futureId the id under which the operation was registered
     * @param result the operation's result; as delivered by {@code whenComplete}
     * @param throwable the failure, if any; as delivered by {@code whenComplete}
     */
    private <T> void enqueue(Message message, long futureId, T result, Throwable throwable) {
        // NOTE(review): constructed without type arguments, as in the original; if
        // AsyncMessageDecorator is generic this should use the diamond operator - confirm.
        Message decorated = new AsyncMessageDecorator(this.pendingAsyncOperations, futureId, message, result, throwable);
        // Presumably add() returns the queue size after insertion, so 1 means the queue just
        // became non-empty and no drain is pending yet - verify against MpscQueue.
        if (this.completed.add(decorated) == 1) {
            this.operatorMailbox.execute(this::drainOnOperatorThread);
        }
    }

    /**
     * Drains everything currently in {@link #completed}: for each message the back-pressure valve
     * is notified with the message's target and the message is enqueued into {@link Reductions};
     * afterwards {@code processEnvelopes()} is invoked once.
     */
    private void drainOnOperatorThread() {
        Deque<Message> batch = this.completed.drainAll();
        Reductions target = this.reductions.get();
        Message next;
        while ((next = batch.poll()) != null) {
            this.backPressureValve.notifyAsyncOperationCompleted(next.target());
            target.enqueue(next);
        }
        target.processEnvelopes();
    }
}
