package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.sql.Dataset;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.class */
class GroupByKeyTranslatorBatch<K, V> implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch$InMemoryStateInternalsFactory.class */
    static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
        InMemoryStateInternalsFactory() {
        }

        public StateInternals stateInternalsForKey(K k) {
            return InMemoryStateInternals.forKey(k);
        }
    }

    @Override // org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator
    public void translateTransform(PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> pTransform, TranslationContext translationContext) {
        PCollection input = translationContext.getInput();
        Dataset dataset = translationContext.getDataset(input);
        WindowingStrategy windowingStrategy = input.getWindowingStrategy();
        KvCoder coder = input.getCoder();
        Coder valueCoder = coder.getValueCoder();
        Coder keyCoder = coder.getKeyCoder();
        translationContext.putDataset(translationContext.getOutput(), dataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)).flatMapGroups(new GroupAlsoByWindowViaOutputBufferFn(windowingStrategy, new InMemoryStateInternalsFactory(), SystemReduceFn.buffering(valueCoder), translationContext.getSerializableOptions()), EncoderHelpers.fromBeamCoder(WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), windowingStrategy.getWindowFn().windowCoder()))));
    }
}
