package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;

import java.io.IOException;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

/**
 * Translates a read from an {@link UnboundedSource} into a Spark structured streaming
 * {@link Dataset} of {@link WindowedValue}s.
 *
 * <p>The source is obtained from the transform currently being translated, serialized with
 * {@link Base64Serializer}, and handed to a streaming reader whose format is
 * {@link DatasetSourceStreaming}. Every loaded row is then mapped to a {@link WindowedValue}
 * using a coder built from the source's output coder and the {@link GlobalWindow} coder.
 *
 * @param <T> element type of the output {@link PCollection}
 */
class ReadSourceTranslatorStreaming<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
    /** Canonical class name of {@link DatasetSourceStreaming}, used as the reader format. */
    private static final String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName();

    /**
     * Builds the streaming dataset for the current read transform and registers it in the
     * translation context under the transform's output.
     *
     * @param pTransform the read transform being translated (not consulted directly; the applied
     *     transform is taken from {@code translationContext})
     * @param translationContext supplies the current transform, the Spark session, the pipeline
     *     options and the output to which the resulting dataset is bound
     * @throws RuntimeException wrapping any {@link IOException} raised during the translation
     */
    @Override
    public void translateTransform(PTransform<PBegin, PCollection<T>> pTransform, TranslationContext translationContext) {
        try {
            // NOTE(review): raw types are kept as found; parameterizing them needs types that
            // this file does not import.
            UnboundedSource source =
                    ReadTranslation.unboundedSourceFromTransform(translationContext.getCurrentTransform());
            SparkSession sparkSession = translationContext.getSparkSession();

            // Values handed to the streaming source through reader options.
            String serializedSource = Base64Serializer.serializeUnchecked(source);
            String defaultParallelism =
                    String.valueOf(translationContext.getSparkSession().sparkContext().defaultParallelism());
            String pipelineOptions = translationContext.getSerializableOptions().toString();

            Dataset rows =
                    sparkSession
                            .readStream()
                            .format(SOURCE_PROVIDER_CLASS)
                            .option("beam-source", serializedSource)
                            .option("default-parallelism", defaultParallelism)
                            .option("pipeline-options", pipelineOptions)
                            .load();

            // Rows are decoded into windowed values with a coder that pairs the source's output
            // coder with the global window coder.
            WindowedValue.FullWindowedValueCoder windowedValueCoder =
                    WindowedValue.FullWindowedValueCoder.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
            Dataset windowedValues =
                    rows.map(
                            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
                            EncoderHelpers.fromBeamCoder(windowedValueCoder));

            translationContext.putDataset(translationContext.getOutput(), windowedValues);
        } catch (IOException ioException) {
            throw new RuntimeException(ioException);
        }
    }
}
