package org.apache.beam.runners.fnexecution.translation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
import org.apache.beam.sdk.util.construction.graph.SideInputReference;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.class */
public class BatchSideInputHandlerFactory implements StateRequestHandlers.SideInputHandlerFactory {
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> sideInputToCollection;
    private final SideInputGetter sideInputGetter;

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory$SideInputGetter.class */
    public interface SideInputGetter {
        <T> List<T> getSideInput(String str);
    }

    public static BatchSideInputHandlerFactory forStage(ExecutableStage executableStage, SideInputGetter sideInputGetter) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (SideInputReference sideInputReference : executableStage.getSideInputs()) {
            builder.put(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(sideInputReference.transform().getId()).setLocalName(sideInputReference.localName()).build(), sideInputReference.collection());
        }
        return new BatchSideInputHandlerFactory(builder.build(), sideInputGetter);
    }

    private BatchSideInputHandlerFactory(Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> map, SideInputGetter sideInputGetter) {
        this.sideInputToCollection = map;
        this.sideInputGetter = sideInputGetter;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory
    public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String str, String str2, final Coder<V> coder, final Coder<W> coder2) {
        PipelineNode.PCollectionNode pCollectionNode = this.sideInputToCollection.get(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(str).setLocalName(str2).build());
        Preconditions.checkArgument(pCollectionNode != null, "No side input for %s/%s", str, str2);
        ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
        for (WindowedValue windowedValue : this.sideInputGetter.getSideInput(pCollectionNode.getId())) {
            Iterator<? extends BoundedWindow> it = windowedValue.getWindows().iterator();
            while (it.hasNext()) {
                builder.put(coder2.structuralValue(it.next()), windowedValue.getValue());
            }
        }
        final ImmutableMultimap build = builder.build();
        return (StateRequestHandlers.IterableSideInputHandler<V, W>) new StateRequestHandlers.IterableSideInputHandler<V, W>() { // from class: org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory.1
            /* JADX WARN: Incorrect types in method signature: (TW;)Ljava/lang/Iterable<TV;>; */
            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.IterableSideInputHandler
            public Iterable get(BoundedWindow boundedWindow) {
                return build.get(coder2.structuralValue(boundedWindow));
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.IterableSideInputHandler
            public Coder<V> elementCoder() {
                return coder;
            }
        };
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory
    public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String str, String str2, final KvCoder<K, V> kvCoder, final Coder<W> coder) {
        PipelineNode.PCollectionNode pCollectionNode = this.sideInputToCollection.get(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(str).setLocalName(str2).build());
        Preconditions.checkArgument(pCollectionNode != null, "No side input for %s/%s", str, str2);
        final Coder<K> keyCoder = kvCoder.getKeyCoder();
        final HashMap hashMap = new HashMap();
        for (WindowedValue windowedValue : this.sideInputGetter.getSideInput(pCollectionNode.getId())) {
            Object key = ((KV) windowedValue.getValue()).getKey();
            Object value = ((KV) windowedValue.getValue()).getValue();
            Iterator<? extends BoundedWindow> it = windowedValue.getWindows().iterator();
            while (it.hasNext()) {
                ((List) ((KV) ((Map) hashMap.computeIfAbsent(coder.structuralValue(it.next()), obj -> {
                    return new HashMap();
                })).computeIfAbsent(keyCoder.structuralValue(key), obj2 -> {
                    return KV.of(key, new ArrayList());
                })).getValue()).add(value);
            }
        }
        return (StateRequestHandlers.MultimapSideInputHandler<K, V, W>) new StateRequestHandlers.MultimapSideInputHandler<K, V, W>() { // from class: org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory.2
            /* JADX WARN: Incorrect types in method signature: (TK;TW;)Ljava/lang/Iterable<TV;>; */
            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler
            public Iterable get(Object obj3, BoundedWindow boundedWindow) {
                KV kv = (KV) ((Map) hashMap.getOrDefault(coder.structuralValue(boundedWindow), Collections.emptyMap())).get(keyCoder.structuralValue(obj3));
                return kv == null ? Collections.emptyList() : Collections.unmodifiableList((List) kv.getValue());
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler
            public Coder<V> valueCoder() {
                return kvCoder.getValueCoder();
            }

            /* JADX WARN: Incorrect types in method signature: (TW;)Ljava/lang/Iterable<TK;>; */
            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler
            public Iterable get(BoundedWindow boundedWindow) {
                return Iterables.unmodifiableIterable(FluentIterable.concat(new Iterable[]{((Map) hashMap.getOrDefault(coder.structuralValue(boundedWindow), Collections.emptyMap())).values()}).transform(kv -> {
                    return kv.getKey();
                }));
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler
            public Coder<K> keyCoder() {
                return kvCoder.getKeyCoder();
            }
        };
    }
}
