package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.class */
public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
    static final String BEAM_SOURCE_OPTION = "beam-source";
    static final String DEFAULT_PARALLELISM = "default-parallelism";
    static final String PIPELINE_OPTIONS = "pipeline-options";

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetPartitionReader.class */
    private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
        private boolean started = false;
        private boolean closed = false;
        private final BoundedSource<T> source;
        private BoundedSource.BoundedReader<T> reader;

        DatasetPartitionReader(BoundedSource<T> boundedSource, SerializablePipelineOptions serializablePipelineOptions) {
            this.source = boundedSource;
            try {
                this.reader = boundedSource.createReader(serializablePipelineOptions.get().as(PipelineOptions.class));
            } catch (IOException e) {
                throw new RuntimeException("Error creating BoundedReader ", e);
            }
        }

        public boolean next() throws IOException {
            if (this.started) {
                return !this.closed && this.reader.advance();
            }
            this.started = true;
            return this.reader.start();
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public InternalRow m68get() {
            return RowHelpers.storeWindowedValueInRow(WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp()), this.source.getOutputCoder());
        }

        public void close() throws IOException {
            this.closed = true;
            this.reader.close();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetReader.class */
    private static class DatasetReader<T> implements DataSourceReader, Serializable {
        private int numPartitions;
        private BoundedSource<T> source;
        private SerializablePipelineOptions serializablePipelineOptions;

        private DatasetReader(DataSourceOptions dataSourceOptions) {
            if (!dataSourceOptions.get(DatasetSourceBatch.BEAM_SOURCE_OPTION).isPresent()) {
                throw new RuntimeException("Beam source was not set in DataSource options");
            }
            this.source = (BoundedSource) Base64Serializer.deserializeUnchecked((String) dataSourceOptions.get(DatasetSourceBatch.BEAM_SOURCE_OPTION).get(), BoundedSource.class);
            if (!dataSourceOptions.get(DatasetSourceBatch.DEFAULT_PARALLELISM).isPresent()) {
                throw new RuntimeException("Spark default parallelism was not set in DataSource options");
            }
            this.numPartitions = Integer.parseInt((String) dataSourceOptions.get(DatasetSourceBatch.DEFAULT_PARALLELISM).get());
            Preconditions.checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
            if (!dataSourceOptions.get(DatasetSourceBatch.PIPELINE_OPTIONS).isPresent()) {
                throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
            }
            this.serializablePipelineOptions = new SerializablePipelineOptions((String) dataSourceOptions.get(DatasetSourceBatch.PIPELINE_OPTIONS).get());
        }

        public StructType readSchema() {
            return SchemaHelpers.binarySchema();
        }

        public List<InputPartition<InternalRow>> planInputPartitions() {
            PipelineOptions pipelineOptions = this.serializablePipelineOptions.get();
            ArrayList arrayList = new ArrayList();
            try {
                for (BoundedSource boundedSource : this.source.split(this.source.getEstimatedSizeBytes(pipelineOptions) / this.numPartitions, pipelineOptions)) {
                    arrayList.add(() -> {
                        return new DatasetPartitionReader(boundedSource, this.serializablePipelineOptions);
                    });
                }
                return arrayList;
            } catch (Exception e) {
                throw new RuntimeException("Error in splitting BoundedSource " + this.source.getClass().getCanonicalName(), e);
            }
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -2105538737:
                    if (implMethodName.equals("lambda$planInputPartitions$5a9d570b$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/sql/sources/v2/reader/InputPartition") && serializedLambda.getFunctionalInterfaceMethodName().equals("createPartitionReader") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/spark/sql/sources/v2/reader/InputPartitionReader;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetReader") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/BoundedSource;)Lorg/apache/spark/sql/sources/v2/reader/InputPartitionReader;")) {
                        DatasetReader datasetReader = (DatasetReader) serializedLambda.getCapturedArg(0);
                        BoundedSource boundedSource = (BoundedSource) serializedLambda.getCapturedArg(1);
                        return () -> {
                            return new DatasetPartitionReader(boundedSource, this.serializablePipelineOptions);
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    public DataSourceReader createReader(DataSourceOptions dataSourceOptions) {
        return new DatasetReader(dataSourceOptions);
    }
}
