/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.euphoria.core.translate;

import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_euphoria.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_sdks_java_extensions_euphoria.com.google.common.collect.HashBasedTable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_euphoria.com.google.common.collect.Table;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.translate.AbstractJoinTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.LazyAccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.AdaptableCollector;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

/**
 * Translates a Euphoria {@link Join} into a Beam {@link ParDo} in which one side of the join is
 * handed to the other as a multimap side input.
 *
 * <p>Only the {@code LEFT} and {@code RIGHT} join types are handled; any other type makes
 * {@code translate} throw {@link UnsupportedOperationException}.
 *
 * @param <LeftT> element type of the left input
 * @param <RightT> element type of the right input
 * @param <KeyT> type of the join key
 * @param <OutputT> type of the values emitted by the joiner
 */
public class BroadcastHashJoinTranslator<LeftT, RightT, KeyT, OutputT>
extends AbstractJoinTranslator<LeftT, RightT, KeyT, OutputT> {
    /**
     * Views already created by this translator instance, indexed by the source collection and the
     * key extractor they were requested for. Consulted before a new view is created so the same
     * (collection, key extractor) pair is turned into a view at most once per instance.
     */
    @VisibleForTesting
    final Table<PCollection<?>, UnaryFunction<?, KeyT>, PCollectionView<?>> pViews = HashBasedTable.create();

    /**
     * Builds the Beam transformation for the given join operator.
     *
     * <p>For a {@code LEFT} join the keyed right input becomes the side input and the keyed left
     * input is processed element by element; for a {@code RIGHT} join the roles are swapped.
     *
     * @param operator the join being translated; supplies type, key extractors, joiner and name
     * @param left the left input, used as the cache key for its view and as the pipeline source
     * @param leftKeyed the left input paired with its extracted keys
     * @param right the right input, used as the cache key for its view
     * @param rightKeyed the right input paired with its extracted keys
     * @return the joined output keyed by the join key
     * @throws UnsupportedOperationException if the join type is neither {@code LEFT} nor
     *     {@code RIGHT}
     */
    @Override
    PCollection<KV<KeyT, OutputT>> translate(Join<LeftT, RightT, KeyT, OutputT> operator, PCollection<LeftT> left, PCollection<KV<KeyT, LeftT>> leftKeyed, PCollection<RightT> right, PCollection<KV<KeyT, RightT>> rightKeyed) {
        AccumulatorProvider accumulators = new LazyAccumulatorProvider(AccumulatorProvider.of(left.getPipeline()));
        switch (operator.getType()) {
            case LEFT: {
                PCollectionView<Map<KeyT, Iterable<RightT>>> broadcastRight =
                        this.computeViewAsMultimapIfAbsent(right, operator.getRightKeyExtractor(), rightKeyed);
                BroadcastHashLeftJoinFn<KeyT, LeftT, RightT, OutputT> joinFn =
                        new BroadcastHashLeftJoinFn<>(
                                broadcastRight,
                                operator.getJoiner(),
                                accumulators,
                                operator.getName().orElse(null));
                return leftKeyed.apply(ParDo.of(joinFn).withSideInputs(broadcastRight));
            }
            case RIGHT: {
                PCollectionView<Map<KeyT, Iterable<LeftT>>> broadcastLeft =
                        this.computeViewAsMultimapIfAbsent(left, operator.getLeftKeyExtractor(), leftKeyed);
                BroadcastHashRightJoinFn<KeyT, LeftT, RightT, OutputT> joinFn =
                        new BroadcastHashRightJoinFn<>(
                                broadcastLeft,
                                operator.getJoiner(),
                                accumulators,
                                operator.getName().orElse(null));
                return rightKeyed.apply(ParDo.of(joinFn).withSideInputs(broadcastLeft));
            }
            default:
                throw new UnsupportedOperationException(String.format("Cannot translate Euphoria '%s' operator to Beam transformations. Given join type '%s' is not supported for BroadcastHashJoin.", Join.class.getSimpleName(), operator.getType()));
        }
    }

    /**
     * Returns the multimap view registered for {@code (pcollection, keyExtractor)}, creating it
     * from {@code pCollectionToView} and registering it first if none exists yet.
     *
     * @param pcollection the un-keyed collection; used only as the row key of the cache
     * @param keyExtractor the key extractor; used only as the column key of the cache
     * @param pCollectionToView the keyed collection the view is actually built from
     * @param <V> value type of the viewed collection
     * @return the cached or newly created view
     */
    private <V> PCollectionView<Map<KeyT, Iterable<V>>> computeViewAsMultimapIfAbsent(PCollection<V> pcollection, UnaryFunction<?, KeyT> keyExtractor, PCollection<KV<KeyT, V>> pCollectionToView) {
        PCollectionView<?> view = this.pViews.get(pcollection, keyExtractor);
        if (view == null) {
            view = pCollectionToView.apply(View.asMultimap());
            this.pViews.put(pcollection, keyExtractor, view);
        }
        // The table stores wildcard views, so the element type has to be re-asserted here.
        // NOTE(review): this relies on callers always pairing the same pcollection/keyExtractor
        // with a keyed collection of matching value type - confirm against call sites.
        @SuppressWarnings("unchecked")
        PCollectionView<Map<KeyT, Iterable<V>>> typedView = (PCollectionView<Map<KeyT, Iterable<V>>>) view;
        return typedView;
    }

    /**
     * Processes keyed left elements and joins each with the right values found under the same key
     * in the side input. When the side input has no entry for the key, the joiner is invoked once
     * with {@code null} as the right value.
     */
    static class BroadcastHashLeftJoinFn<K, LeftT, RightT, OutputT>
    extends DoFn<KV<K, LeftT>, KV<K, OutputT>> {
        private final PCollectionView<Map<K, Iterable<RightT>>> smallSideCollection;
        private final BinaryFunctor<LeftT, RightT, OutputT> joiner;
        private final AdaptableCollector<KV<K, LeftT>, KV<K, OutputT>, OutputT> outCollector;

        /**
         * @param smallSideCollection side-input view holding the right values grouped by key
         * @param joiner function applied to every (left, right) pair
         * @param accumulators accumulator provider handed to the output collector
         * @param operatorName name handed to the output collector; may be {@code null}
         */
        BroadcastHashLeftJoinFn(PCollectionView<Map<K, Iterable<RightT>>> smallSideCollection, BinaryFunctor<LeftT, RightT, OutputT> joiner, AccumulatorProvider accumulators, @Nullable String operatorName) {
            this.smallSideCollection = smallSideCollection;
            this.joiner = joiner;
            // Every value the joiner collects is emitted paired with the key of the element
            // currently being processed.
            this.outCollector = new AdaptableCollector<>(accumulators, operatorName, (ctx, elem) -> ctx.output(KV.of(ctx.element().getKey(), elem)));
        }

        /** Joins the current left element with all right values sharing its key. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            KV<K, LeftT> element = context.element();
            K key = element.getKey();
            Map<K, Iterable<RightT>> rightByKey = context.sideInput(this.smallSideCollection);
            // A single null stands in for the missing side so the joiner still runs once.
            Iterable<RightT> rightValues = rightByKey.getOrDefault(key, Collections.singletonList(null));
            // The collector must be bound to this context before the joiner writes to it.
            this.outCollector.setProcessContext(context);
            for (RightT rightValue : rightValues) {
                this.joiner.apply(element.getValue(), rightValue, this.outCollector);
            }
        }
    }

    /**
     * Processes keyed right elements and joins each with the left values found under the same key
     * in the side input. When the side input has no entry for the key, the joiner is invoked once
     * with {@code null} as the left value.
     */
    static class BroadcastHashRightJoinFn<K, LeftT, RightT, OutputT>
    extends DoFn<KV<K, RightT>, KV<K, OutputT>> {
        private final PCollectionView<Map<K, Iterable<LeftT>>> smallSideCollection;
        private final BinaryFunctor<LeftT, RightT, OutputT> joiner;
        private final AdaptableCollector<KV<K, RightT>, KV<K, OutputT>, OutputT> outCollector;

        /**
         * @param smallSideCollection side-input view holding the left values grouped by key
         * @param joiner function applied to every (left, right) pair
         * @param accumulators accumulator provider handed to the output collector
         * @param operatorName name handed to the output collector; may be {@code null}
         */
        BroadcastHashRightJoinFn(PCollectionView<Map<K, Iterable<LeftT>>> smallSideCollection, BinaryFunctor<LeftT, RightT, OutputT> joiner, AccumulatorProvider accumulators, @Nullable String operatorName) {
            this.smallSideCollection = smallSideCollection;
            this.joiner = joiner;
            // Every value the joiner collects is emitted paired with the key of the element
            // currently being processed.
            this.outCollector = new AdaptableCollector<>(accumulators, operatorName, (ctx, elem) -> ctx.output(KV.of(ctx.element().getKey(), elem)));
        }

        /** Joins the current right element with all left values sharing its key. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            KV<K, RightT> element = context.element();
            K key = element.getKey();
            Map<K, Iterable<LeftT>> leftByKey = context.sideInput(this.smallSideCollection);
            // A single null stands in for the missing side so the joiner still runs once.
            Iterable<LeftT> leftValues = leftByKey.getOrDefault(key, Collections.singletonList(null));
            // The collector must be bound to this context before the joiner writes to it.
            this.outCollector.setProcessContext(context);
            for (LeftT leftValue : leftValues) {
                this.joiner.apply(leftValue, element.getValue(), this.outCollector);
            }
        }
    }
}

