package org.apache.beam.runners.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.Serializer;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

/* loaded from: input_file:org/apache/beam/runners/core/BeamFnDataWriteRunner.class */
public class BeamFnDataWriteRunner<InputT> {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
    private final BeamFnApi.Target outputTarget;
    private final Coder<WindowedValue<InputT>> coder;
    private final BeamFnDataClient beamFnDataClientFactory;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;

    public BeamFnDataWriteRunner(BeamFnApi.FunctionSpec functionSpec, Supplier<String> supplier, BeamFnApi.Target target, BeamFnApi.Coder coder, BeamFnDataClient beamFnDataClient) throws IOException {
        this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class).getApiServiceDescriptor();
        this.beamFnDataClientFactory = beamFnDataClient;
        this.processBundleInstructionIdSupplier = supplier;
        this.outputTarget = target;
        this.coder = (Coder) Serializer.deserialize((Map) OBJECT_MAPPER.readValue(coder.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(), Map.class), Coder.class);
    }

    public void registerForOutput() {
        this.consumer = this.beamFnDataClientFactory.forOutboundConsumer(this.apiServiceDescriptor, KV.of(this.processBundleInstructionIdSupplier.get(), this.outputTarget), this.coder);
    }

    public void close() throws Exception {
        this.consumer.close();
    }

    public void consume(WindowedValue<InputT> windowedValue) throws Exception {
        this.consumer.accept(windowedValue);
    }
}
