package org.apache.beam.fn.harness.control;

import java.util.EnumMap;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/control/BeamFnControlClient.class */
/**
 * Client side of the Fn control stream.
 *
 * <p>Incoming {@link BeamFnApi.InstructionRequest}s are buffered in a deque by the inbound
 * observer. {@link #processInstructionRequests(Executor)} drains that deque, dispatches each
 * request to the handler registered for its request case, and writes the resulting
 * {@link BeamFnApi.InstructionResponse} to the outbound observer.
 *
 * <p>When the inbound stream ends (normally or with an error) a poison pill is pushed to the
 * front of the deque so that the processing loop stops, and {@code onFinish} is completed with
 * the outcome of the stream.
 */
public class BeamFnControlClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnControlClient.class);
    private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";

    /** Sentinel request that tells the processing loop to stop. */
    private static final BeamFnApi.InstructionRequest POISON_PILL = BeamFnApi.InstructionRequest.newBuilder().setInstructionId(FAKE_INSTRUCTION_ID).build();

    /** Value used to complete {@code onFinish} when the inbound stream completes normally. */
    private static final Object COMPLETED = new Object();

    private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver;
    private final BlockingDeque<BeamFnApi.InstructionRequest> bufferedInstructions = new LinkedBlockingDeque<>();
    private final EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers;
    private final CompletableFuture<Object> onFinish;

    /** Receives instruction requests from the control stream and buffers them for processing. */
    private class InboundObserver implements StreamObserver<BeamFnApi.InstructionRequest> {
        private InboundObserver() {
        }

        /** Appends the request to the end of the buffer. */
        @Override
        public void onNext(BeamFnApi.InstructionRequest instructionRequest) {
            LOG.debug("Received InstructionRequest {}", instructionRequest);
            Uninterruptibles.putUninterruptibly(bufferedInstructions, instructionRequest);
        }

        /** Stops the processing loop and records the stream failure in {@code onFinish}. */
        @Override
        public void onError(Throwable th) {
            placePoisonPillIntoQueue();
            onFinish.completeExceptionally(th);
        }

        /** Stops the processing loop and marks {@code onFinish} as completed normally. */
        @Override
        public void onCompleted() {
            placePoisonPillIntoQueue();
            onFinish.complete(COMPLETED);
        }

        /**
         * Pushes the poison pill to the FRONT of the buffer, retrying if the calling thread is
         * interrupted while waiting, so the processing loop sees it before any request that is
         * still buffered.
         *
         * <p>If an interrupt was observed while retrying, the thread's interrupt status is
         * restored before returning so that callers further up the stack can still see it.
         */
        private void placePoisonPillIntoQueue() {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        bufferedInstructions.putFirst(POISON_PILL);
                        return;
                    } catch (InterruptedException e) {
                        // Keep retrying; the interrupt is re-asserted in the finally block.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Opens the control stream against the given endpoint.
     *
     * @param str not used by this constructor
     * @param apiServiceDescriptor descriptor of the control service to connect to
     * @param managedChannelFactory factory used to build the channel for the descriptor
     * @param outboundObserverFactory factory used to wrap the outbound side of the control stream
     * @param enumMap handlers keyed by request case; request cases without an entry are answered
     *     with an "Unknown InstructionRequest type" error response
     */
    public BeamFnControlClient(String str, Endpoints.ApiServiceDescriptor apiServiceDescriptor, ManagedChannelFactory managedChannelFactory, OutboundObserverFactory outboundObserverFactory, EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> enumMap) {
        // Initialise all state the inbound observer touches BEFORE the stream is opened, so a
        // callback that arrives while the constructor is still running never sees a null field.
        this.handlers = enumMap;
        this.onFinish = new CompletableFuture<>();
        BeamFnControlGrpc.BeamFnControlStub stub = BeamFnControlGrpc.newStub(managedChannelFactory.forDescriptor(apiServiceDescriptor));
        this.outboundObserver = outboundObserverFactory.outboundObserverFor(stub::control, new InboundObserver());
    }

    /**
     * Takes buffered requests one at a time and submits each to {@code executor} for handling,
     * until the poison pill is taken from the buffer.
     *
     * <p>Once the poison pill is seen this method waits on {@code onFinish} and then returns.
     * An {@link Error} thrown while handling a request is reported on the outbound stream, fails
     * {@code onFinish}, and is rethrown inside the executor task.
     *
     * @param executor runs the handling of each individual request
     * @throws InterruptedException if interrupted while waiting for a request or for completion
     * @throws ExecutionException if {@code onFinish} was completed exceptionally
     */
    public void processInstructionRequests(Executor executor) throws InterruptedException, ExecutionException {
        while (true) {
            BeamFnApi.InstructionRequest request = this.bufferedInstructions.take();
            if (Objects.equals(request, POISON_PILL)) {
                // Surfaces the stream's terminal state: returns normally or throws.
                this.onFinish.get();
                return;
            }
            executor.execute(() -> {
                try {
                    sendInstructionResponse(delegateOnInstructionRequestType(request));
                } catch (Error e) {
                    sendErrorResponse(e);
                    throw e;
                }
            });
        }
    }

    /**
     * Runs the handler registered for the request's case and builds the response.
     *
     * <p>The response always carries the request's instruction id. An {@link Exception} thrown by
     * the handler is logged and converted into an error response containing its stack trace; an
     * {@link Error} is logged and rethrown.
     *
     * @param instructionRequest the request to handle
     * @return the response produced by the handler, or an error response
     */
    public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(BeamFnApi.InstructionRequest instructionRequest) {
        try {
            ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder> handler = this.handlers.getOrDefault(instructionRequest.getRequestCase(), this::missingHandler);
            return handler.apply(instructionRequest).setInstructionId(instructionRequest.getInstructionId()).build();
        } catch (Error e) {
            LOG.error("Error thrown when handling {} {}", BeamFnApi.InstructionRequest.class.getSimpleName(), instructionRequest.getInstructionId(), e);
            throw e;
        } catch (Exception e) {
            LOG.error("Exception while trying to handle {} {}", BeamFnApi.InstructionRequest.class.getSimpleName(), instructionRequest.getInstructionId(), e);
            return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionRequest.getInstructionId()).setError(Throwables.getStackTraceAsString(e)).build();
        }
    }

    /**
     * Writes a response to the outbound side of the control stream.
     *
     * @param instructionResponse the response to send
     */
    public void sendInstructionResponse(BeamFnApi.InstructionResponse instructionResponse) {
        LOG.debug("Sending InstructionResponse {}", instructionResponse);
        this.outboundObserver.onNext(instructionResponse);
    }

    /**
     * Fails {@code onFinish} with the given error and terminates the outbound stream with an
     * {@code INTERNAL} status describing it.
     */
    private void sendErrorResponse(Error error) {
        this.onFinish.completeExceptionally(error);
        this.outboundObserver.onError(Status.INTERNAL.withDescription(String.format("%s: %s", error.getClass().getName(), error.getMessage())).asException());
    }

    /** Fallback handler: builds an error response for a request case with no registered handler. */
    private BeamFnApi.InstructionResponse.Builder missingHandler(BeamFnApi.InstructionRequest instructionRequest) {
        return BeamFnApi.InstructionResponse.newBuilder().setError(String.format("Unknown InstructionRequest type %s", instructionRequest.getRequestCase()));
    }
}
