package org.apache.beam.runners.fnexecution.translation;

import com.google.auto.value.AutoValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.class */
public class BatchSideInputHandlerFactory implements StateRequestHandlers.SideInputHandlerFactory {
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> sideInputToCollection;
    private final SideInputGetter sideInputGetter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory$MultimapSideInputHandler.class */
    /**
     * Side input handler that serves values out of a pre-built multimap whose entries are addressed
     * by a {@link SideInputKey}, i.e. the structural value of the key combined with the structural
     * value of the window.
     *
     * @param <K> type of the keys as understood by the key coder
     * @param <V> type of the values returned to the caller
     * @param <W> window type
     */
    public static class MultimapSideInputHandler<K, V, W extends BoundedWindow> implements StateRequestHandlers.SideInputHandler<V, W> {
        private final Multimap<SideInputKey, V> collection;
        private final Coder<K> keyCoder;
        private final Coder<V> valueCoder;
        private final Coder<W> windowCoder;

        private MultimapSideInputHandler(Multimap<SideInputKey, V> collection, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
            this.collection = collection;
            this.keyCoder = keyCoder;
            this.valueCoder = valueCoder;
            this.windowCoder = windowCoder;
        }

        /**
         * Returns the values stored under the given encoded key and window.
         *
         * @param keyBytes the key, encoded with the key coder
         * @param window the window to look up
         * @return the values stored in the multimap for that key and window
         * @throws RuntimeException wrapping the {@link IOException} if the key cannot be decoded
         */
        @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler
        public Iterable<V> get(byte[] keyBytes, W window) {
            final K decodedKey;
            try {
                decodedKey = keyCoder.decode(new ByteArrayInputStream(keyBytes));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            // Lookups use structural values so that they agree with the keys the multimap was built with.
            Object structuralKey = keyCoder.structuralValue(decodedKey);
            Object structuralWindow = windowCoder.structuralValue(window);
            return collection.get(SideInputKey.of(structuralKey, structuralWindow));
        }

        /** Returns the coder for the values handed out by {@link #get}. */
        @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler
        public Coder<V> resultCoder() {
            return valueCoder;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory$SideInputGetter.class */
    /**
     * Callback used by the factory to fetch the elements of a side input.
     *
     * <p>Within this class it is invoked with the id of the side input's PCollection node, and the
     * returned elements are consumed as windowed values.
     */
    public interface SideInputGetter {
        /**
         * Returns the elements of the side input identified by the given id.
         *
         * @param pCollectionId id of the PCollection backing the side input
         * @param <T> element type expected by the caller
         * @return the elements of the side input
         */
        <T> List<T> getSideInput(String pCollectionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Composite key for multimap side input lookups, pairing a key with a window. The concrete
     * implementation is the AutoValue-generated {@code AutoValue_BatchSideInputHandlerFactory_SideInputKey}.
     */
    @AutoValue
    public abstract static class SideInputKey {
        /**
         * Creates a key from the given key and window components.
         *
         * @param key the key component; callers in this file pass a coder structural value
         * @param window the window component; callers in this file pass a coder structural value
         */
        static SideInputKey of(Object key, Object window) {
            return new AutoValue_BatchSideInputHandlerFactory_SideInputKey(key, window);
        }

        /** Returns the key component, which may be {@code null}. */
        @Nullable
        public abstract Object key();

        /** Returns the window component. */
        public abstract Object window();
    }

    /**
     * Creates a factory for the side inputs of the given executable stage.
     *
     * <p>Every side input reference of the stage is indexed by a {@code SideInputId} built from its
     * transform id and local name, and mapped to the PCollection node it reads from.
     *
     * @param executableStage the stage whose side inputs should be served
     * @param sideInputGetter callback used later to fetch the elements of a side input
     * @return a factory able to create handlers for the stage's side inputs
     */
    public static BatchSideInputHandlerFactory forStage(ExecutableStage executableStage, SideInputGetter sideInputGetter) {
        // Fully parameterized builder: a raw builder would silently accept any key/value type.
        ImmutableMap.Builder<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> sideInputs = ImmutableMap.builder();
        for (SideInputReference sideInput : executableStage.getSideInputs()) {
            RunnerApi.ExecutableStagePayload.SideInputId sideInputId =
                    RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
                            .setTransformId(sideInput.transform().getId())
                            .setLocalName(sideInput.localName())
                            .build();
            sideInputs.put(sideInputId, sideInput.collection());
        }
        return new BatchSideInputHandlerFactory(sideInputs.build(), sideInputGetter);
    }

    /**
     * Stores the side input index and the getter; instances are created through {@code forStage}.
     *
     * @param sideInputToCollection mapping from side input id to the PCollection node it reads
     * @param sideInputGetter callback used to fetch the elements of a side input
     */
    private BatchSideInputHandlerFactory(Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> sideInputToCollection, SideInputGetter sideInputGetter) {
        this.sideInputGetter = sideInputGetter;
        this.sideInputToCollection = sideInputToCollection;
    }

    /**
     * Creates a handler for the side input identified by the given transform id and local name.
     *
     * <p>The access pattern URN selects the handler: the iterable pattern yields a handler keyed by
     * window only, the multimap pattern yields a handler keyed by key and window.
     *
     * @param transformId id of the transform consuming the side input
     * @param sideInputId local name of the side input within that transform
     * @param accessPattern function spec whose URN names the access pattern
     * @param elementCoder coder for the side input elements; must be a {@link KvCoder} for the
     *     multimap pattern
     * @param windowCoder coder for the side input windows
     * @return a handler serving the requested side input
     * @throws IllegalArgumentException if no such side input is registered or the access pattern
     *     URN is neither iterable nor multimap
     */
    @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory
    public <T, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forSideInput(String transformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder) {
        PipelineNode.PCollectionNode collectionNode = this.sideInputToCollection.get(
                RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
                        .setTransformId(transformId)
                        .setLocalName(sideInputId)
                        .build());
        Preconditions.checkArgument(collectionNode != null, "No side input for %s/%s", transformId, sideInputId);

        String urn = accessPattern.getUrn();
        if (PTransformTranslation.ITERABLE_SIDE_INPUT.equals(urn)) {
            // For the iterable pattern the element type is the result type, i.e. T == V.
            @SuppressWarnings("unchecked")
            Coder<V> outputCoder = (Coder<V>) elementCoder;
            return forIterableSideInput(this.sideInputGetter.getSideInput(collectionNode.getId()), outputCoder, windowCoder);
        }
        if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(urn)) {
            // For the multimap pattern the elements are key/value pairs, i.e. T == KV<?, V>.
            @SuppressWarnings("unchecked")
            KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
            return forMultimapSideInput(this.sideInputGetter.getSideInput(collectionNode.getId()), kvCoder.getKeyCoder(), kvCoder.getValueCoder(), windowCoder);
        }
        throw new IllegalArgumentException(String.format("Unknown side input access pattern: %s", accessPattern));
    }

    /**
     * Builds a handler for an iterable side input by grouping the given windowed values by window.
     *
     * <p>A value that belongs to several windows is registered once under each of them. Windows are
     * indexed by the structural value produced by the window coder, and lookups use the same
     * structural value; the key bytes passed to the handler are ignored.
     *
     * @param list the windowed elements of the side input
     * @param coder coder reported by the handler as its result coder
     * @param coder2 window coder used to derive the structural value of each window
     * @return a handler returning all values registered for a window
     */
    private <T, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<T, W> forIterableSideInput(List<WindowedValue<T>> list, final Coder<T> coder, final Coder<W> coder2) {
        ImmutableMultimap.Builder<Object, T> valuesByWindow = ImmutableMultimap.builder();
        for (WindowedValue<T> windowedValue : list) {
            for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
                // WindowedValue only exposes BoundedWindow; the window coder needs the concrete type W.
                @SuppressWarnings("unchecked")
                W window = (W) boundedWindow;
                valuesByWindow.put(coder2.structuralValue(window), windowedValue.getValue());
            }
        }
        final ImmutableMultimap<Object, T> windowToValues = valuesByWindow.build();
        return new StateRequestHandlers.SideInputHandler<T, W>() { // from class: org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory.1
            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler
            public Iterable<T> get(byte[] key, W window) {
                return windowToValues.get(coder2.structuralValue(window));
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler
            public Coder<T> resultCoder() {
                return coder;
            }
        };
    }

    /**
     * Builds a handler for a multimap side input by indexing the given windowed key/value pairs.
     *
     * <p>Each value is registered under a {@link SideInputKey} made of the structural value of its
     * key and the structural value of a window; a pair that belongs to several windows is
     * registered once per window.
     *
     * @param list the windowed key/value elements of the side input
     * @param coder key coder, used to derive structural keys and later to decode lookup keys
     * @param coder2 value coder reported by the handler as its result coder
     * @param coder3 window coder used to derive the structural value of each window
     * @return a handler returning all values registered for a key and window
     */
    private <K, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forMultimapSideInput(List<WindowedValue<KV<K, V>>> list, Coder<K> coder, Coder<V> coder2, Coder<W> coder3) {
        ImmutableMultimap.Builder<SideInputKey, V> multimap = ImmutableMultimap.builder();
        for (WindowedValue<KV<K, V>> windowedValue : list) {
            K key = windowedValue.getValue().getKey();
            V value = windowedValue.getValue().getValue();
            for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
                // WindowedValue only exposes BoundedWindow; the window coder needs the concrete type W.
                @SuppressWarnings("unchecked")
                W window = (W) boundedWindow;
                multimap.put(SideInputKey.of(coder.structuralValue(key), coder3.structuralValue(window)), value);
            }
        }
        return new MultimapSideInputHandler<>(multimap.build(), coder, coder2, coder3);
    }
}
