package org.apache.beam.fn.harness.control;

import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fake.FakeStepContext;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.core.BeamFnDataReadRunner;
import org.apache.beam.runners.core.BeamFnDataWriteRunner;
import org.apache.beam.runners.core.BoundedSourceRunner;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.Ascii;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.Collections2;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.HashMultimap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler.class */
public class ProcessBundleHandler {
    private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
    private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
    private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
    private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
    private final PipelineOptions options;
    private final Function<String, Message> fnApiRegistry;
    private final BeamFnDataClient beamFnDataClient;

    public ProcessBundleHandler(PipelineOptions pipelineOptions, Function<String, Message> function, BeamFnDataClient beamFnDataClient) {
        this.options = pipelineOptions;
        this.fnApiRegistry = function;
        this.beamFnDataClient = beamFnDataClient;
    }

    protected <InputT, OutputT> void createConsumersForPrimitiveTransform(BeamFnApi.PrimitiveTransform primitiveTransform, Supplier<String> supplier, Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> function, BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> biConsumer, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2) throws IOException {
        ThrowingConsumer<WindowedValue<InputT>> throwingConsumer;
        BeamFnApi.FunctionSpec functionSpec = primitiveTransform.getFunctionSpec();
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Map.Entry entry : primitiveTransform.getOutputsMap().entrySet()) {
            builder.put(entry.getKey(), function.apply(BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(primitiveTransform.getId()).setName((String) entry.getKey()).build()));
        }
        ImmutableMap build = builder.build();
        String urn = functionSpec.getUrn();
        boolean z = -1;
        switch (urn.hashCode()) {
            case -1047285070:
                if (urn.equals(DATA_OUTPUT_URN)) {
                    z = true;
                    break;
                }
                break;
            case -634091044:
                if (urn.equals(JAVA_SOURCE_URN)) {
                    z = 4;
                    break;
                }
                break;
            case 19947236:
                if (urn.equals(JAVA_DO_FN_URN)) {
                    z = 3;
                    break;
                }
                break;
            case 344800170:
                if (urn.equals(DATA_INPUT_URN)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case Ascii.SOH /* 1 */:
                BeamFnDataWriteRunner beamFnDataWriteRunner = new BeamFnDataWriteRunner(functionSpec, supplier, BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(primitiveTransform.getId()).setName((String) Iterables.getOnlyElement(primitiveTransform.getOutputsMap().keySet())).build(), this.fnApiRegistry.apply(((BeamFnApi.PCollection) Iterables.getOnlyElement(primitiveTransform.getOutputsMap().values())).getCoderReference()), this.beamFnDataClient);
                beamFnDataWriteRunner.getClass();
                consumer.accept(beamFnDataWriteRunner::registerForOutput);
                beamFnDataWriteRunner.getClass();
                throwingConsumer = beamFnDataWriteRunner::consume;
                beamFnDataWriteRunner.getClass();
                consumer2.accept(beamFnDataWriteRunner::close);
                break;
            case true:
                BeamFnDataReadRunner beamFnDataReadRunner = new BeamFnDataReadRunner(functionSpec, supplier, BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(primitiveTransform.getId()).setName((String) Iterables.getOnlyElement(primitiveTransform.getInputsMap().keySet())).build(), this.fnApiRegistry.apply(((BeamFnApi.PCollection) Iterables.getOnlyElement(primitiveTransform.getOutputsMap().values())).getCoderReference()), this.beamFnDataClient, build);
                beamFnDataReadRunner.getClass();
                consumer.accept(beamFnDataReadRunner::registerInputLocation);
                throwingConsumer = null;
                beamFnDataReadRunner.getClass();
                consumer2.accept(beamFnDataReadRunner::blockTillReadFinishes);
                break;
            case Ascii.ETX /* 3 */:
                DoFnRunner<InputT, OutputT> createDoFnRunner = createDoFnRunner(functionSpec, build);
                createDoFnRunner.getClass();
                consumer.accept(createDoFnRunner::startBundle);
                createDoFnRunner.getClass();
                consumer2.accept(createDoFnRunner::finishBundle);
                createDoFnRunner.getClass();
                throwingConsumer = createDoFnRunner::processElement;
                break;
            case true:
                BoundedSourceRunner createBoundedSourceRunner = createBoundedSourceRunner(functionSpec, build);
                createBoundedSourceRunner.getClass();
                ThrowingConsumer<WindowedValue<InputT>> throwingConsumer2 = createBoundedSourceRunner::runReadLoop;
                createBoundedSourceRunner.getClass();
                consumer.accept(createBoundedSourceRunner::start);
                throwingConsumer = throwingConsumer2;
                break;
            default:
                throw new IllegalArgumentException(String.format("Unknown FunctionSpec %s", functionSpec));
        }
        if (throwingConsumer != null) {
            Iterator it = primitiveTransform.getInputsMap().entrySet().iterator();
            while (it.hasNext()) {
                Iterator it2 = ((BeamFnApi.Target.List) ((Map.Entry) it.next()).getValue()).getTargetList().iterator();
                while (it2.hasNext()) {
                    biConsumer.accept((BeamFnApi.Target) it2.next(), throwingConsumer);
                }
            }
        }
    }

    public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        BeamFnApi.InstructionResponse.Builder processBundle = BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
        BeamFnApi.ProcessBundleDescriptor apply = this.fnApiRegistry.apply(instructionRequest.getProcessBundle().getProcessBundleDescriptorReference());
        HashMultimap create = HashMultimap.create();
        ArrayList<ThrowingRunnable> arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (BeamFnApi.PrimitiveTransform primitiveTransform : Lists.reverse(apply.getPrimitiveTransformList())) {
            instructionRequest.getClass();
            Supplier<String> supplier = instructionRequest::getInstructionId;
            create.getClass();
            Function function = (v1) -> {
                return r3.get(v1);
            };
            create.getClass();
            BiConsumer biConsumer = (v1, v2) -> {
                r4.put(v1, v2);
            };
            arrayList.getClass();
            Consumer<ThrowingRunnable> consumer = (v1) -> {
                r5.add(v1);
            };
            arrayList2.getClass();
            createConsumersForPrimitiveTransform(primitiveTransform, supplier, function, biConsumer, consumer, (v1) -> {
                r6.add(v1);
            });
        }
        for (ThrowingRunnable throwingRunnable : arrayList) {
            LOG.debug("Starting function {}", throwingRunnable);
            throwingRunnable.run();
        }
        for (ThrowingRunnable throwingRunnable2 : Lists.reverse(arrayList2)) {
            LOG.debug("Finishing function {}", throwingRunnable2);
            throwingRunnable2.run();
        }
        return processBundle;
    }

    private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner(BeamFnApi.FunctionSpec functionSpec, Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> map) {
        try {
            DoFnInfo doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray(functionSpec.getData().unpack(BytesValue.class).getValue().toByteArray(), "DoFnInfo");
            Preconditions.checkArgument(Objects.equals(new HashSet(Collections2.transform(map.keySet(), Long::parseLong)), doFnInfo.getOutputMap().keySet()), "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.", map.keySet(), doFnInfo.getOutputMap());
            ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
            for (Map.Entry entry : doFnInfo.getOutputMap().entrySet()) {
                builder.putAll((ImmutableMultimap.Builder) entry.getValue(), (Iterable) map.get(Long.toString(((Long) entry.getKey()).longValue())));
            }
            final ImmutableMap asMap = builder.build().asMap();
            return DoFnRunners.simpleRunner(PipelineOptionsFactory.create(), doFnInfo.getDoFn(), NullSideInputReader.empty(), new DoFnRunners.OutputManager() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandler.1
                Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput;

                {
                    this.tupleTagToOutput = asMap;
                }

                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    try {
                        Collection<ThrowingConsumer<WindowedValue<?>>> collection = this.tupleTagToOutput.get(tupleTag);
                        if (collection == null) {
                            return;
                        }
                        Iterator<ThrowingConsumer<WindowedValue<?>>> it = collection.iterator();
                        while (it.hasNext()) {
                            it.next().accept(windowedValue);
                        }
                    } catch (Throwable th) {
                        throw new RuntimeException(th);
                    }
                }
            }, (TupleTag) doFnInfo.getOutputMap().get(Long.valueOf(doFnInfo.getMainOutput())), new ArrayList(doFnInfo.getOutputMap().values()), new FakeStepContext(), doFnInfo.getWindowingStrategy());
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(String.format("Unable to unwrap DoFn %s", functionSpec), e);
        }
    }

    private <InputT extends BoundedSource<OutputT>, OutputT> BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner(BeamFnApi.FunctionSpec functionSpec, Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> map) {
        return new BoundedSourceRunner<>(this.options, functionSpec, map);
    }
}
