package org.apache.beam.fn.harness.data;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.class */
public class BeamFnDataBufferingOutboundObserver<T> implements CloseableThrowingConsumer<WindowedValue<T>> {
    private static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
    private static final int DEFAULT_BUFFER_LIMIT_BYTES = 1000000;
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);
    private long byteCounter;
    private long counter;
    private final int bufferLimit;
    private final Coder<WindowedValue<T>> coder;
    private final KV<String, BeamFnApi.Target> outputLocation;
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    private final ByteString.Output bufferedElements = ByteString.newOutput();

    public BeamFnDataBufferingOutboundObserver(PipelineOptions pipelineOptions, KV<String, BeamFnApi.Target> kv, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> streamObserver) {
        this.bufferLimit = getBufferLimit(pipelineOptions);
        this.outputLocation = kv;
        this.coder = coder;
        this.outboundObserver = streamObserver;
    }

    private static int getBufferLimit(PipelineOptions pipelineOptions) {
        List experiments = pipelineOptions.as(DataflowPipelineDebugOptions.class).getExperiments();
        for (String str : experiments == null ? Collections.emptyList() : experiments) {
            if (str.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
                return Integer.parseInt(str.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length()));
            }
        }
        return DEFAULT_BUFFER_LIMIT_BYTES;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        BeamFnApi.Elements.Builder convertBufferForTransmission = convertBufferForTransmission();
        convertBufferForTransmission.addDataBuilder().setInstructionReference((String) this.outputLocation.getKey()).setTarget((BeamFnApi.Target) this.outputLocation.getValue());
        LOG.debug("Closing stream for instruction {} and target {} having transmitted {} values {} bytes", new Object[]{this.outputLocation.getKey(), this.outputLocation.getValue(), Long.valueOf(this.counter), Long.valueOf(this.byteCounter)});
        this.outboundObserver.onNext(convertBufferForTransmission.build());
    }

    @Override // org.apache.beam.fn.harness.fn.ThrowingConsumer
    public void accept(WindowedValue<T> windowedValue) throws IOException {
        this.coder.encode(windowedValue, this.bufferedElements, Coder.Context.NESTED);
        this.counter++;
        if (this.bufferedElements.size() >= this.bufferLimit) {
            this.outboundObserver.onNext(convertBufferForTransmission().build());
        }
    }

    private BeamFnApi.Elements.Builder convertBufferForTransmission() {
        BeamFnApi.Elements.Builder newBuilder = BeamFnApi.Elements.newBuilder();
        if (this.bufferedElements.size() == 0) {
            return newBuilder;
        }
        newBuilder.addDataBuilder().setInstructionReference((String) this.outputLocation.getKey()).setTarget((BeamFnApi.Target) this.outputLocation.getValue()).setData(this.bufferedElements.toByteString());
        this.byteCounter += this.bufferedElements.size();
        this.bufferedElements.reset();
        return newBuilder;
    }
}
