package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;

@SuppressFBWarnings({"SE_BAD_FIELD"})
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.class */
class DatasetSourceStreaming implements DataSourceV2, MicroBatchReadSupport {
    /** DataSource option key whose value is the Base64-serialized Beam {@link UnboundedSource}. */
    static final String BEAM_SOURCE_OPTION = "beam-source";
    /** DataSource option key whose value is parsed as an int and used as the desired number of source splits. */
    static final String DEFAULT_PARALLELISM = "default-parallelism";
    /** DataSource option key whose value is the string form handed to {@link SerializablePipelineOptions}. */
    static final String PIPELINE_OPTIONS = "pipeline-options";
    /**
     * Placeholder offset returned by every offset-related method of the micro-batch reader in this
     * file; its JSON form is the constant string below.
     */
    private static final Offset EMPTY_OFFSET = new Offset() { // from class: org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetSourceStreaming.1
        public String json() {
            return "{offset : -1}";
        }
    };

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming$DatasetMicroBatchReader.class */
    /**
     * {@link MicroBatchReader} backed by a Beam {@link UnboundedSource}.
     *
     * <p>The source, the number of splits and the pipeline options are all read from the {@link
     * DataSourceOptions} passed to the constructor. Offsets are not tracked: every offset-related
     * method returns {@code EMPTY_OFFSET}.
     *
     * @param <T> element type produced by the source
     * @param <CheckpointMarkT> checkpoint mark type of the source
     */
    private static class DatasetMicroBatchReader<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements MicroBatchReader, Serializable {
        private final int numPartitions;
        private final UnboundedSource<T, CheckpointMarkT> source;
        private final SerializablePipelineOptions serializablePipelineOptions;
        // Readers handed out by the partitions planned by this instance; used by commit() and stop().
        // NOTE(review): a reader is only registered here when createPartitionReader() runs against this
        // very instance - confirm this still holds if Spark ships the partitions to another JVM.
        private final List<DatasetPartitionReader<T, CheckpointMarkT>> partitionReaders = new ArrayList<>();

        /**
         * Builds the reader from the DataSource options.
         *
         * @param str not used by this implementation
         * @param dataSourceOptions must contain the beam-source, default-parallelism and
         *     pipeline-options entries
         * @throws RuntimeException if any of the three options is missing
         * @throws IllegalArgumentException if the parallelism is not strictly positive
         */
        @SuppressWarnings("unchecked") // the type arguments of the deserialized source cannot be checked at runtime
        private DatasetMicroBatchReader(String str, DataSourceOptions dataSourceOptions) {
            String serializedSource = requiredOption(dataSourceOptions, DatasetSourceStreaming.BEAM_SOURCE_OPTION, "Beam source was not set in DataSource options");
            this.source = (UnboundedSource<T, CheckpointMarkT>) Base64Serializer.deserializeUnchecked(serializedSource, UnboundedSource.class);

            String parallelism = requiredOption(dataSourceOptions, DatasetSourceStreaming.DEFAULT_PARALLELISM, "Spark default parallelism was not set in DataSource options");
            this.numPartitions = Integer.parseInt(parallelism);
            Preconditions.checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");

            String pipelineOptions = requiredOption(dataSourceOptions, DatasetSourceStreaming.PIPELINE_OPTIONS, "Beam pipelineOptions were not set in DataSource options");
            this.serializablePipelineOptions = new SerializablePipelineOptions(pipelineOptions);
        }

        /** Returns the value stored under {@code key}, or throws a RuntimeException carrying {@code missingMessage}. */
        private static String requiredOption(DataSourceOptions options, String key, String missingMessage) {
            return options.get(key).orElseThrow(() -> new RuntimeException(missingMessage));
        }

        /** No-op: this reader does not track offsets. */
        @Override
        public void setOffsetRange(Optional<Offset> optional, Optional<Offset> optional2) {
        }

        /** Always returns the placeholder offset. */
        @Override
        public Offset getStartOffset() {
            return DatasetSourceStreaming.EMPTY_OFFSET;
        }

        /** Always returns the placeholder offset. */
        @Override
        public Offset getEndOffset() {
            return DatasetSourceStreaming.EMPTY_OFFSET;
        }

        /** Ignores {@code str} and returns the placeholder offset. */
        @Override
        public Offset deserializeOffset(String str) {
            return DatasetSourceStreaming.EMPTY_OFFSET;
        }

        /**
         * Finalizes the current checkpoint mark of every registered partition reader.
         *
         * @param offset only used in the error message
         * @throws RuntimeException wrapping the IOException of the first checkpoint that fails to finalize
         */
        @Override
        public void commit(Offset offset) {
            for (DatasetPartitionReader<T, CheckpointMarkT> partitionReader : this.partitionReaders) {
                // Fetch the mark once so the error message reports the very mark that failed.
                UnboundedSource.CheckpointMark checkpointMark = partitionReader.reader.getCheckpointMark();
                try {
                    checkpointMark.finalizeCheckpoint();
                } catch (IOException e) {
                    throw new RuntimeException(String.format("Commit of Offset %s failed, checkpointMark %s finalizeCheckpoint() failed", offset, checkpointMark), e);
                }
            }
        }

        /**
         * Closes every registered partition reader that has been started.
         *
         * <p>All started readers are attempted even if one of them fails; the first failure is
         * rethrown with later ones attached as suppressed exceptions.
         *
         * @throws RuntimeException wrapping the first IOException raised while closing
         */
        @Override
        public void stop() {
            IOException failure = null;
            for (DatasetPartitionReader<T, CheckpointMarkT> partitionReader : this.partitionReaders) {
                if (!partitionReader.started) {
                    continue;
                }
                try {
                    partitionReader.close();
                } catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            if (failure != null) {
                throw new RuntimeException("Error closing " + this + " partitionReaders", failure);
            }
        }

        /** Returns the binary schema provided by {@link SchemaHelpers#binarySchema()}. */
        @Override
        public StructType readSchema() {
            return SchemaHelpers.binarySchema();
        }

        /**
         * Splits the source into at most the configured number of sub-sources and wraps each one in
         * an {@link InputPartition} whose reader registers itself with this instance.
         *
         * @throws RuntimeException if splitting the source fails
         */
        @Override
        public List<InputPartition<InternalRow>> planInputPartitions() {
            PipelineOptions pipelineOptions = this.serializablePipelineOptions.get();
            List<InputPartition<InternalRow>> partitions = new ArrayList<>();
            try {
                for (final UnboundedSource<T, CheckpointMarkT> split : this.source.split(this.numPartitions, pipelineOptions)) {
                    partitions.add(new InputPartition<InternalRow>() {
                        @Override
                        public InputPartitionReader<InternalRow> createPartitionReader() {
                            DatasetPartitionReader<T, CheckpointMarkT> partitionReader = new DatasetPartitionReader<>(split, DatasetMicroBatchReader.this.serializablePipelineOptions);
                            DatasetMicroBatchReader.this.partitionReaders.add(partitionReader);
                            return partitionReader;
                        }
                    });
                }
                return partitions;
            } catch (Exception e) {
                throw new RuntimeException("Error in splitting UnboundedSource " + this.source.getClass().getCanonicalName(), e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming$DatasetPartitionReader.class */
    private static class DatasetPartitionReader<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements InputPartitionReader<InternalRow> {
        private boolean started = false;
        private boolean closed = false;
        private final UnboundedSource<T, CheckpointMarkT> source;
        private UnboundedSource.UnboundedReader<T> reader;

        DatasetPartitionReader(UnboundedSource<T, CheckpointMarkT> unboundedSource, SerializablePipelineOptions serializablePipelineOptions) {
            this.source = unboundedSource;
            try {
                this.reader = unboundedSource.createReader(serializablePipelineOptions.get(), (UnboundedSource.CheckpointMark) null);
            } catch (IOException e) {
                throw new RuntimeException("Error creating UnboundedReader ", e);
            }
        }

        public boolean next() throws IOException {
            if (this.started) {
                return !this.closed && this.reader.advance();
            }
            this.started = true;
            return this.reader.start();
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public InternalRow m74get() {
            return RowHelpers.storeWindowedValueInRow(WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp()), this.source.getOutputCoder());
        }

        public void close() throws IOException {
            this.closed = true;
            this.reader.close();
        }
    }

    /**
     * Package-private no-argument constructor; performs no initialization.
     * NOTE(review): presumably kept so the data source can be instantiated without arguments - confirm.
     */
    DatasetSourceStreaming() {
    }

    /**
     * Creates the micro-batch reader for this source.
     *
     * @param optional ignored by this implementation
     * @param str forwarded unchanged to the reader's constructor
     * @param dataSourceOptions options carrying the serialized source, parallelism and pipeline options
     * @return a new reader built from the given options
     */
    @Override
    public MicroBatchReader createMicroBatchReader(Optional<StructType> optional, String str, DataSourceOptions dataSourceOptions) {
        final MicroBatchReader microBatchReader = new DatasetMicroBatchReader<>(str, dataSourceOptions);
        return microBatchReader;
    }
}
