package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runner for the {@code RemoteGrpcPortRead} PTransform.
 *
 * <p>Elements decoded from one incoming data endpoint are handed to
 * {@link #forwardElementToConsumer}, which passes them on to the consumer of the transform's only
 * output PCollection. The runner tracks the index of the element currently being forwarded so that
 * the remaining input can be split off through {@link #trySplit}.
 *
 * <p>{@code index} and {@code stopIndex} are guarded by {@code splittingLock}; the element thread,
 * split requests and intermediate progress reporting all synchronize on it.
 *
 * @param <OutputT> the value type of the elements that are read
 */
public class BeamFnDataReadRunner<OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);

    private final String pTransformId;
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final FnDataReceiver<WindowedValue<OutputT>> consumer;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private final Coder<WindowedValue<OutputT>> coder;
    private final String dataChannelReadIndexShortId;

    /** Guards {@link #index} and {@link #stopIndex}. */
    private final Object splittingLock = new Object();

    /** Index of the most recent element handed to the consumer; -1 until the first element. */
    private long index;

    /**
     * Exclusive upper bound on forwarded element indices: an element whose index would be
     * {@code stopIndex} or later is dropped. {@link Long#MAX_VALUE} while no split has happened.
     */
    private long stopIndex;

    /** Creates a {@link BeamFnDataReadRunner} and wires it into the bundle processing context. */
    static class Factory<OutputT> implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
        Factory() {
        }

        /**
         * Builds the runner for the PTransform described by {@code context} and registers its
         * incoming data endpoint, finish-bundle function and reset function with that context.
         *
         * @param context the context of the PTransform being instantiated
         * @return the fully registered runner
         * @throws IOException if the runner's constructor fails
         */
        @Override
        public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(PTransformRunnerFactory.Context context) throws IOException {
            // The read transform is expected to have exactly one output; getOnlyElement throws otherwise.
            String outputPCollectionId = Iterables.getOnlyElement(context.getPTransform().getOutputsMap().values());
            FnDataReceiver<WindowedValue<OutputT>> outputConsumer = context.getPCollectionConsumer(outputPCollectionId);

            BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(
                    context.getShortIdMap(),
                    context.getBundleCacheSupplier(),
                    context.getPTransformId(),
                    context.getPTransform(),
                    context.getProcessBundleInstructionIdSupplier(),
                    context.getCoders(),
                    context.getBeamFnStateClient(),
                    context::addBundleProgressReporter,
                    outputConsumer);

            context.addIncomingDataEndpoint(runner.apiServiceDescriptor, runner.coder, runner::forwardElementToConsumer);
            context.addFinishBundleFunction(runner::blockTillReadFinishes);
            context.addResetFunction(runner::reset);
            return runner;
        }
    }

    /** Registers {@link Factory} under the {@code RemoteGrpcPortRead} URN. */
    @AutoService({PTransformRunnerFactory.Registrar.class})
    public static class Registrar implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(RemoteGrpcPortRead.URN, new Factory());
        }
    }

    /**
     * Creates the runner, resolves the element coder for the remote port and registers a progress
     * reporter that publishes the data channel read index.
     *
     * @param shortIdMap used to obtain the short id of the data-channel-read-index monitoring info
     * @param supplier cache supplier exposed to the coder translation context
     * @param str id of the PTransform this runner belongs to
     * @param pTransform the {@code RemoteGrpcPortRead} PTransform proto
     * @param supplier2 supplies the id of the process bundle instruction currently being executed
     * @param map all coders known to the bundle descriptor, keyed by coder id
     * @param beamFnStateClient state client exposed to the coder translation context
     * @param consumer callback that accepts the progress reporter created by this constructor
     * @param fnDataReceiver downstream receiver that every forwarded element is passed to
     * @throws IOException if the coder cannot be rehydrated from its proto
     */
    BeamFnDataReadRunner(ShortIdMap shortIdMap, final Supplier<Cache<?, ?>> supplier, String str, RunnerApi.PTransform pTransform, final Supplier<String> supplier2, Map<String, RunnerApi.Coder> map, final BeamFnStateClient beamFnStateClient, Consumer<BundleProgressReporter> consumer, FnDataReceiver<WindowedValue<OutputT>> fnDataReceiver) throws IOException {
        this.pTransformId = str;
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortRead.fromPTransform(pTransform).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.processBundleInstructionIdSupplier = supplier2;
        this.consumer = fnDataReceiver;

        RehydratedComponents components = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(map).build());
        @SuppressWarnings("unchecked") // The port's coder is trusted to encode WindowedValue<OutputT>.
        Coder<WindowedValue<OutputT>> elementCoder = (Coder<WindowedValue<OutputT>>) CoderTranslation.fromProto(map.get(port.getCoderId()), components, new StateBackedIterable.StateBackedIterableTranslationContext() {
            @Override
            public Supplier<Cache<?, ?>> getCache() {
                return supplier;
            }

            @Override
            public BeamFnStateClient getStateClient() {
                return beamFnStateClient;
            }

            @Override
            public Supplier<String> getCurrentInstructionId() {
                return supplier2;
            }
        });
        this.coder = elementCoder;

        this.dataChannelReadIndexShortId = shortIdMap.getOrCreateShortId(new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX).setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE).setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, str).build());

        // Note: `consumer` here is the constructor parameter (the reporter registrar), not the field.
        consumer.accept(new BundleProgressReporter() {
            @Override
            public void updateIntermediateMonitoringData(Map<String, ByteString> monitoringData) {
                // Taken under the lock because the element thread may be advancing index concurrently.
                // NOTE(review): index is -1 before the first element and is reported as-is here;
                // confirm the runner tolerates that value.
                synchronized (splittingLock) {
                    monitoringData.put(dataChannelReadIndexShortId, MonitoringInfoEncodings.encodeInt64Counter(index));
                }
            }

            @Override
            public void updateFinalMonitoringData(Map<String, ByteString> monitoringData) {
                // NOTE(review): read without splittingLock; presumably safe because no element is
                // forwarded once the bundle has completed - confirm against the caller.
                monitoringData.put(dataChannelReadIndexShortId, MonitoringInfoEncodings.encodeInt64Counter(index));
            }

            @Override
            public void reset() {
                // Nothing to reset here; the indices are cleared through BeamFnDataReadRunner#reset.
            }
        });
        clearSplitIndices();
    }

    /**
     * Forwards {@code windowedValue} to the downstream consumer unless a split has already assigned
     * it to the residual, in which case the element is silently dropped.
     *
     * @param windowedValue the decoded element
     * @throws Exception whatever the downstream consumer throws
     */
    public void forwardElementToConsumer(WindowedValue<OutputT> windowedValue) throws Exception {
        synchronized (this.splittingLock) {
            // The next element would get index + 1; drop it once that reaches stopIndex.
            if (this.index == this.stopIndex - 1) {
                return;
            }
            this.index++;
        }
        // Deliberately outside the lock: downstream processing can take arbitrarily long, and
        // holding splittingLock for its duration would stall trySplit and intermediate progress
        // reporting, both of which synchronize on the same lock.
        this.consumer.accept(windowedValue);
    }

    /**
     * Attempts to split the remaining input of this read according to the desired split for this
     * PTransform in {@code processBundleSplitRequest}, recording the outcome in {@code builder}.
     *
     * <p>Nothing is recorded when the request holds no desired split for this transform, when the
     * read has already finished, when the request targets a different instruction, or when no
     * stop index strictly between the current index and the current stop index is available.
     *
     * <p>The split first tries to divide the element currently being processed (only if the
     * consumer implements {@link HandlesSplits}); otherwise the channel is split at an element
     * boundary, snapped to the closest allowed split point if the request restricts them.
     *
     * @param processBundleSplitRequest the split request received from the runner
     * @param builder the response under construction; primary/residual roots and a channel split
     *     are appended to it on success
     */
    public void trySplit(BeamFnApi.ProcessBundleSplitRequest processBundleSplitRequest, BeamFnApi.ProcessBundleSplitResponse.Builder builder) {
        BeamFnApi.ProcessBundleSplitRequest.DesiredSplit desiredSplit = processBundleSplitRequest.getDesiredSplitsMap().get(this.pTransformId);
        if (desiredSplit == null) {
            return;
        }

        long totalBufferSize = desiredSplit.getEstimatedInputElements();
        // Mutable copy: it may be sorted below when searching for the closest allowed split point.
        List<Long> allowedSplitPoints = new ArrayList<>(desiredSplit.getAllowedSplitPointsList());
        HandlesSplits splittingConsumer = this.consumer instanceof HandlesSplits ? (HandlesSplits) this.consumer : null;

        synchronized (this.splittingLock) {
            // Already finished: there is nothing left to hand back.
            if (this.index == this.stopIndex) {
                return;
            }
            // Ignore requests that are addressed to a different bundle instruction.
            if (!processBundleSplitRequest.getInstructionId().equals(this.processBundleInstructionIdSupplier.get())) {
                return;
            }

            // Clamp the runner's estimate into [index + 1, stopIndex].
            if (totalBufferSize < this.index + 1) {
                totalBufferSize = this.index + 1;
            } else if (totalBufferSize > this.stopIndex) {
                totalBufferSize = this.stopIndex;
            }

            // Before the first element (index == -1) the "current element" counts as fully done.
            // Afterwards ask the consumer, falling back to 0.5 when it cannot report progress.
            double currentElementProgress = 1.0d;
            if (this.index >= 0) {
                currentElementProgress = splittingConsumer != null ? splittingConsumer.getProgress() : 0.5d;
            }

            // Number of (possibly fractional) elements of the remaining work that should be kept.
            double keep = (totalBufferSize - this.index - currentElementProgress) * desiredSplit.getFractionOfRemainder();

            if (currentElementProgress < 1.0d) {
                // Fraction of the current element's own remainder that should be kept.
                double keepOfElementRemainder = keep / (1.0d - currentElementProgress);
                if (keepOfElementRemainder < 1.0d
                        && isValidSplitPoint(allowedSplitPoints, this.index)
                        && isValidSplitPoint(allowedSplitPoints, this.index + 1)) {
                    HandlesSplits.SplitResult splitResult = splittingConsumer != null ? splittingConsumer.trySplit(keepOfElementRemainder) : null;
                    if (splitResult != null) {
                        this.stopIndex = this.index + 1;
                        // The current element is described by the returned roots, so the channel
                        // split skips it: primary ends at index - 1, residual starts at index + 1.
                        builder.addAllPrimaryRoots(splitResult.getPrimaryRoots())
                                .addAllResidualRoots(splitResult.getResidualRoots())
                                .addChannelSplitsBuilder()
                                .setLastPrimaryElement(this.index - 1)
                                .setFirstResidualElement(this.stopIndex);
                        return;
                    }
                }
            }

            // Fall back to a split at an element boundary, at least one element past the current one.
            long newStopIndex = this.index + Math.max(1L, Math.round(currentElementProgress + keep));
            if (!isValidSplitPoint(allowedSplitPoints, newStopIndex)) {
                newStopIndex = closestAllowedSplitPoint(allowedSplitPoints, newStopIndex);
            }
            // Only accept a stop index that actually shrinks the remaining range.
            if (newStopIndex >= this.stopIndex || newStopIndex <= this.index) {
                return;
            }
            this.stopIndex = newStopIndex;
            builder.addChannelSplitsBuilder()
                    .setLastPrimaryElement(this.stopIndex - 1)
                    .setFirstResidualElement(this.stopIndex);
        }
    }

    /**
     * Returns the allowed split point closest to {@code target}. Must be called with
     * {@code splittingLock} held and only when {@code allowedSplitPoints} is non-empty and does not
     * contain {@code target}. Sorts {@code allowedSplitPoints} in place.
     */
    private long closestAllowedSplitPoint(List<Long> allowedSplitPoints, long target) {
        Collections.sort(allowedSplitPoints);
        // target is absent, so binarySearch returns -(insertionPoint) - 1.
        int insertionPoint = -(Collections.binarySearch(allowedSplitPoints, target) + 1);
        if (insertionPoint == 0) {
            return allowedSplitPoints.get(0);
        }
        if (insertionPoint == allowedSplitPoints.size()) {
            return allowedSplitPoints.get(insertionPoint - 1);
        }
        long previousPoint = allowedSplitPoints.get(insertionPoint - 1);
        long nextPoint = allowedSplitPoints.get(insertionPoint);
        // Prefer the earlier point only if it is still ahead of the current element and strictly
        // closer to the target; ties go to the later point.
        boolean previousIsUsableAndCloser = this.index < previousPoint && target - previousPoint < nextPoint - target;
        return previousIsUsableAndCloser ? previousPoint : nextPoint;
    }

    /**
     * Marks the read as finished: advances {@code index} by one and sets {@code stopIndex} to it,
     * so subsequent {@link #trySplit} calls return without splitting. Nothing in this method
     * itself waits; it only logs and updates the indices.
     *
     * @throws Exception declared for compatibility with the finish-bundle function signature
     */
    public void blockTillReadFinishes() throws Exception {
        LOG.debug("Waiting for process bundle instruction {} and transform {} to close.", this.processBundleInstructionIdSupplier.get(), this.pTransformId);
        synchronized (this.splittingLock) {
            this.index++;
            this.stopIndex = this.index;
        }
    }

    /**
     * Restores the initial split state so the runner can be reused.
     *
     * @throws IllegalArgumentException if an instruction id is still being supplied, i.e. the
     *     bundle processor is still active
     */
    public void reset() {
        Preconditions.checkArgument(this.processBundleInstructionIdSupplier.get() == null, "Cannot reset an active bundle processor.");
        clearSplitIndices();
    }

    /** Resets the indices to "no element seen yet" and "no split performed". */
    private void clearSplitIndices() {
        synchronized (this.splittingLock) {
            this.index = -1L;
            this.stopIndex = Long.MAX_VALUE;
        }
    }

    /** Returns whether {@code j} may be used as a split point; an empty list allows every point. */
    private boolean isValidSplitPoint(List<Long> list, long j) {
        return list.isEmpty() || list.contains(Long.valueOf(j));
    }
}
