package org.apache.beam.fn.harness.data;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.Metrics;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.class */
public class PCollectionConsumerRegistry {
    private final MetricsContainerStepMap metricsContainerRegistry;
    private final MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState;
    private final ExecutionStateSampler.ExecutionStateTracker stateTracker;
    private final ShortIdMap shortIdMap;
    private final Map<String, List<ConsumerAndMetadata>> pCollectionIdsToConsumers = new HashMap();
    private final Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer = new HashMap();
    private final BundleProgressReporter.Registrar bundleProgressReporterRegistrar;
    private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
    private final RehydratedComponents rehydratedComponents;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Value type bundling a registered consumer with the per-PTransform objects that the
     * wrapping receivers use when invoking it: the PTransform id, the execution state to
     * activate around the call, and the metrics container to install around the call.
     *
     * <p>The concrete implementation is the AutoValue-generated
     * {@code AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata}. The abstract accessors
     * are intentionally kept in the same relative order as the arguments passed to its
     * constructor in {@link #forConsumer}.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$ConsumerAndMetadata.class */
    public abstract static class ConsumerAndMetadata {
        /** Returns the receiver that elements are forwarded to. */
        public abstract FnDataReceiver getConsumer();

        /** Returns the id of the PTransform that owns the consumer. */
        public abstract String getPTransformId();

        /** Returns the execution state that is activated while the consumer runs. */
        public abstract ExecutionStateSampler.ExecutionState getExecutionState();

        /** Returns the metrics container that is made current while the consumer runs. */
        public abstract MetricsContainer getMetricsContainer();

        /**
         * Creates a new instance holding the given consumer and its metadata.
         *
         * @param fnDataReceiver the consumer to invoke
         * @param str the id of the PTransform owning the consumer
         * @param executionState the execution state to activate around each invocation
         * @param metricsContainer the metrics container to activate around each invocation
         * @return a value object exposing the four arguments through its accessors
         */
        public static ConsumerAndMetadata forConsumer(FnDataReceiver fnDataReceiver, String str, ExecutionStateSampler.ExecutionState executionState, MetricsContainer metricsContainer) {
            return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(fnDataReceiver, str, executionState, metricsContainer);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.class */
    private class MetricTrackingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
        private final FnDataReceiver<WindowedValue<T>> delegate;
        private final ExecutionStateSampler.ExecutionState executionState;
        private final Metrics.BundleCounter elementCountCounter;
        private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
        private final Coder<T> coder;
        private final MetricsContainer metricsContainer;
        private final MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState;

        public MetricTrackingFnDataReceiver(String str, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata, MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState) {
            this.delegate = consumerAndMetadata.getConsumer();
            this.executionState = consumerAndMetadata.getExecutionState();
            HashMap hashMap = new HashMap();
            hashMap.put(MonitoringInfoConstants.Labels.PCOLLECTION, str);
            this.elementCountCounter = Metrics.bundleProcessingThreadCounter(PCollectionConsumerRegistry.this.shortIdMap.getOrCreateShortId(new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT).setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE).setLabels(hashMap).build()), MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, hashMap));
            PCollectionConsumerRegistry.this.bundleProgressReporterRegistrar.register(this.elementCountCounter);
            Metrics.BundleDistribution bundleProcessingThreadDistribution = Metrics.bundleProcessingThreadDistribution(PCollectionConsumerRegistry.this.shortIdMap.getOrCreateShortId(new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE).setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE).setLabels(hashMap).build()), MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, hashMap));
            this.sampledByteSizeDistribution = new SampleByteSizeDistribution<>(bundleProcessingThreadDistribution);
            PCollectionConsumerRegistry.this.bundleProgressReporterRegistrar.register(bundleProcessingThreadDistribution);
            this.coder = coder;
            this.metricsContainer = consumerAndMetadata.getMetricsContainer();
            this.metricsEnvironmentState = metricsEnvironmentState;
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(WindowedValue<T> windowedValue) throws Exception {
            this.elementCountCounter.inc(windowedValue.getWindows().size());
            this.sampledByteSizeDistribution.tryUpdate(windowedValue.getValue(), this.coder);
            MetricsContainer activate = this.metricsEnvironmentState.activate(this.metricsContainer);
            this.executionState.activate();
            try {
                this.delegate.accept(windowedValue);
                this.sampledByteSizeDistribution.finishLazyUpdate();
            } finally {
                this.executionState.deactivate();
                this.metricsEnvironmentState.activate(activate);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver.class */
    /**
     * Receiver that fans every element out to a list of consumers while maintaining one
     * element-count counter and one sampled-byte-size distribution for the PCollection.
     *
     * <p>Both metrics are registered with the enclosing registry's bundle progress reporter
     * registrar at construction time.
     */
    private class MultiplexingMetricTrackingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
        private final List<ConsumerAndMetadata> consumerAndMetadatas;
        private final MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState;
        private final Metrics.BundleCounter elementCountCounter;
        private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
        private final Coder<T> coder;

        /**
         * @param str value stored under the PCOLLECTION label of both metrics
         * @param coder coder used to measure the byte size of sampled element values
         * @param list consumers to invoke, in iteration order, for every element
         * @param metricsEnvironmentState used to swap the active metrics container per consumer
         */
        public MultiplexingMetricTrackingFnDataReceiver(String str, Coder<T> coder, List<ConsumerAndMetadata> list, MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState) {
            this.consumerAndMetadatas = list;
            this.metricsEnvironmentState = metricsEnvironmentState;
            Map<String, String> labels = new HashMap<>();
            labels.put(MonitoringInfoConstants.Labels.PCOLLECTION, str);
            this.elementCountCounter = Metrics.bundleProcessingThreadCounter(
                    PCollectionConsumerRegistry.this.shortIdMap.getOrCreateShortId(
                            new SimpleMonitoringInfoBuilder()
                                    .setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT)
                                    .setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE)
                                    .setLabels(labels)
                                    .build()),
                    MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels));
            PCollectionConsumerRegistry.this.bundleProgressReporterRegistrar.register(this.elementCountCounter);
            Metrics.BundleDistribution bundleProcessingThreadDistribution = Metrics.bundleProcessingThreadDistribution(
                    PCollectionConsumerRegistry.this.shortIdMap.getOrCreateShortId(
                            new SimpleMonitoringInfoBuilder()
                                    .setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE)
                                    .setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE)
                                    .setLabels(labels)
                                    .build()),
                    MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, labels));
            this.sampledByteSizeDistribution = new SampleByteSizeDistribution<>(bundleProcessingThreadDistribution);
            PCollectionConsumerRegistry.this.bundleProgressReporterRegistrar.register(bundleProcessingThreadDistribution);
            this.coder = coder;
        }

        /**
         * Counts the element once per window it belongs to, offers its value to the byte-size
         * sampler, and then invokes each consumer in turn with that consumer's metrics container
         * and execution state active.
         *
         * <p>The cleanup (deactivating the state and restoring the previous container) runs
         * exactly once per consumer invocation, whether or not the consumer throws. An exception
         * from a consumer propagates after cleanup and stops the fan-out.
         */
        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(WindowedValue<T> windowedValue) throws Exception {
            this.elementCountCounter.inc(windowedValue.getWindows().size());
            this.sampledByteSizeDistribution.tryUpdate(windowedValue.getValue(), this.coder);
            for (ConsumerAndMetadata consumerAndMetadata : this.consumerAndMetadatas) {
                MetricsContainer previousContainer = this.metricsEnvironmentState.activate(consumerAndMetadata.getMetricsContainer());
                ExecutionStateSampler.ExecutionState executionState = consumerAndMetadata.getExecutionState();
                executionState.activate();
                try {
                    consumerAndMetadata.getConsumer().accept(windowedValue);
                } finally {
                    // Only the consumer call is guarded, so a failure in the cleanup itself or in
                    // finishLazyUpdate() below can no longer trigger a second round of cleanup.
                    executionState.deactivate();
                    this.metricsEnvironmentState.activate(previousContainer);
                }
                // Kept after the cleanup, as on the original success path.
                this.sampledByteSizeDistribution.finishLazyUpdate();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$SampleByteSizeDistribution.class */
    /**
     * Feeds the encoded byte size of a random sample of elements into a {@link Distribution}.
     *
     * <p>Sampling proceeds in two phases. While no "next sampling token" has been chosen, the
     * first {@code RESERVOIR_SIZE} elements are always sampled and later ones are sampled when
     * {@code nextInt(samplingToken) < RESERVOIR_SIZE}. Once an element past
     * {@code SAMPLING_THRESHOLD} is sampled, the index of the next element to sample is drawn
     * up front and every element before it is skipped without consulting the random generator.
     */
    public static class SampleByteSizeDistribution<T> {
        private static final int RESERVOIR_SIZE = 10;
        private static final int SAMPLING_THRESHOLD = 30;

        final Distribution distribution;
        /** Observer for the element currently being sampled, or null if it was not sampled. */
        ByteSizeObserver byteCountObserver = null;
        /** Number of elements seen since construction or since the last overflow reset. */
        private long samplingToken = 0;
        /** Value of {@code samplingToken} at which to sample next; 0 means "not chosen yet". */
        private long nextSamplingToken = 0;
        private Random randomGenerator = new Random();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$SampleByteSizeDistribution$ByteSizeObserver.class */
        /** Accumulates every size reported to it into a running total. */
        public static class ByteSizeObserver extends ElementByteSizeObserver {
            private long observedSize = 0L;

            private ByteSizeObserver() {
            }

            @Override // org.apache.beam.sdk.util.common.ElementByteSizeObserver
            protected void reportElementSize(long j) {
                observedSize += j;
            }
        }

        /**
         * @param distribution the distribution that receives one update per sampled element
         */
        public SampleByteSizeDistribution(Distribution distribution) {
            this.distribution = distribution;
        }

        /**
         * Decides whether to sample this element and, if so, registers a fresh observer with the
         * coder. A non-lazy observer is advanced and reported immediately; a lazy one is left for
         * {@link #finishLazyUpdate()}.
         *
         * @param t the element value to measure
         * @param coder the coder that reports the value's size to the observer
         * @throws Exception if registering the observer with the coder fails
         */
        public void tryUpdate(T t, Coder<T> coder) throws Exception {
            if (shouldSampleElement()) {
                ByteSizeObserver observer = new ByteSizeObserver();
                // Publish to the field before calling the coder, exactly as the field is the
                // only handle finishLazyUpdate() has on the observer.
                this.byteCountObserver = observer;
                coder.registerByteSizeObserver(t, observer);
                if (!observer.getIsLazy()) {
                    observer.advance();
                    distribution.update(observer.observedSize);
                }
            } else {
                this.byteCountObserver = null;
            }
        }

        /**
         * Completes a pending lazy measurement: if the current element was sampled and its
         * observer still reports itself as lazy, advances it and records the observed size.
         */
        public void finishLazyUpdate() {
            ByteSizeObserver observer = this.byteCountObserver;
            if (observer != null && observer.getIsLazy()) {
                observer.advance();
                distribution.update(observer.observedSize);
            }
        }

        /** Advances the element counter and returns whether the current element is sampled. */
        private boolean shouldSampleElement() {
            // Restart the count just before it would reach Long.MAX_VALUE.
            if (samplingToken + 1 == Long.MAX_VALUE) {
                samplingToken = 0L;
                nextSamplingToken = getNextSamplingToken(samplingToken);
            }
            samplingToken++;

            if (nextSamplingToken == 0) {
                // Per-element phase: always sample the first RESERVOIR_SIZE elements, afterwards
                // sample when a uniform draw from [0, samplingToken) lands below RESERVOIR_SIZE.
                boolean sample = samplingToken <= RESERVOIR_SIZE
                        || randomGenerator.nextInt((int) samplingToken) < RESERVOIR_SIZE;
                if (!sample) {
                    return false;
                }
                if (samplingToken > SAMPLING_THRESHOLD) {
                    // Switch to the gap-based phase.
                    nextSamplingToken = getNextSamplingToken(samplingToken);
                }
                return true;
            }

            // Gap-based phase: skip until the pre-drawn token is reached.
            if (samplingToken >= nextSamplingToken) {
                nextSamplingToken = getNextSamplingToken(samplingToken);
                return true;
            }
            return false;
        }

        /**
         * Draws the token at which the next sample is taken: {@code j} plus a random gap computed
         * as {@code log(1 - U) / log(1 - RESERVOIR_SIZE / j)} truncated to an int, where
         * {@code U} comes from {@code nextDouble()}.
         */
        private long getNextSamplingToken(long j) {
            double gap = Math.log(1.0d - randomGenerator.nextDouble())
                    / Math.log(1.0d - (RESERVOIR_SIZE / (double) j));
            return j + ((int) gap);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistry$SplittingMetricTrackingFnDataReceiver.class */
    /**
     * Single-consumer metric-tracking receiver that additionally exposes the consumer's
     * {@link HandlesSplits} operations. Element handling is inherited unchanged; split and
     * progress requests are passed straight to the consumer.
     */
    private class SplittingMetricTrackingFnDataReceiver<T> extends MetricTrackingFnDataReceiver<T> implements HandlesSplits {
        /** The wrapped consumer viewed through its {@link HandlesSplits} interface. */
        private final HandlesSplits splitHandler;

        /**
         * @param str value stored under the PCOLLECTION label of the inherited metrics
         * @param coder coder used to measure the byte size of sampled element values
         * @param consumerAndMetadata consumer metadata; its consumer is cast to
         *     {@link HandlesSplits}, so it must implement that interface
         * @param metricsEnvironmentState used to swap the active metrics container per element
         */
        public SplittingMetricTrackingFnDataReceiver(String str, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata, MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState) {
            super(str, coder, consumerAndMetadata, metricsEnvironmentState);
            FnDataReceiver consumer = consumerAndMetadata.getConsumer();
            this.splitHandler = (HandlesSplits) consumer;
        }

        /** Forwards the split request to the wrapped consumer and returns its result. */
        @Override // org.apache.beam.fn.harness.HandlesSplits
        public HandlesSplits.SplitResult trySplit(double d) {
            return splitHandler.trySplit(d);
        }

        /** Returns the progress reported by the wrapped consumer. */
        @Override // org.apache.beam.fn.harness.HandlesSplits
        public double getProgress() {
            return splitHandler.getProgress();
        }
    }

    public PCollectionConsumerRegistry(MetricsContainerStepMap metricsContainerStepMap, MetricsEnvironment.MetricsEnvironmentState metricsEnvironmentState, ExecutionStateSampler.ExecutionStateTracker executionStateTracker, ShortIdMap shortIdMap, BundleProgressReporter.Registrar registrar, BeamFnApi.ProcessBundleDescriptor processBundleDescriptor) {
        this.metricsContainerRegistry = metricsContainerStepMap;
        this.metricsEnvironmentState = metricsEnvironmentState;
        this.stateTracker = executionStateTracker;
        this.shortIdMap = shortIdMap;
        this.bundleProgressReporterRegistrar = registrar;
        this.processBundleDescriptor = processBundleDescriptor;
        this.rehydratedComponents = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(processBundleDescriptor.getCodersMap()).putAllPcollections(processBundleDescriptor.getPcollectionsMap()).putAllWindowingStrategies(processBundleDescriptor.getWindowingStrategiesMap()).build());
    }

    /**
     * Registers a consumer for a PCollection together with a process-bundle-msecs execution
     * state and the metrics container of the owning PTransform.
     *
     * @param str id of the PCollection the consumer reads from
     * @param str2 id of the PTransform owning the consumer; used as the PTRANSFORM label
     * @param str3 name of the PTransform, passed to the execution state and used in errors
     * @param fnDataReceiver the consumer to register
     * @throws RuntimeException if a wrapped consumer was already created for {@code str}
     * @throws IllegalStateException if the monitoring info for the execution state cannot be
     *     built
     */
    public <T> void register(String str, String str2, String str3, FnDataReceiver<WindowedValue<T>> fnDataReceiver) {
        if (pCollectionIdsToWrappedConsumer.containsKey(str)) {
            throw new RuntimeException("New consumers for a pCollectionId cannot be register()-d after calling getMultiplexingConsumer.");
        }

        SimpleMonitoringInfoBuilder infoBuilder = new SimpleMonitoringInfoBuilder();
        infoBuilder.setUrn(MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS);
        infoBuilder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE);
        infoBuilder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, str2);
        MetricsApi.MonitoringInfo processBundleMsecsInfo = infoBuilder.build();
        if (processBundleMsecsInfo == null) {
            throw new IllegalStateException(String.format("Unable to construct %s counter for PTransform {id=%s, name=%s}", MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS, str2, str3));
        }

        // The list is looked up (and created if needed) before the state and container are
        // obtained, matching the original evaluation order.
        List<ConsumerAndMetadata> consumers = pCollectionIdsToConsumers.computeIfAbsent(str, unused -> new ArrayList<>());
        ExecutionStateSampler.ExecutionState executionState = stateTracker.create(
                shortIdMap.getOrCreateShortId(processBundleMsecsInfo),
                str2,
                str3,
                ExecutionStateTracker.PROCESS_STATE_NAME);
        MetricsContainer metricsContainer = metricsContainerRegistry.getContainer(str2);
        consumers.add(ConsumerAndMetadata.forConsumer(fnDataReceiver, str2, executionState, metricsContainer));
    }

    /**
     * Returns the metric-tracking receiver for a PCollection, creating and caching it on first
     * use. Once a receiver exists for an id, {@code register} rejects further consumers for it.
     *
     * <p>The receiver type depends on how many consumers are registered: exactly one consumer
     * yields a single-consumer receiver (the splitting variant when that consumer implements
     * {@link HandlesSplits}); any other count yields a multiplexing receiver over a snapshot of
     * the registered consumers.
     *
     * @param str id of the PCollection
     * @return the cached or newly created receiver for {@code str}
     * @throws IllegalArgumentException if the process bundle descriptor has no such PCollection
     * @throws IllegalStateException if the PCollection's coder cannot be materialized
     */
    public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String str) {
        return this.pCollectionIdsToWrappedConsumer.computeIfAbsent(str, pCollectionId -> {
            if (!this.processBundleDescriptor.containsPcollections(pCollectionId)) {
                throw new IllegalArgumentException(String.format("Unknown PCollection id %s", pCollectionId));
            }
            String coderId = this.processBundleDescriptor.getPcollectionsOrThrow(pCollectionId).getCoderId();

            // Only the coder lookup can raise IOException, so only it is guarded.
            Coder<?> valueCoder;
            try {
                Coder<?> coder = this.rehydratedComponents.getCoder(coderId);
                if (coder instanceof WindowedValue.WindowedValueCoder) {
                    // The receivers measure element values, so unwrap the windowed-value coder.
                    valueCoder = ((WindowedValue.WindowedValueCoder) coder).getValueCoder();
                } else {
                    valueCoder = coder;
                }
            } catch (IOException e) {
                throw new IllegalStateException(String.format("Unable to materialize coder %s", coderId), e);
            }

            // The inner lambda parameter must not reuse the enclosing lambda's parameter name;
            // Java rejects such a redeclaration at compile time.
            List<ConsumerAndMetadata> consumers = this.pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, unused -> new ArrayList<>());

            if (consumers.size() != 1) {
                return new MultiplexingMetricTrackingFnDataReceiver(pCollectionId, valueCoder, ImmutableList.copyOf(consumers), this.metricsEnvironmentState);
            }
            ConsumerAndMetadata onlyConsumer = consumers.get(0);
            if (onlyConsumer.getConsumer() instanceof HandlesSplits) {
                return new SplittingMetricTrackingFnDataReceiver(pCollectionId, valueCoder, onlyConsumer, this.metricsEnvironmentState);
            }
            return new MetricTrackingFnDataReceiver(pCollectionId, valueCoder, onlyConsumer, this.metricsEnvironmentState);
        });
    }
}
