package org.apache.beam.repackaged.direct_java.sdk.fn.data;

import java.io.IOException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/sdk/fn/data/BeamFnDataSizeBasedBufferingOutboundObserver.class */
/**
 * Receiver that encodes every accepted element with the supplied {@link Coder} into an in-memory
 * buffer and forwards the buffered bytes to the outbound {@link StreamObserver} as
 * {@link BeamFnApi.Elements} messages.
 *
 * <p>The buffer is transmitted when its size reaches the configured size limit after an
 * {@link #accept(Object)} call, when {@link #flush()} is invoked while the buffer is non-empty,
 * and when {@link #close()} is invoked. Depending on {@code LogicalEndpoint#isTimer()} the bytes
 * are placed in a timers entry or a data entry of the message.
 *
 * @param <T> type of the elements handed to {@link #accept(Object)}
 */
public class BeamFnDataSizeBasedBufferingOutboundObserver<T> implements BeamFnDataBufferingOutboundObserver<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataSizeBasedBufferingOutboundObserver.class);

    /** Buffered byte count at or above which {@link #accept(Object)} triggers a flush. */
    private final int sizeLimit;
    /** Encodes accepted elements into {@link #bufferedElements}. */
    private final Coder<T> coder;
    /** Supplies the instruction id, transform id and (for timers) timer family id of each message. */
    private final LogicalEndpoint outputLocation;
    /** Downstream observer that receives every built {@link BeamFnApi.Elements} message. */
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    /** Encoded elements that have not been transmitted yet. */
    private final ByteString.Output bufferedElements = ByteString.newOutput();

    /** Total number of buffered bytes that have been converted into outbound messages. */
    private long byteCounter;
    /** Number of elements encoded through {@link #accept(Object)}. */
    private long counter;
    /** Set on the first {@link #close()} call; later calls to {@code close()} are no-ops. */
    private boolean closed;

    /**
     * Creates an observer with an empty buffer.
     *
     * <p>A decompiler note on the original declaration states that the access modifier was changed
     * from package-private; it is kept public here.
     *
     * @param i size limit in bytes; reaching it after an element is encoded triggers a flush
     * @param logicalEndpoint endpoint whose identifiers are stamped on every outbound message
     * @param coder coder used to encode accepted elements
     * @param streamObserver observer that receives the outbound messages
     */
    public BeamFnDataSizeBasedBufferingOutboundObserver(int i, LogicalEndpoint logicalEndpoint, Coder<T> coder, StreamObserver<BeamFnApi.Elements> streamObserver) {
        this.outboundObserver = streamObserver;
        this.coder = coder;
        this.outputLocation = logicalEndpoint;
        this.sizeLimit = i;
    }

    /**
     * Sends one final message containing any still-buffered bytes plus an entry marked
     * {@code isLast} for this endpoint. Only the first invocation has an effect.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void close() throws Exception {
        if (!closed) {
            closed = true;
            // Drain whatever is still buffered so it travels in the same message as the
            // terminal marker added below.
            BeamFnApi.Elements.Builder finalMessage = convertBufferForTransmission();
            if (outputLocation.isTimer()) {
                finalMessage
                        .addTimersBuilder()
                        .setInstructionId(outputLocation.getInstructionId())
                        .setTransformId(outputLocation.getTransformId())
                        .setTimerFamilyId(outputLocation.getTimerFamilyId())
                        .setIsLast(true);
            } else {
                finalMessage
                        .addDataBuilder()
                        .setInstructionId(outputLocation.getInstructionId())
                        .setTransformId(outputLocation.getTransformId())
                        .setIsLast(true);
            }
            LOG.debug(
                    "Closing stream for instruction {} and transform {} having transmitted {} values {} bytes",
                    outputLocation.getInstructionId(),
                    outputLocation.getTransformId(),
                    counter,
                    byteCounter);
            outboundObserver.onNext(finalMessage.build());
        }
    }

    /**
     * Transmits the buffered bytes to the outbound observer; does nothing when the buffer is empty.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void flush() throws IOException {
        if (bufferedElements.size() <= 0) {
            return;
        }
        BeamFnApi.Elements pending = convertBufferForTransmission().build();
        outboundObserver.onNext(pending);
    }

    /**
     * Encodes {@code t} into the buffer and flushes once the buffer has reached the size limit.
     *
     * @param t element to encode
     * @throws IOException if the coder fails to encode the element
     * @throws IllegalStateException if this observer has already been closed
     */
    @Override
    public void accept(T t) throws IOException {
        if (closed) {
            throw new IllegalStateException("Already closed.");
        }
        coder.encode(t, bufferedElements);
        counter += 1;
        if (sizeLimit <= bufferedElements.size()) {
            flush();
        }
    }

    /**
     * Moves the buffered bytes into a new message builder. When the buffer is non-empty, a timers
     * or data entry carrying the bytes is added, {@link #byteCounter} is increased by the buffer
     * size and the buffer is reset. When the buffer is empty the returned builder has no entries.
     *
     * @return builder holding at most one entry with the previously buffered bytes
     */
    private BeamFnApi.Elements.Builder convertBufferForTransmission() {
        BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
        if (bufferedElements.size() != 0) {
            if (outputLocation.isTimer()) {
                elements
                        .addTimersBuilder()
                        .setInstructionId(outputLocation.getInstructionId())
                        .setTransformId(outputLocation.getTransformId())
                        .setTimerFamilyId(outputLocation.getTimerFamilyId())
                        .setTimers(bufferedElements.toByteString());
            } else {
                elements
                        .addDataBuilder()
                        .setInstructionId(outputLocation.getInstructionId())
                        .setTransformId(outputLocation.getTransformId())
                        .setData(bufferedElements.toByteString());
            }
            // Account for the bytes before clearing the buffer, since reset() discards them.
            byteCounter += bufferedElements.size();
            bufferedElements.reset();
        }
        return elements;
    }
}
