package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.Constants;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SchemaHelpers;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.parquet.Strings;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.class */
public class DatasetSourceBatch implements TableProvider {
    private static final StructType BINARY_SCHEMA = SchemaHelpers.binarySchema();

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetSourceBatchTable.class */
    private static class DatasetSourceBatchTable implements SupportsRead {

        /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetSourceBatchTable$BeamBatch.class */
        private static class BeamBatch<T> implements Batch, Serializable {
            private final int numPartitions;
            private final BoundedSource<T> source;
            private final SerializablePipelineOptions serializablePipelineOptions;

            /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetSourceBatchTable$BeamBatch$BeamInputPartition.class */
            private static class BeamInputPartition<T> implements InputPartition {
                private final BoundedSource<T> source;

                private BeamInputPartition(BoundedSource<T> boundedSource) {
                    this.source = boundedSource;
                }

                public BoundedSource<T> getSource() {
                    return this.source;
                }
            }

            /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch$DatasetSourceBatchTable$BeamBatch$BeamPartitionReader.class */
            private static class BeamPartitionReader<T> implements PartitionReader<InternalRow> {
                private final BoundedSource<T> source;
                private final BoundedSource.BoundedReader<T> reader;
                private boolean started = false;
                private boolean closed = false;

                BeamPartitionReader(BoundedSource<T> boundedSource, SerializablePipelineOptions serializablePipelineOptions) {
                    this.source = boundedSource;
                    try {
                        this.reader = boundedSource.createReader(serializablePipelineOptions.get().as(PipelineOptions.class));
                    } catch (IOException e) {
                        throw new RuntimeException("Error creating BoundedReader ", e);
                    }
                }

                public boolean next() throws IOException {
                    if (this.started) {
                        return !this.closed && this.reader.advance();
                    }
                    this.started = true;
                    return this.reader.start();
                }

                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public InternalRow m67get() {
                    return RowHelpers.storeWindowedValueInRow(WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp()), this.source.getOutputCoder());
                }

                public void close() throws IOException {
                    this.closed = true;
                    this.reader.close();
                }
            }

            private BeamBatch(CaseInsensitiveStringMap caseInsensitiveStringMap) {
                if (Strings.isNullOrEmpty(caseInsensitiveStringMap.get(Constants.BEAM_SOURCE_OPTION))) {
                    throw new RuntimeException("Beam source was not set in DataSource options");
                }
                this.source = (BoundedSource) Base64Serializer.deserializeUnchecked(caseInsensitiveStringMap.get(Constants.BEAM_SOURCE_OPTION), BoundedSource.class);
                if (Strings.isNullOrEmpty(Constants.DEFAULT_PARALLELISM)) {
                    throw new RuntimeException("Spark default parallelism was not set in DataSource options");
                }
                this.numPartitions = Integer.parseInt(caseInsensitiveStringMap.get(Constants.DEFAULT_PARALLELISM));
                Preconditions.checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
                if (Strings.isNullOrEmpty(caseInsensitiveStringMap.get(Constants.PIPELINE_OPTIONS))) {
                    throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
                }
                this.serializablePipelineOptions = new SerializablePipelineOptions(caseInsensitiveStringMap.get(Constants.PIPELINE_OPTIONS));
            }

            public InputPartition[] planInputPartitions() {
                PipelineOptions pipelineOptions = this.serializablePipelineOptions.get();
                try {
                    List split = this.source.split(this.source.getEstimatedSizeBytes(pipelineOptions) / this.numPartitions, pipelineOptions);
                    InputPartition[] inputPartitionArr = new InputPartition[split.size()];
                    int i = 0;
                    Iterator it = split.iterator();
                    while (it.hasNext()) {
                        int i2 = i;
                        i++;
                        inputPartitionArr[i2] = new BeamInputPartition((BoundedSource) it.next());
                    }
                    return inputPartitionArr;
                } catch (Exception e) {
                    throw new RuntimeException("Error in splitting BoundedSource " + this.source.getClass().getCanonicalName(), e);
                }
            }

            public PartitionReaderFactory createReaderFactory() {
                return new PartitionReaderFactory() { // from class: org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch.DatasetSourceBatchTable.BeamBatch.1
                    public PartitionReader<InternalRow> createReader(InputPartition inputPartition) {
                        return new BeamPartitionReader(((BeamInputPartition) inputPartition).getSource(), BeamBatch.this.serializablePipelineOptions);
                    }
                };
            }
        }

        private DatasetSourceBatchTable() {
        }

        public ScanBuilder newScanBuilder(final CaseInsensitiveStringMap caseInsensitiveStringMap) {
            return new ScanBuilder() { // from class: org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch.DatasetSourceBatchTable.1
                public Scan build() {
                    return new Scan() { // from class: org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch.DatasetSourceBatchTable.1.1
                        public StructType readSchema() {
                            return DatasetSourceBatch.BINARY_SCHEMA;
                        }

                        public Batch toBatch() {
                            return new BeamBatch(caseInsensitiveStringMap);
                        }
                    };
                }
            };
        }

        public String name() {
            return "BeamSource";
        }

        public StructType schema() {
            return DatasetSourceBatch.BINARY_SCHEMA;
        }

        public Set<TableCapability> capabilities() {
            return ImmutableSet.of(TableCapability.BATCH_READ);
        }
    }

    public StructType inferSchema(CaseInsensitiveStringMap caseInsensitiveStringMap) {
        return BINARY_SCHEMA;
    }

    public boolean supportsExternalMetadata() {
        return true;
    }

    public Table getTable(StructType structType, Transform[] transformArr, Map<String, String> map) {
        return new DatasetSourceBatchTable();
    }
}
