package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.dataflow.qual.Pure;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
@NotThreadSafe
/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.class */
public class BeamFnDataOutboundAggregator {
    /** Prefix of the experiment entry whose suffix is parsed as the buffer size limit. */
    public static final String DATA_BUFFER_SIZE_LIMIT = "data_buffer_size_limit=";

    /** Size limit, in bytes, used when no size-limit experiment is present. */
    public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1000000;

    /** Prefix of the experiment entry whose suffix is parsed as the flush period in milliseconds. */
    public static final String DATA_BUFFER_TIME_LIMIT_MS = "data_buffer_time_limit_ms=";

    /** Flush period used when no time-limit experiment is present; non-positive disables periodic flushing. */
    public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1;

    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);

    /** A flush is triggered once more than this many bytes have been buffered since the last flush. */
    private final int sizeLimit;

    /** Period of the background flush in milliseconds; values {@code <= 0} mean no background flush. */
    private final long timeLimit;

    /** Supplies the instruction id stamped on every outgoing data/timers entry. */
    private final Supplier<String> processBundleRequestIdSupplier;

    /** Destination of all outgoing {@code Elements} messages. */
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;

    /** Handle of the periodic flush task; stays {@code null} until {@link #start()} schedules it. */
    @Nullable
    @VisibleForTesting
    ScheduledFuture<?> flushFuture;

    /** When true, a bundle that never flushed returns its final elements instead of sending them. */
    private final boolean collectElementsIfNoFlushes;

    /** Registered data receivers keyed by transform id. */
    @VisibleForTesting
    final Map<String, Receiver<?>> outputDataReceivers = new HashMap<>();

    /** Registered timer receivers keyed by (transform id, timer family id). */
    @VisibleForTesting
    final Map<TimerEndpoint, Receiver<?>> outputTimersReceivers = new HashMap<>();

    /** Bytes appended to the receivers' buffers since the buffers were last converted for transmission. */
    private long bytesWrittenSinceFlush = 0;

    /** Monitor taken around buffer access when periodic flushing is enabled. */
    private final Object flushLock = new Object();

    /** Set by a flush and cleared when the final elements of a bundle are sent. */
    private boolean hasFlushedForBundle = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator$Receiver.class */
    public class Receiver<T> implements FnDataReceiver<T> {
        /** Encodes each accepted element into {@link #output}. */
        private final Coder<T> coder;

        /** Accumulates encoded elements until the buffer is drained. */
        private final ByteStringOutputStream output = new ByteStringOutputStream();

        /** Bytes accepted since the last {@link #resetStats()}. */
        private long perBundleByteCount = 0;

        /** Elements accepted since the last {@link #resetStats()}. */
        private long perBundleElementCount = 0;

        public Receiver(Coder<T> coder) {
            this.coder = coder;
        }

        /**
         * Encodes {@code input} into the buffer, updates the byte/element counters of this receiver
         * and of the enclosing aggregator, and flushes the aggregator once the bytes written since
         * the last flush exceed its size limit.
         *
         * @param input the element to encode and buffer
         * @throws Exception if the coder fails to encode the element or the flush fails
         */
        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(T input) throws Exception {
            final int sizeBefore = this.output.size();
            this.coder.encode(input, this.output);
            if (this.output.size() - sizeBefore == 0) {
                // The coder produced no bytes: pad with a single zero byte, presumably so that
                // every element occupies at least one byte in the stream.
                this.output.write(0);
            }
            // Computed after the optional padding so the pad byte is counted too.
            final long delta = (long) this.output.size() - (long) sizeBefore;
            BeamFnDataOutboundAggregator.this.bytesWrittenSinceFlush += delta;
            this.perBundleByteCount += delta;
            this.perBundleElementCount += 1;
            if (BeamFnDataOutboundAggregator.this.bytesWrittenSinceFlush
                    > BeamFnDataOutboundAggregator.this.sizeLimit) {
                BeamFnDataOutboundAggregator.this.flushInternal();
            }
        }

        /** Returns the number of bytes accepted since the statistics were last reset. */
        public long getByteCount() {
            return this.perBundleByteCount;
        }

        /** Returns the number of elements accepted since the statistics were last reset. */
        public long getElementCount() {
            return this.perBundleElementCount;
        }

        /** Returns the number of bytes currently held in the buffer. */
        public int bufferedSize() {
            return this.output.size();
        }

        /** Returns the buffered bytes and resets the buffer. */
        public ByteString toByteStringAndResetBuffer() {
            return this.output.toByteStringAndReset();
        }

        /** Zeroes the byte and element counters; the buffer itself is not touched. */
        public void resetStats() {
            this.perBundleElementCount = 0L;
            this.perBundleByteCount = 0L;
        }

        @Override
        @SideEffectFree
        public String toString() {
            return String.format("Byte size: %s, Element count: %s", this.perBundleByteCount, this.perBundleElementCount);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator$TimerEndpoint.class */
    /** Map key identifying a timer output by its transform id and timer family id. */
    public static class TimerEndpoint {
        private final String pTransformId;
        private final String timerFamilyId;

        /**
         * @param pTransformId id of the transform the timers belong to
         * @param timerFamilyId id of the timer family within that transform
         */
        public TimerEndpoint(String pTransformId, String timerFamilyId) {
            this.pTransformId = pTransformId;
            this.timerFamilyId = timerFamilyId;
        }

        /**
         * Two endpoints are equal when both ids are equal. The comparison is null-safe so that it
         * stays consistent with {@link #hashCode()}, which also tolerates null ids.
         */
        @Override
        @EnsuresNonNullIf(expression = {"#1"}, result = true)
        @Pure
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TimerEndpoint)) {
                return false;
            }
            final TimerEndpoint other = (TimerEndpoint) obj;
            return Objects.equals(this.pTransformId, other.pTransformId)
                    && Objects.equals(this.timerFamilyId, other.timerFamilyId);
        }

        @Override
        @Pure
        public int hashCode() {
            return Objects.hash(this.pTransformId, this.timerFamilyId);
        }

        @Override
        @SideEffectFree
        public String toString() {
            return "pTransformId: " + this.pTransformId + " timerFamilyId: " + this.timerFamilyId;
        }
    }

    /**
     * Creates an aggregator that writes to {@code streamObserver}.
     *
     * @param pipelineOptions options whose experiments may override the size and time limits
     * @param supplier supplies the instruction id attached to outgoing elements
     * @param streamObserver receives the outgoing {@code Elements} messages
     * @param z whether the final elements of a bundle that never flushed are returned to the
     *     caller instead of being sent
     */
    public BeamFnDataOutboundAggregator(PipelineOptions pipelineOptions, Supplier<String> supplier, StreamObserver<BeamFnApi.Elements> streamObserver, boolean z) {
        // Plain wiring first; the limits are parsed from the experiments afterwards.
        processBundleRequestIdSupplier = supplier;
        outboundObserver = streamObserver;
        collectElementsIfNoFlushes = z;
        sizeLimit = getSizeLimit(pipelineOptions);
        timeLimit = getTimeLimit(pipelineOptions);
    }

    /**
     * Schedules the periodic flush at a fixed rate of {@code timeLimit} milliseconds. Does nothing
     * when the time limit is not positive or when a flush task has already been scheduled.
     *
     * <p>NOTE(review): no reference to the executor is kept, so it is never shut down from this
     * class; {@link #discard()} only cancels the scheduled task.
     */
    public void start() {
        final boolean periodicFlushEnabled = timeLimit > 0;
        if (periodicFlushEnabled && flushFuture == null) {
            final ThreadFactoryBuilder flusherThreads =
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DataBufferOutboundFlusher-thread");
            flushFuture =
                    Executors.newSingleThreadScheduledExecutor(flusherThreads.build())
                            .scheduleAtFixedRate(this::flush, timeLimit, timeLimit, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers a buffered data output for the given transform.
     *
     * <p>When periodic flushing is enabled ({@code timeLimit > 0}) the returned receiver first
     * checks whether the flush task has failed and then writes under {@code flushLock}; otherwise
     * the raw {@link Receiver} is returned.
     *
     * @param pTransformId id of the transform producing the data
     * @param coder coder used to encode each element
     * @return the receiver to which the transform's output elements are passed
     * @throws IllegalStateException if an output is already registered for {@code pTransformId}
     */
    public <T> FnDataReceiver<T> registerOutputDataLocation(String pTransformId, Coder<T> coder) {
        if (this.outputDataReceivers.containsKey(pTransformId)) {
            throw new IllegalStateException("Outbound data endpoint already registered for " + pTransformId);
        }
        // Keep the element type: a wildcard-typed local can neither accept a T nor be returned
        // as FnDataReceiver<T>.
        final Receiver<T> receiver = new Receiver<>(coder);
        this.outputDataReceivers.put(pTransformId, receiver);
        if (this.timeLimit <= 0) {
            return receiver;
        }
        return element -> {
            checkFlushThreadException();
            synchronized (this.flushLock) {
                receiver.accept(element);
            }
        };
    }

    /**
     * Registers a buffered timers output for the given transform and timer family.
     *
     * <p>When periodic flushing is enabled ({@code timeLimit > 0}) the returned receiver first
     * checks whether the flush task has failed and then writes under {@code flushLock}; otherwise
     * the raw {@link Receiver} is returned.
     *
     * @param pTransformId id of the transform producing the timers
     * @param timerFamilyId id of the timer family
     * @param coder coder used to encode each timer
     * @return the receiver to which the timers are passed
     * @throws IllegalStateException if an output is already registered for this endpoint
     */
    public <T> FnDataReceiver<T> registerOutputTimersLocation(String pTransformId, String timerFamilyId, Coder<T> coder) {
        final TimerEndpoint endpoint = new TimerEndpoint(pTransformId, timerFamilyId);
        if (this.outputTimersReceivers.containsKey(endpoint)) {
            throw new IllegalStateException("Outbound timers endpoint already registered for " + endpoint);
        }
        // Keep the element type: a wildcard-typed local can neither accept a T nor be returned
        // as FnDataReceiver<T>.
        final Receiver<T> receiver = new Receiver<>(coder);
        this.outputTimersReceivers.put(endpoint, receiver);
        if (this.timeLimit <= 0) {
            return receiver;
        }
        return timer -> {
            checkFlushThreadException();
            synchronized (this.flushLock) {
                receiver.accept(timer);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends everything buffered so far, if anything was written since the previous flush, and
     * records that a flush happened for the current bundle. Callers are responsible for any
     * locking; this method takes none itself.
     */
    public void flushInternal() {
        if (bytesWrittenSinceFlush != 0) {
            final BeamFnApi.Elements.Builder pending = convertBufferForTransmission();
            final boolean hasPayload = pending.getDataCount() > 0 || pending.getTimersCount() > 0;
            if (hasPayload) {
                outboundObserver.onNext(pending.m697build());
            }
            hasFlushedForBundle = true;
        }
    }

    /**
     * Drains all buffers, appends an {@code isLast} marker for every registered data and timers
     * endpoint, and resets the per-bundle statistics of every receiver.
     *
     * @return the final elements when {@code collectElementsIfNoFlushes} is set and no flush
     *     happened for this bundle; otherwise {@code null}, after the elements have been sent to
     *     the outbound observer. Also {@code null} when no endpoint is registered at all.
     */
    public BeamFnApi.Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
        final boolean nothingRegistered = outputTimersReceivers.isEmpty() && outputDataReceivers.isEmpty();
        if (nothingRegistered) {
            return null;
        }

        // The lock is only needed when the periodic flush may run concurrently.
        final BeamFnApi.Elements.Builder finalElements;
        if (timeLimit <= 0) {
            finalElements = convertBufferForTransmission();
        } else {
            synchronized (flushLock) {
                finalElements = convertBufferForTransmission();
            }
        }

        LOG.debug("Closing streams for instruction {} and outbound data {} and timers {}.", processBundleRequestIdSupplier.get(), outputDataReceivers, outputTimersReceivers);

        outputDataReceivers.forEach((transformId, dataReceiver) -> {
            finalElements.addDataBuilder().setInstructionId(processBundleRequestIdSupplier.get()).setTransformId(transformId).setIsLast(true);
            dataReceiver.resetStats();
        });
        outputTimersReceivers.forEach((endpoint, timersReceiver) -> {
            finalElements.addTimersBuilder().setInstructionId(processBundleRequestIdSupplier.get()).setTransformId(endpoint.pTransformId).setTimerFamilyId(endpoint.timerFamilyId).setIsLast(true);
            timersReceiver.resetStats();
        });

        final boolean handBackToCaller = collectElementsIfNoFlushes && !hasFlushedForBundle;
        if (!handBackToCaller) {
            outboundObserver.onNext(finalElements.m697build());
            hasFlushedForBundle = false;
            return null;
        }
        return finalElements.m697build();
    }

    /**
     * Forwards the given elements to the outbound observer as-is, without touching any buffer.
     *
     * @param elements the message to send
     */
    public void sendElements(BeamFnApi.Elements elements) {
        outboundObserver.onNext(elements);
    }

    /** Cancels the periodic flush task, interrupting it if it is running; no-op if none was scheduled. */
    public void discard() {
        final ScheduledFuture<?> scheduledFlush = flushFuture;
        if (scheduledFlush == null) {
            return;
        }
        scheduledFlush.cancel(true);
    }

    /**
     * Moves the buffered bytes of every non-empty receiver into a new {@code Elements} builder,
     * resetting each drained buffer and the bytes-written-since-flush counter.
     *
     * @return a builder holding one data/timers entry per receiver that had buffered bytes
     */
    private BeamFnApi.Elements.Builder convertBufferForTransmission() {
        final BeamFnApi.Elements.Builder batch = BeamFnApi.Elements.newBuilder();

        outputDataReceivers.forEach((transformId, dataReceiver) -> {
            if (dataReceiver.bufferedSize() == 0) {
                return; // nothing buffered for this endpoint
            }
            batch.addDataBuilder().setInstructionId(processBundleRequestIdSupplier.get()).setTransformId(transformId).setData(dataReceiver.toByteStringAndResetBuffer());
        });

        outputTimersReceivers.forEach((endpoint, timersReceiver) -> {
            if (timersReceiver.bufferedSize() == 0) {
                return; // nothing buffered for this endpoint
            }
            batch.addTimersBuilder().setInstructionId(processBundleRequestIdSupplier.get()).setTransformId(endpoint.pTransformId).setTimerFamilyId(endpoint.timerFamilyId).setTimers(timersReceiver.toByteStringAndResetBuffer());
        });

        bytesWrittenSinceFlush = 0L;
        return batch;
    }

    /**
     * Flushes the buffers while holding {@code flushLock}. This is the task scheduled by
     * {@code start()}.
     *
     * <p>Any failure is rethrown wrapped in a {@link RuntimeException};
     * {@code unwrapExecutionException} relies on that wrapping to recover the original cause.
     */
    void flush() {
        try {
            synchronized (this.flushLock) {
                flushInternal();
            }
        } catch (Throwable th) {
            // Wrap everything so the failure surfaces through the scheduled future.
            throw new RuntimeException(th);
        }
    }

    /**
     * Surfaces a failure of the periodic flush task to the caller.
     *
     * <p>Returns silently when periodic flushing is disabled, when no flush task has been
     * scheduled yet, or when the task is still running. A task that has completed in any way is
     * reported as an {@link IOException}.
     *
     * @throws IOException if the flush task finished, was cancelled, or failed
     */
    private void checkFlushThreadException() throws IOException {
        // Read the nullable field once; it stays null until start() has scheduled the task.
        final ScheduledFuture<?> scheduledFlush = this.flushFuture;
        if (this.timeLimit <= 0 || scheduledFlush == null || !scheduledFlush.isDone()) {
            return;
        }
        try {
            scheduledFlush.get();
            // A fixed-rate task is not expected to complete normally.
            throw new IOException("Periodic flushing thread finished unexpectedly.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (CancellationException e) {
            throw new IOException(e);
        } catch (ExecutionException e) {
            unwrapExecutionException(e);
        }
    }

    /**
     * Rethrows the failure carried by {@code executionException} as an {@link IOException}.
     *
     * <p>The flush task wraps its failures in a {@link RuntimeException}, so the interesting
     * throwable is normally the cause of the cause. If that shape is not present (the cause is not
     * a {@code RuntimeException}, or it has no nested cause), the direct cause is reported instead
     * of failing with a {@code ClassCastException} or losing the failure altogether.
     *
     * @param executionException the exception obtained from the flush future
     * @throws IOException always; either the original {@code IOException} or a wrapper around the
     *     underlying failure
     */
    private void unwrapExecutionException(ExecutionException executionException) throws IOException {
        final Throwable wrapper = executionException.getCause();
        final Throwable underlying =
                (wrapper instanceof RuntimeException && wrapper.getCause() != null)
                        ? wrapper.getCause()
                        : wrapper;
        if (underlying instanceof IOException) {
            throw (IOException) underlying;
        }
        throw new IOException(underlying);
    }

    /**
     * Reads the buffer size limit from the pipeline's experiments.
     *
     * @param pipelineOptions options viewed as {@code ExperimentalOptions}
     * @return the value of the first experiment starting with {@link #DATA_BUFFER_SIZE_LIMIT}, or
     *     {@link #DEFAULT_BUFFER_LIMIT_BYTES} when there is none
     * @throws NumberFormatException if the experiment's value is not a valid {@code int}
     */
    private static int getSizeLimit(PipelineOptions pipelineOptions) {
        final List<String> experiments = pipelineOptions.as(ExperimentalOptions.class).getExperiments();
        if (experiments != null) {
            for (String experiment : experiments) {
                if (experiment.startsWith(DATA_BUFFER_SIZE_LIMIT)) {
                    return Integer.parseInt(experiment.substring(DATA_BUFFER_SIZE_LIMIT.length()));
                }
            }
        }
        return DEFAULT_BUFFER_LIMIT_BYTES;
    }

    /**
     * Reads the periodic flush interval from the pipeline's experiments.
     *
     * @param pipelineOptions options viewed as {@code ExperimentalOptions}
     * @return the value of the first experiment starting with {@link #DATA_BUFFER_TIME_LIMIT_MS},
     *     or {@link #DEFAULT_BUFFER_LIMIT_TIME_MS} when there is none
     * @throws NumberFormatException if the experiment's value is not a valid {@code long}
     */
    private static long getTimeLimit(PipelineOptions pipelineOptions) {
        final List<String> experiments = pipelineOptions.as(ExperimentalOptions.class).getExperiments();
        if (experiments != null) {
            for (String experiment : experiments) {
                if (experiment.startsWith(DATA_BUFFER_TIME_LIMIT_MS)) {
                    return Long.parseLong(experiment.substring(DATA_BUFFER_TIME_LIMIT_MS.length()));
                }
            }
        }
        return DEFAULT_BUFFER_LIMIT_TIME_MS;
    }

    /**
     * Synthetic accessor for the compound assignment {@code bytesWrittenSinceFlush += delta}.
     * The decompiler could not decode this method; the body below is rebuilt from its register
     * listing (load field, add, store field, return the stored value).
     *
     * @param aggregator the instance whose counter is updated
     * @param delta number of bytes to add
     * @return the counter value after the addition
     */
    static /* synthetic */ long access$214(BeamFnDataOutboundAggregator aggregator, long delta) {
        final long updated = aggregator.bytesWrittenSinceFlush + delta;
        aggregator.bytesWrittenSinceFlush = updated;
        return updated;
    }

    /** Synthetic accessor: reads {@code bytesWrittenSinceFlush} of the given instance. */
    static /* synthetic */ long access$200(BeamFnDataOutboundAggregator beamFnDataOutboundAggregator) {
        final long bytesPending = beamFnDataOutboundAggregator.bytesWrittenSinceFlush;
        return bytesPending;
    }

    /** Synthetic accessor: reads {@code sizeLimit} of the given instance. */
    static /* synthetic */ int access$300(BeamFnDataOutboundAggregator beamFnDataOutboundAggregator) {
        final int configuredLimit = beamFnDataOutboundAggregator.sizeLimit;
        return configuredLimit;
    }

    /** Synthetic accessor: invokes {@code flushInternal()} on the given instance. */
    static /* synthetic */ void access$400(BeamFnDataOutboundAggregator beamFnDataOutboundAggregator) {
        final BeamFnDataOutboundAggregator target = beamFnDataOutboundAggregator;
        target.flushInternal();
    }

    // Empty static initializer: it contains no statements and therefore has no effect.
    static {
    }
}
