package org.apache.flink.statefun.flink.core.feedback;

import java.io.Closeable;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/FeedbackChannel.class */
public final class FeedbackChannel<T> implements Closeable {
    private final SubtaskFeedbackKey<T> key;
    private final FeedbackQueue<T> queue;
    private final AtomicReference<ConsumerTask<T>> consumerRef = new AtomicReference<>();

    /* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/FeedbackChannel$ConsumerTask.class */
    private static final class ConsumerTask<T> implements Runnable, Closeable {
        private final Executor executor;
        private final FeedbackConsumer<T> consumer;
        private final FeedbackQueue<T> queue;

        ConsumerTask(Executor executor, FeedbackConsumer<T> feedbackConsumer, FeedbackQueue<T> feedbackQueue) {
            this.executor = (Executor) Objects.requireNonNull(executor);
            this.consumer = (FeedbackConsumer) Objects.requireNonNull(feedbackConsumer);
            this.queue = (FeedbackQueue) Objects.requireNonNull(feedbackQueue);
        }

        void scheduleDrainAll() {
            this.executor.execute(this);
        }

        @Override // java.lang.Runnable
        public void run() {
            Deque<T> drainAll = this.queue.drainAll();
            while (true) {
                try {
                    T pollFirst = drainAll.pollFirst();
                    if (pollFirst == null) {
                        return;
                    } else {
                        this.consumer.processFeedback(pollFirst);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FeedbackChannel(SubtaskFeedbackKey<T> subtaskFeedbackKey, FeedbackQueue<T> feedbackQueue) {
        this.key = (SubtaskFeedbackKey) Objects.requireNonNull(subtaskFeedbackKey);
        this.queue = (FeedbackQueue) Objects.requireNonNull(feedbackQueue);
    }

    public void put(T t) {
        ConsumerTask<T> consumerTask;
        if (this.queue.addAndCheckIfWasEmpty(t) && (consumerTask = this.consumerRef.get()) != null) {
            consumerTask.scheduleDrainAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerConsumer(FeedbackConsumer<T> feedbackConsumer, Executor executor) {
        Objects.requireNonNull(feedbackConsumer);
        ConsumerTask<T> consumerTask = new ConsumerTask<>(executor, feedbackConsumer, this.queue);
        if (!this.consumerRef.compareAndSet(null, consumerTask)) {
            throw new IllegalStateException("There can be only a single consumer in a FeedbackChannel.");
        }
        consumerTask.scheduleDrainAll();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        IOUtils.closeQuietly(this.consumerRef.getAndSet(null));
        FeedbackChannelBroker.get().removeChannel(this.key);
    }
}
