package org.apache.beam.fn.harness.data;

import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleExecutionState;
import org.apache.beam.runners.core.metrics.SimpleStateRegistry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.class */
public class PCollectionConsumerRegistry {
    private MetricsContainerStepMap metricsContainerRegistry;
    private ExecutionStateTracker stateTracker;
    private SimpleStateRegistry executionStates = new SimpleStateRegistry();
    private ListMultimap<String, ConsumerAndMetadata> pCollectionIdsToConsumers = ArrayListMultimap.create();
    private Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$ConsumerAndMetadata.class */
    public static abstract class ConsumerAndMetadata {
        public static ConsumerAndMetadata forConsumer(FnDataReceiver fnDataReceiver, String str, SimpleExecutionState simpleExecutionState, Coder coder, MetricsContainer metricsContainer) {
            return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(fnDataReceiver, str, simpleExecutionState, coder, metricsContainer);
        }

        public abstract FnDataReceiver getConsumer();

        public abstract String getPTransformId();

        public abstract SimpleExecutionState getExecutionState();

        public abstract Coder getValueCoder();

        public abstract MetricsContainer getMetricsContainer();
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.class */
    private class MetricTrackingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
        private final FnDataReceiver<WindowedValue<T>> delegate;
        private final String pTransformId;
        private final SimpleExecutionState state;
        private final Counter unboundedElementCountCounter;
        private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;
        private final Coder<T> coder;
        private final MetricsContainer metricsContainer;

        public MetricTrackingFnDataReceiver(String str, ConsumerAndMetadata consumerAndMetadata) {
            this.delegate = consumerAndMetadata.getConsumer();
            this.state = consumerAndMetadata.getExecutionState();
            this.pTransformId = consumerAndMetadata.getPTransformId();
            HashMap hashMap = new HashMap();
            hashMap.put(MonitoringInfoConstants.Labels.PCOLLECTION, str);
            MetricsContainerImpl unboundContainer = PCollectionConsumerRegistry.this.metricsContainerRegistry.getUnboundContainer();
            this.unboundedElementCountCounter = unboundContainer.getCounter((MetricName) MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, hashMap));
            this.unboundedSampledByteSizeDistribution = new SampleByteSizeDistribution<>(unboundContainer.getDistribution((MetricName) MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, hashMap)));
            this.coder = consumerAndMetadata.getValueCoder();
            this.metricsContainer = consumerAndMetadata.getMetricsContainer();
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(WindowedValue<T> windowedValue) throws Exception {
            this.unboundedElementCountCounter.inc(windowedValue.getWindows().size());
            this.unboundedSampledByteSizeDistribution.tryUpdate(windowedValue.getValue(), this.coder);
            Closeable scopedMetricsContainer = MetricsEnvironment.scopedMetricsContainer(this.metricsContainer);
            try {
                Closeable enterState = PCollectionConsumerRegistry.this.stateTracker.enterState(this.state);
                try {
                    this.delegate.accept(windowedValue);
                    if (enterState != null) {
                        $closeResource(null, enterState);
                    }
                    this.unboundedSampledByteSizeDistribution.finishLazyUpdate();
                } catch (Throwable th) {
                    if (enterState != null) {
                        $closeResource(null, enterState);
                    }
                    throw th;
                }
            } finally {
                if (scopedMetricsContainer != null) {
                    $closeResource(null, scopedMetricsContainer);
                }
            }
        }

        private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
            if (th == null) {
                autoCloseable.close();
                return;
            }
            try {
                autoCloseable.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver.class */
    private class MultiplexingMetricTrackingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
        private final List<ConsumerAndMetadata> consumerAndMetadatas;
        private final Counter unboundedElementCountCounter;
        private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;

        public MultiplexingMetricTrackingFnDataReceiver(String str, List<ConsumerAndMetadata> list) {
            this.consumerAndMetadatas = list;
            HashMap hashMap = new HashMap();
            hashMap.put(MonitoringInfoConstants.Labels.PCOLLECTION, str);
            MetricsContainerImpl unboundContainer = PCollectionConsumerRegistry.this.metricsContainerRegistry.getUnboundContainer();
            this.unboundedElementCountCounter = unboundContainer.getCounter((MetricName) MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, hashMap));
            this.unboundedSampledByteSizeDistribution = new SampleByteSizeDistribution<>(unboundContainer.getDistribution((MetricName) MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, hashMap)));
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(WindowedValue<T> windowedValue) throws Exception {
            this.unboundedElementCountCounter.inc(windowedValue.getWindows().size());
            for (ConsumerAndMetadata consumerAndMetadata : this.consumerAndMetadatas) {
                if (consumerAndMetadata.getValueCoder() != null) {
                    this.unboundedSampledByteSizeDistribution.tryUpdate(windowedValue.getValue(), consumerAndMetadata.getValueCoder());
                }
                Closeable scopedMetricsContainer = MetricsEnvironment.scopedMetricsContainer(consumerAndMetadata.getMetricsContainer());
                try {
                    Closeable enterState = PCollectionConsumerRegistry.this.stateTracker.enterState(consumerAndMetadata.getExecutionState());
                    Throwable th = null;
                    try {
                        try {
                            consumerAndMetadata.getConsumer().accept(windowedValue);
                            if (enterState != null) {
                                $closeResource(null, enterState);
                            }
                            this.unboundedSampledByteSizeDistribution.finishLazyUpdate();
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                    if (scopedMetricsContainer != null) {
                        $closeResource(null, scopedMetricsContainer);
                    }
                }
            }
        }

        private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
            if (th == null) {
                autoCloseable.close();
                return;
            }
            try {
                autoCloseable.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$SampleByteSizeDistribution.class */
    public static class SampleByteSizeDistribution<T> {
        final Distribution distribution;
        private static final int RESERVOIR_SIZE = 10;
        private static final int SAMPLING_THRESHOLD = 30;
        private long samplingToken = 0;
        private long nextSamplingToken = 0;
        private Random randomGenerator = new Random();
        ByteSizeObserver byteCountObserver = null;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$SampleByteSizeDistribution$ByteSizeObserver.class */
        public static class ByteSizeObserver extends ElementByteSizeObserver {
            private long observedSize;

            private ByteSizeObserver() {
                this.observedSize = 0L;
            }

            @Override // org.apache.beam.sdk.util.common.ElementByteSizeObserver
            protected void reportElementSize(long j) {
                this.observedSize += j;
            }
        }

        public SampleByteSizeDistribution(Distribution distribution) {
            this.distribution = distribution;
        }

        public void tryUpdate(T t, Coder<T> coder) throws Exception {
            if (!shouldSampleElement()) {
                this.byteCountObserver = null;
                return;
            }
            this.byteCountObserver = new ByteSizeObserver();
            coder.registerByteSizeObserver(t, this.byteCountObserver);
            if (this.byteCountObserver.getIsLazy()) {
                return;
            }
            this.byteCountObserver.advance();
            this.distribution.update(this.byteCountObserver.observedSize);
        }

        public void finishLazyUpdate() {
            if (this.byteCountObserver == null || !this.byteCountObserver.getIsLazy()) {
                return;
            }
            this.byteCountObserver.advance();
            this.distribution.update(this.byteCountObserver.observedSize);
        }

        private boolean shouldSampleElement() {
            if (this.samplingToken + 1 == Long.MAX_VALUE) {
                this.samplingToken = 0L;
                this.nextSamplingToken = getNextSamplingToken(this.samplingToken);
            }
            this.samplingToken++;
            if (this.nextSamplingToken != 0) {
                if (this.samplingToken < this.nextSamplingToken) {
                    return false;
                }
                this.nextSamplingToken = getNextSamplingToken(this.samplingToken);
                return true;
            }
            if (this.samplingToken > 10 && this.randomGenerator.nextInt((int) this.samplingToken) >= 10) {
                return false;
            }
            if (this.samplingToken <= 30) {
                return true;
            }
            this.nextSamplingToken = getNextSamplingToken(this.samplingToken);
            return true;
        }

        private long getNextSamplingToken(long j) {
            return j + ((int) (Math.log(1.0d - this.randomGenerator.nextDouble()) / Math.log(1.0d - (10.0d / j))));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$SplittingMetricTrackingFnDataReceiver.class */
    private class SplittingMetricTrackingFnDataReceiver<T> extends MetricTrackingFnDataReceiver<T> implements HandlesSplits {
        private final HandlesSplits delegate;

        public SplittingMetricTrackingFnDataReceiver(String str, ConsumerAndMetadata consumerAndMetadata) {
            super(str, consumerAndMetadata);
            this.delegate = (HandlesSplits) consumerAndMetadata.getConsumer();
        }

        @Override // org.apache.beam.fn.harness.HandlesSplits
        public HandlesSplits.SplitResult trySplit(double d) {
            return this.delegate.trySplit(d);
        }

        @Override // org.apache.beam.fn.harness.HandlesSplits
        public double getProgress() {
            return this.delegate.getProgress();
        }
    }

    public PCollectionConsumerRegistry(MetricsContainerStepMap metricsContainerStepMap, ExecutionStateTracker executionStateTracker) {
        this.metricsContainerRegistry = metricsContainerStepMap;
        this.stateTracker = executionStateTracker;
    }

    public <T> void register(String str, String str2, FnDataReceiver<WindowedValue<T>> fnDataReceiver, Coder<T> coder) {
        if (this.pCollectionIdsToWrappedConsumer.containsKey(str)) {
            throw new RuntimeException("New consumers for a pCollectionId cannot be register()-d after calling getMultiplexingConsumer.");
        }
        HashMap hashMap = new HashMap();
        hashMap.put(MonitoringInfoConstants.Labels.PTRANSFORM, str2);
        SimpleExecutionState simpleExecutionState = new SimpleExecutionState(ExecutionStateTracker.PROCESS_STATE_NAME, MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS, hashMap);
        this.executionStates.register(simpleExecutionState);
        this.pCollectionIdsToConsumers.put(str, ConsumerAndMetadata.forConsumer(fnDataReceiver, str2, simpleExecutionState, coder, this.metricsContainerRegistry.getContainer(str2)));
    }

    public void reset() {
        this.executionStates.reset();
    }

    public Set<String> keySet() {
        return this.pCollectionIdsToConsumers.keySet();
    }

    public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String str) {
        return this.pCollectionIdsToWrappedConsumer.computeIfAbsent(str, str2 -> {
            List<ConsumerAndMetadata> list = this.pCollectionIdsToConsumers.get((ListMultimap<String, ConsumerAndMetadata>) str2);
            if (list == null) {
                throw new IllegalArgumentException(String.format("Unknown PCollectionId %s", str));
            }
            if (list.size() != 1) {
                return new MultiplexingMetricTrackingFnDataReceiver(str2, ImmutableList.copyOf((Collection) list));
            }
            ConsumerAndMetadata consumerAndMetadata = list.get(0);
            return consumerAndMetadata.getConsumer() instanceof HandlesSplits ? new SplittingMetricTrackingFnDataReceiver(str2, consumerAndMetadata) : new MetricTrackingFnDataReceiver(str2, consumerAndMetadata);
        });
    }

    public List<MetricsApi.MonitoringInfo> getExecutionTimeMonitoringInfos() {
        return this.executionStates.getExecutionTimeMonitoringInfos();
    }

    public Map<String, ByteString> getExecutionTimeMonitoringData(ShortIdMap shortIdMap) {
        return this.executionStates.getExecutionTimeMonitoringData(shortIdMap);
    }

    @VisibleForTesting
    public List<FnDataReceiver> getUnderlyingConsumers(String str) {
        return Lists.transform(this.pCollectionIdsToConsumers.get((ListMultimap<String, ConsumerAndMetadata>) str), consumerAndMetadata -> {
            return consumerAndMetadata.getConsumer();
        });
    }
}
