package org.apache.beam.fn.harness.data;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.class */
public class BeamFnDataGrpcClient implements BeamFnDataClient {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BeamFnDataGrpcClient.class);
    private static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
    private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache = new ConcurrentHashMap();
    private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;
    private final OutboundObserverFactory outboundObserverFactory;
    private final PipelineOptions options;

    public BeamFnDataGrpcClient(PipelineOptions pipelineOptions, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> function, OutboundObserverFactory outboundObserverFactory) {
        this.options = pipelineOptions;
        this.channelFactory = function;
        this.outboundObserverFactory = outboundObserverFactory;
    }

    @Override // org.apache.beam.fn.harness.data.BeamFnDataClient
    public <T> InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint logicalEndpoint, Coder<T> coder, FnDataReceiver<T> fnDataReceiver) {
        LOG.debug("Registering consumer for instruction {} and transform {}", logicalEndpoint.getInstructionId(), logicalEndpoint.getPTransformId());
        BeamFnDataGrpcMultiplexer clientFor = getClientFor(apiServiceDescriptor);
        BeamFnDataInboundObserver forConsumer = BeamFnDataInboundObserver.forConsumer(coder, fnDataReceiver);
        clientFor.registerConsumer(logicalEndpoint, forConsumer);
        return forConsumer;
    }

    @Override // org.apache.beam.fn.harness.data.BeamFnDataClient
    public <T> CloseableFnDataReceiver<T> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint logicalEndpoint, Coder<T> coder) {
        BeamFnDataGrpcMultiplexer clientFor = getClientFor(apiServiceDescriptor);
        LOG.debug("Creating output consumer for instruction {} and transform {}", logicalEndpoint.getInstructionId(), logicalEndpoint.getPTransformId());
        Optional<Integer> bufferLimit = getBufferLimit(this.options);
        return bufferLimit.isPresent() ? BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(bufferLimit.get().intValue(), logicalEndpoint, coder, clientFor.getOutboundObserver()) : BeamFnDataBufferingOutboundObserver.forLocation(logicalEndpoint, coder, clientFor.getOutboundObserver());
    }

    private static Optional<Integer> getBufferLimit(PipelineOptions pipelineOptions) {
        List<String> experiments = ((ExperimentalOptions) pipelineOptions.as(ExperimentalOptions.class)).getExperiments();
        for (String str : experiments == null ? Collections.emptyList() : experiments) {
            if (str.startsWith("beam_fn_api_data_buffer_limit=")) {
                return Optional.of(Integer.valueOf(Integer.parseInt(str.substring("beam_fn_api_data_buffer_limit=".length()))));
            }
        }
        return Optional.empty();
    }

    private BeamFnDataGrpcMultiplexer getClientFor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
        return this.cache.computeIfAbsent(apiServiceDescriptor, apiServiceDescriptor2 -> {
            OutboundObserverFactory outboundObserverFactory = this.outboundObserverFactory;
            BeamFnDataGrpc.BeamFnDataStub newStub = BeamFnDataGrpc.newStub(this.channelFactory.apply(apiServiceDescriptor));
            Objects.requireNonNull(newStub);
            return new BeamFnDataGrpcMultiplexer(apiServiceDescriptor2, outboundObserverFactory, newStub::data);
        });
    }
}
