package org.apache.beam.fn.harness.stream;

import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;

/* loaded from: input_file:org/apache/beam/fn/harness/stream/StreamObserverFactory.class */
/**
 * Creates the {@link StreamObserver} used for the outbound side of a gRPC stream.
 *
 * <p>Two strategies exist, selected by {@link #fromOptions(PipelineOptions)}: a direct observer
 * (the default) and a buffered observer that is enabled through pipeline experiments.
 */
public abstract class StreamObserverFactory {

    /** Experiment flag that switches the factory to the buffered implementation. */
    private static final String BUFFERED_STREAM_EXPERIMENT = "beam_fn_api_buffered_stream";

    /** Experiment prefix carrying the buffer size, e.g. {@code ..._buffer_size=128}. */
    private static final String BUFFER_SIZE_EXPERIMENT_PREFIX = "beam_fn_api_buffered_stream_buffer_size=";

    /**
     * Factory producing {@code BufferingStreamObserver} instances that are handed the configured
     * executor service and buffer size.
     */
    private static class Buffered extends StreamObserverFactory {
        /** Buffer size used when no buffer-size experiment is supplied. */
        private static final int DEFAULT_BUFFER_SIZE = 64;
        private final ExecutorService executorService;
        private final int bufferSize;

        private Buffered(ExecutorService executorService, int bufferSize) {
            this.executorService = executorService;
            this.bufferSize = bufferSize;
        }

        @Override // org.apache.beam.fn.harness.stream.StreamObserverFactory
        public <ReqT, RespT> StreamObserver<RespT> from(Function<StreamObserver<ReqT>, StreamObserver<RespT>> function, StreamObserver<ReqT> streamObserver) {
            // A single phaser is shared by both wrappers: the forwarding observer is given
            // phaser::arrive as its callback and the buffering observer receives the phaser itself.
            // NOTE(review): presumably the callback fires when the outbound stream becomes ready —
            // confirm against ForwardingClientResponseObserver.
            AdvancingPhaser phaser = new AdvancingPhaser(1);
            StreamObserver<RespT> outbound = function.apply(new ForwardingClientResponseObserver(streamObserver, phaser::arrive));
            return new BufferingStreamObserver(phaser, outbound, this.executorService, this.bufferSize);
        }
    }

    /** Factory producing {@code DirectStreamObserver} instances (no executor, no buffer size). */
    private static class Direct extends StreamObserverFactory {
        private Direct() {
        }

        @Override // org.apache.beam.fn.harness.stream.StreamObserverFactory
        public <ReqT, RespT> StreamObserver<RespT> from(Function<StreamObserver<ReqT>, StreamObserver<RespT>> function, StreamObserver<ReqT> streamObserver) {
            // Same wiring as the buffered variant: the phaser is shared between the forwarding
            // observer (via phaser::arrive) and the returned observer.
            AdvancingPhaser phaser = new AdvancingPhaser(1);
            StreamObserver<RespT> outbound = function.apply(new ForwardingClientResponseObserver(streamObserver, phaser::arrive));
            return new DirectStreamObserver(phaser, outbound);
        }
    }

    /**
     * Selects a factory implementation based on the pipeline's experiments.
     *
     * <p>The direct factory is returned unless the experiments contain
     * {@code beam_fn_api_buffered_stream}. When buffering is enabled, the buffer size defaults to
     * 64 and can be overridden with {@code beam_fn_api_buffered_stream_buffer_size=<int>}; if the
     * override appears more than once, the last occurrence wins.
     *
     * @param pipelineOptions options providing the experiments list and, for the buffered
     *     variant, the executor service
     * @return the factory matching the configured experiments
     * @throws NumberFormatException if the buffer-size experiment value is not a valid integer
     */
    public static StreamObserverFactory fromOptions(PipelineOptions pipelineOptions) {
        List<String> experiments = pipelineOptions.as(DataflowPipelineDebugOptions.class).getExperiments();
        if (experiments == null || !experiments.contains(BUFFERED_STREAM_EXPERIMENT)) {
            return new Direct();
        }
        int bufferSize = Buffered.DEFAULT_BUFFER_SIZE;
        for (String experiment : experiments) {
            if (experiment.startsWith(BUFFER_SIZE_EXPERIMENT_PREFIX)) {
                bufferSize = parseBufferSize(experiment);
            }
        }
        return new Buffered(pipelineOptions.as(GcsOptions.class).getExecutorService(), bufferSize);
    }

    /**
     * Extracts the integer that follows the buffer-size experiment prefix.
     *
     * @param experiment an experiment string starting with the buffer-size prefix
     * @return the parsed buffer size
     * @throws NumberFormatException if the text after the prefix is not a valid integer; the
     *     message names the offending experiment and the original exception is kept as the cause
     */
    private static int parseBufferSize(String experiment) {
        String rawValue = experiment.substring(BUFFER_SIZE_EXPERIMENT_PREFIX.length());
        try {
            return Integer.parseInt(rawValue);
        } catch (NumberFormatException e) {
            // Keep the exception type callers may already catch, but say which option was wrong.
            NumberFormatException detailed = new NumberFormatException("Invalid buffer size in experiment \"" + experiment + "\": expected an integer after '='");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Builds the outbound observer for a stream.
     *
     * @param function given the (wrapped) inbound observer, returns the underlying outbound
     *     observer
     * @param streamObserver the observer that receives inbound messages
     * @param <ReqT> type of inbound messages
     * @param <RespT> type of outbound messages
     * @return the observer callers should use to send outbound messages
     */
    public abstract <ReqT, RespT> StreamObserver<RespT> from(Function<StreamObserver<ReqT>, StreamObserver<RespT>> function, StreamObserver<ReqT> streamObserver);
}
