package org.apache.beam.fn.harness.data;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.class */
public class QueueingBeamFnDataClient implements BeamFnDataClient {
    /** Logger used by this client and by its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(QueueingBeamFnDataClient.class);

    /** Underlying client that {@code receive} and {@code send} delegate to. */
    private final BeamFnDataClient mainClient;

    /** Capacity passed to every {@link ClosableQueue} this client creates. */
    private final int queueSize;

    /** Queue holding buffered inbound elements; swapped for a fresh instance by {@code reset()}. */
    private ClosableQueue queue;

    /** True from the start of {@code drainAndBlock()} until {@code reset()} clears it. */
    @GuardedBy("inboundDataClients")
    private boolean isDraining;

    /** Inbound clients registered through {@code receive} that have not completed yet. */
    @GuardedBy("inboundDataClients")
    private final HashSet<InboundDataClient> inboundDataClients = new HashSet<>();

    /** Inbound clients that were moved out of {@code inboundDataClients} on completion. */
    @GuardedBy("inboundDataClients")
    private final ArrayList<InboundDataClient> finishedClients = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient$ClosableQueue.class */
    /**
     * Bounded blocking queue of {@link ConsumerAndData} entries that can be closed once.
     *
     * <p>Closing sets a flag and tries to enqueue a sentinel ({@code POISON}) so that a consumer
     * blocked in {@link #take()} wakes up and observes the end of the queue.
     */
    public static class ClosableQueue {
        /** Sentinel marking the end of the queue; its consumer always throws if invoked. */
        private static final ConsumerAndData<Object> POISON = new ConsumerAndData<>(ignored -> {
            throw new RuntimeException("Unable to accept poison.");
        }, new Object());
        private final LinkedBlockingQueue<ConsumerAndData<?>> queue;
        private final AtomicBoolean closed = new AtomicBoolean();

        /** Creates a queue whose backing {@link LinkedBlockingQueue} has capacity {@code i}. */
        ClosableQueue(int i) {
            queue = new LinkedBlockingQueue<>(i);
        }

        /**
         * Marks the queue closed and makes a non-blocking attempt to enqueue the sentinel.
         *
         * @throws IllegalArgumentException if the queue had already been closed
         */
        void close() {
            boolean alreadyClosed = closed.getAndSet(true);
            Preconditions.checkArgument(!alreadyClosed);
            boolean poisonAdded = queue.offer(POISON);
            if (!poisonAdded) {
                // The sentinel is skipped here; take() consults the closed flag once the queue runs dry.
                QueueingBeamFnDataClient.LOG.debug("Queue was full, not adding poison");
            }
        }

        /** Delegates to {@link LinkedBlockingQueue#offer(Object, long, TimeUnit)}. */
        boolean offer(ConsumerAndData<?> consumerAndData, long j, TimeUnit timeUnit) throws InterruptedException {
            return queue.offer(consumerAndData, j, timeUnit);
        }

        /**
         * Returns the next entry, or {@code null} when the sentinel is dequeued or the queue is
         * closed and has nothing left.
         */
        ConsumerAndData<?> take() throws InterruptedException {
            // Fast path: grab an entry without blocking when one is already available.
            ConsumerAndData<?> head = queue.poll();
            if (head == null) {
                if (closed.get()) {
                    // Closed: poll once more instead of blocking, in case something arrived meanwhile.
                    head = queue.poll();
                } else {
                    // Still open: block until an entry (or the sentinel) shows up.
                    head = queue.take();
                }
            }
            return head == POISON ? null : head;
        }

        /** Returns true when the queue has no entries or its head is the sentinel. */
        boolean isEmpty() {
            if (queue.isEmpty()) {
                return true;
            }
            return queue.peek() == POISON;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient$ConsumerAndData.class */
    /**
     * Immutable pairing of one data element with the receiver that should consume it.
     *
     * @param <T> type of the data element
     */
    public static class ConsumerAndData<T> {
        private final FnDataReceiver<T> consumer;
        private final T data;

        /** Stores the receiver and the element it will later be handed. */
        public ConsumerAndData(FnDataReceiver<T> fnDataReceiver, T t) {
            consumer = fnDataReceiver;
            data = t;
        }

        /** Hands the stored element to the stored receiver, propagating whatever it throws. */
        void accept() throws Exception {
            consumer.accept(data);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient$QueueingFnDataReceiver.class */
    private static class QueueingFnDataReceiver<T> implements FnDataReceiver<T> {
        private final FnDataReceiver<T> consumer;
        private final ClosableQueue queue;
        public InboundDataClient inboundDataClient;

        public QueueingFnDataReceiver(FnDataReceiver<T> fnDataReceiver, ClosableQueue closableQueue) {
            this.queue = closableQueue;
            this.consumer = fnDataReceiver;
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(T t) throws Exception {
            InboundDataClient inboundDataClient = this.inboundDataClient;
            try {
                ConsumerAndData<?> consumerAndData = new ConsumerAndData<>(this.consumer, t);
                while (!this.queue.offer(consumerAndData, 200L, TimeUnit.MILLISECONDS) && !inboundDataClient.isDone()) {
                }
            } catch (Exception e) {
                QueueingBeamFnDataClient.LOG.error("Failed to insert the value into the queue", (Throwable) e);
                inboundDataClient.fail(e);
                throw e;
            }
        }
    }

    /**
     * Creates a queueing client on top of an existing data client.
     *
     * @param beamFnDataClient client that receive and send calls are delegated to
     * @param i capacity of the internal element queue
     */
    public QueueingBeamFnDataClient(BeamFnDataClient beamFnDataClient, int i) {
        mainClient = beamFnDataClient;
        queueSize = i;
        queue = new ClosableQueue(i);
    }

    /**
     * Registers {@code fnDataReceiver} with the underlying client through a queueing wrapper, so
     * incoming elements are buffered on the internal queue instead of being consumed directly.
     *
     * <p>The returned inbound client is tracked until it completes; completion is reported to
     * {@code completeInbound}.
     *
     * @param apiServiceDescriptor endpoint passed through to the underlying client
     * @param logicalEndpoint instruction/transform pair identifying the stream
     * @param fnDataReceiver consumer that will eventually be handed the buffered elements
     * @return the inbound client created by the underlying client
     * @throws IllegalStateException if this client is currently draining
     */
    @Override
    public InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint logicalEndpoint, FnDataReceiver<ByteString> fnDataReceiver) {
        LOG.debug("Registering consumer for instruction {} and transform {}", logicalEndpoint.getInstructionId(), logicalEndpoint.getTransformId());
        // Parameterized (not raw) so the compiler checks the receiver handed to the main client.
        QueueingFnDataReceiver<ByteString> queueingReceiver = new QueueingFnDataReceiver<>(fnDataReceiver, this.queue);
        InboundDataClient inboundDataClient = this.mainClient.receive(apiServiceDescriptor, logicalEndpoint, queueingReceiver);
        // The wrapper needs the client handle to detect completion and to report enqueue failures.
        queueingReceiver.inboundDataClient = inboundDataClient;
        synchronized (this.inboundDataClients) {
            Preconditions.checkState(!this.isDraining);
            // Register the completion hook only the first time this client instance is seen.
            if (this.inboundDataClients.add(inboundDataClient)) {
                inboundDataClient.runWhenComplete(() -> completeInbound(inboundDataClient));
            }
        }
        return inboundDataClient;
    }

    /**
     * Moves a completed inbound client from the active set to the finished list and, if a drain
     * is in progress and no active clients remain, closes the queue.
     *
     * @throws IllegalStateException if {@code inboundDataClient} is not done
     */
    private void completeInbound(InboundDataClient inboundDataClient) {
        Preconditions.checkState(inboundDataClient.isDone());
        synchronized (this.inboundDataClients) {
            boolean wasActive = this.inboundDataClients.remove(inboundDataClient);
            if (!wasActive) {
                // Not (or no longer) tracked, so there is nothing to record.
                return;
            }
            this.finishedClients.add(inboundDataClient);
            boolean lastOneOut = this.inboundDataClients.isEmpty();
            if (lastOneOut && this.isDraining) {
                this.queue.close();
            }
        }
    }

    /**
     * Switches this client into draining mode and processes queued elements on the calling
     * thread until the queue reports that it is closed and exhausted.
     *
     * <p>If there are no active inbound clients when draining starts, the queue is closed
     * immediately; otherwise it is closed by {@code completeInbound} once the last one finishes.
     *
     * @throws IllegalStateException if a drain is already in progress
     * @throws Exception whatever taking from the queue or a consumer threw; before it is
     *     rethrown, every active and finished inbound client is failed with it
     */
    public void drainAndBlock() throws Exception {
        synchronized (this.inboundDataClients) {
            Preconditions.checkState(!this.isDraining);
            this.isDraining = true;
            if (this.inboundDataClients.isEmpty()) {
                this.queue.close();
            }
        }
        while (true) {
            try {
                ConsumerAndData<?> next = this.queue.take();
                if (next == null) {
                    // A null result means the queue is closed and has nothing left: verify and finish.
                    synchronized (this.inboundDataClients) {
                        Preconditions.checkState(this.inboundDataClients.isEmpty());
                        Preconditions.checkState(this.isDraining);
                    }
                    Preconditions.checkState(this.queue.isEmpty());
                    return;
                }
                next.accept();
            } catch (Exception e) {
                LOG.error("Client failed to deque and process the value", e);
                // Snapshot under the lock, then notify outside it: fail() is foreign code and
                // should not run while the monitor on inboundDataClients is held.
                HashSet<InboundDataClient> clientsToFail = new HashSet<>();
                synchronized (this.inboundDataClients) {
                    clientsToFail.addAll(this.inboundDataClients);
                    clientsToFail.addAll(this.finishedClients);
                }
                for (InboundDataClient client : clientsToFail) {
                    client.fail(e);
                }
                throw e;
            }
        }
    }

    /**
     * Creates an outbound receiver by delegating straight to the underlying client; outbound
     * data is not queued by this class.
     */
    @Override
    public <T> CloseableFnDataReceiver<T> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint logicalEndpoint, Coder<T> coder) {
        LOG.debug("Creating output consumer for instruction {} and transform {}", logicalEndpoint.getInstructionId(), logicalEndpoint.getTransformId());
        CloseableFnDataReceiver<T> outbound = mainClient.send(apiServiceDescriptor, logicalEndpoint, coder);
        return outbound;
    }

    /**
     * Returns this client to its initial state: forgets all active and finished inbound clients,
     * clears the draining flag, and replaces the queue with a new empty one of the same capacity.
     */
    public void reset() {
        synchronized (this.inboundDataClients) {
            isDraining = false;
            finishedClients.clear();
            inboundDataClients.clear();
        }
        // A brand-new queue is installed rather than emptying the old one.
        queue = new ClosableQueue(queueSize);
    }
}
