package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.class */
/**
 * {@link StateRequestHandlers.SideInputHandlerFactory} that answers side input requests for a
 * single {@link ExecutableStage} by reading the side input contents from a runner-side
 * {@link SideInputHandler}.
 *
 * <p>Instances are created through {@link #forStage}, which verifies that every side input
 * declared by the stage has a corresponding {@link PCollectionView}.
 */
public class FlinkStreamingSideInputHandlerFactory implements StateRequestHandlers.SideInputHandlerFactory {
    /** Failure message used whenever the runner handler has no contents for a requested window. */
    private static final String SIDE_INPUT_NOT_READY = "Element processed by SDK before side input is ready";

    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputToCollection;
    private final SideInputHandler runnerHandler;

    /**
     * Creates a factory for the side inputs of the given stage.
     *
     * @param executableStage the stage whose side inputs will be served
     * @param sideInputViews views keyed by side input id; must contain an entry for every side
     *     input of {@code executableStage}
     * @param sideInputHandler runner-side handler the side input contents are read from
     * @return a factory restricted to the side inputs of {@code executableStage}
     * @throws NullPointerException if {@code sideInputViews} lacks a view for one of the stage's
     *     side inputs
     */
    public static FlinkStreamingSideInputHandlerFactory forStage(ExecutableStage executableStage, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputViews, SideInputHandler sideInputHandler) {
        ImmutableMap.Builder<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> views = ImmutableMap.builder();
        for (SideInputReference sideInput : executableStage.getSideInputs()) {
            RunnerApi.ExecutableStagePayload.SideInputId id = RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
                    .setTransformId(sideInput.transform().getId())
                    .setLocalName(sideInput.localName())
                    .build();
            views.put(id, Preconditions.checkNotNull(sideInputViews.get(id), "No side input for %s/%s", id.getTransformId(), id.getLocalName()));
        }
        return new FlinkStreamingSideInputHandlerFactory(views.build(), sideInputHandler);
    }

    private FlinkStreamingSideInputHandlerFactory(Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputToCollection, SideInputHandler runnerHandler) {
        this.sideInputToCollection = sideInputToCollection;
        this.runnerHandler = runnerHandler;
    }

    /**
     * Returns a handler that exposes the side input's contents for a window as an iterable.
     *
     * @param transformId id of the transform consuming the side input
     * @param localName local name of the side input within that transform
     * @param elementCoder coder reported by the handler's {@code elementCoder()}
     * @param windowCoder window coder; not used by this implementation
     * @throws IllegalArgumentException if no view is registered for {@code transformId/localName}
     */
    @Override
    public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String transformId, String localName, final Coder<V> elementCoder, Coder<W> windowCoder) {
        final PCollectionView<?> view = lookUpView(transformId, localName);
        return new StateRequestHandlers.IterableSideInputHandler<V, W>() {
            @Override
            public Iterable<V> get(W window) {
                return readContents(view, window);
            }

            @Override
            public Coder<V> elementCoder() {
                return elementCoder;
            }
        };
    }

    /**
     * Returns a handler that exposes the side input's contents for a window as a multimap: either
     * the distinct keys, or the values stored under one key.
     *
     * @param transformId id of the transform consuming the side input
     * @param localName local name of the side input within that transform
     * @param kvCoder coder supplying the handler's key and value coders
     * @param windowCoder window coder; not used by this implementation
     * @throws IllegalArgumentException if no view is registered for {@code transformId/localName}
     */
    @Override
    public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String transformId, String localName, final KvCoder<K, V> kvCoder, Coder<W> windowCoder) {
        final PCollectionView<?> view = lookUpView(transformId, localName);
        return new StateRequestHandlers.MultimapSideInputHandler<K, V, W>() {
            /** Returns an unmodifiable list of all values whose key structurally equals {@code key}. */
            @Override
            public Iterable<V> get(K key, W window) {
                Iterable<KV<K, V>> entries = readContents(view, window);
                Coder<K> keyCoder = keyCoder();
                // Keys are compared through the coder's structural value, not the keys' own equals().
                Object wantedKey = keyCoder.structuralValue(key);
                ArrayList<V> matches = new ArrayList<>();
                for (KV<K, V> entry : entries) {
                    if (wantedKey.equals(keyCoder.structuralValue(entry.getKey()))) {
                        matches.add(entry.getValue());
                    }
                }
                return Collections.unmodifiableList(matches);
            }

            /** Returns an unmodifiable collection of the distinct keys, deduplicated by structural value. */
            @Override
            public Iterable<K> get(W window) {
                Iterable<KV<K, V>> entries = readContents(view, window);
                Coder<K> keyCoder = keyCoder();
                // The first key seen for each structural value is the one that is reported.
                Map<Object, K> distinctKeys = new HashMap<>();
                for (KV<K, V> entry : entries) {
                    distinctKeys.putIfAbsent(keyCoder.structuralValue(entry.getKey()), entry.getKey());
                }
                return Collections.unmodifiableCollection(distinctKeys.values());
            }

            @Override
            public Coder<K> keyCoder() {
                return kvCoder.getKeyCoder();
            }

            @Override
            public Coder<V> valueCoder() {
                return kvCoder.getValueCoder();
            }
        };
    }

    /** Resolves the view registered for a side input, rejecting ids this factory does not know. */
    private PCollectionView<?> lookUpView(String transformId, String localName) {
        PCollectionView<?> view = this.sideInputToCollection.get(
                RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(transformId).setLocalName(localName).build());
        Preconditions.checkArgument(view != null, "No side input for %s/%s", transformId, localName);
        return view;
    }

    /**
     * Reads the contents of {@code view} for {@code window} from the runner handler. A
     * {@code null} result is treated as the side input not being ready and is rejected with a
     * descriptive {@link NullPointerException}, so every handler fails the same way.
     */
    @SuppressWarnings("unchecked") // The view is only known as PCollectionView<?>; callers choose the element type.
    private <T> Iterable<T> readContents(PCollectionView<?> view, BoundedWindow window) {
        Iterable<?> contents = this.runnerHandler.getIterable(view, window);
        return (Iterable<T>) Preconditions.checkNotNull(contents, SIDE_INPUT_NOT_READY);
    }
}
