package org.apache.beam.runners.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.FluentIterable;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/core/BeamFnDataReadRunner.class */
public class BeamFnDataReadRunner<OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
    private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private final BeamFnDataClient beamFnDataClientFactory;
    private final Coder<WindowedValue<OutputT>> coder;
    private final BeamFnApi.Target inputTarget;
    private CompletableFuture<Void> readFuture;

    public BeamFnDataReadRunner(BeamFnApi.FunctionSpec functionSpec, Supplier<String> supplier, BeamFnApi.Target target, BeamFnApi.Coder coder, BeamFnDataClient beamFnDataClient, Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> map) throws IOException {
        this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class).getApiServiceDescriptor();
        this.inputTarget = target;
        this.processBundleInstructionIdSupplier = supplier;
        this.beamFnDataClientFactory = beamFnDataClient;
        this.consumers = ImmutableList.copyOf(FluentIterable.concat(map.values()));
        this.coder = CloudObjects.coderFromCloudObject(CloudObject.fromSpec((Map) OBJECT_MAPPER.readValue(coder.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(), Map.class)));
    }

    public void registerInputLocation() {
        this.readFuture = this.beamFnDataClientFactory.forInboundConsumer(this.apiServiceDescriptor, KV.of(this.processBundleInstructionIdSupplier.get(), this.inputTarget), this.coder, this::multiplexToConsumers);
    }

    public void blockTillReadFinishes() throws Exception {
        LOG.debug("Waiting for process bundle instruction {} and target {} to close.", this.processBundleInstructionIdSupplier.get(), this.inputTarget);
        this.readFuture.get();
    }

    private void multiplexToConsumers(WindowedValue<OutputT> windowedValue) throws Exception {
        Iterator<ThrowingConsumer<WindowedValue<OutputT>>> it = this.consumers.iterator();
        while (it.hasNext()) {
            it.next().accept(windowedValue);
        }
    }
}
