package com.google.cloud.dataflow.sdk.runners.dataflow;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.coders.NullableCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.StringUtils;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource.class */
public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
    private final BoundedSource<T> source;

    @VisibleForTesting
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter.class */
    public static class BoundedToUnboundedSourceAdapter<T> extends UnboundedSource<T, Checkpoint<T>> {
        private BoundedSource<T> boundedSource;

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Checkpoint.class */
        public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {

            @Nullable
            private final List<TimestampedValue<T>> residualElements;

            @Nullable
            private final BoundedSource<T> residualSource;

            public Checkpoint(@Nullable List<TimestampedValue<T>> list, @Nullable BoundedSource<T> boundedSource) {
                this.residualElements = list;
                this.residualSource = boundedSource;
            }

            @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark
            public void finalizeCheckpoint() {
            }

            @Nullable
            @VisibleForTesting
            List<TimestampedValue<T>> getResidualElements() {
                return this.residualElements;
            }

            @Nullable
            @VisibleForTesting
            BoundedSource<T> getResidualSource() {
                return this.residualSource;
            }
        }

        @VisibleForTesting
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$CheckpointCoder.class */
        static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
            private final Coder<List<TimestampedValue<T>>> elemsCoder;
            private final Coder<T> elemCoder;
            private final Coder<BoundedSource> sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));

            @JsonCreator
            public static CheckpointCoder<?> of(@JsonProperty("component_encodings") List<Coder<?>> list) {
                Preconditions.checkArgument(list.size() == 1, "Expecting 1 components, got %s", Integer.valueOf(list.size()));
                return new CheckpointCoder<>(list.get(0));
            }

            CheckpointCoder(Coder<T> coder) {
                this.elemsCoder = NullableCoder.of(ListCoder.of(TimestampedValue.TimestampedValueCoder.of(coder)));
                this.elemCoder = coder;
            }

            @Override // com.google.cloud.dataflow.sdk.coders.Coder
            public void encode(Checkpoint<T> checkpoint, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
                Coder.Context nested = context.nested();
                this.elemsCoder.encode(((Checkpoint) checkpoint).residualElements, outputStream, nested);
                this.sourceCoder.encode(((Checkpoint) checkpoint).residualSource, outputStream, nested);
            }

            @Override // com.google.cloud.dataflow.sdk.coders.Coder
            public Checkpoint<T> decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
                Coder.Context nested = context.nested();
                return new Checkpoint<>(this.elemsCoder.decode(inputStream, nested), this.sourceCoder.decode(inputStream, nested));
            }

            @Override // com.google.cloud.dataflow.sdk.coders.Coder
            public List<Coder<?>> getCoderArguments() {
                return Arrays.asList(this.elemCoder);
            }

            @Override // com.google.cloud.dataflow.sdk.coders.Coder
            public void verifyDeterministic() throws Coder.NonDeterministicException {
                throw new Coder.NonDeterministicException(this, "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.class */
        public class Reader extends UnboundedSource.UnboundedReader<T> {
            private BoundedToUnboundedSourceAdapter<T>.ResidualElements residualElements;

            @Nullable
            private BoundedToUnboundedSourceAdapter<T>.ResidualSource residualSource;
            private final PipelineOptions options;
            private boolean done;

            Reader(@Nullable List<TimestampedValue<T>> list, @Nullable BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) {
                init(list, boundedSource, pipelineOptions);
                this.options = (PipelineOptions) Preconditions.checkNotNull(pipelineOptions, "options");
                this.done = false;
            }

            private void init(@Nullable List<TimestampedValue<T>> list, @Nullable BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) {
                this.residualElements = list == null ? new ResidualElements(Collections.emptyList()) : new ResidualElements(list);
                this.residualSource = boundedSource == null ? null : new ResidualSource(boundedSource, pipelineOptions);
            }

            @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader, com.google.cloud.dataflow.sdk.io.Source.Reader
            public boolean start() throws IOException {
                return advance();
            }

            @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader, com.google.cloud.dataflow.sdk.io.Source.Reader
            public boolean advance() throws IOException {
                if (this.residualElements.advance()) {
                    return true;
                }
                if (this.residualSource != null && this.residualSource.advance()) {
                    return true;
                }
                this.done = true;
                return false;
            }

            @Override // com.google.cloud.dataflow.sdk.io.Source.Reader, java.lang.AutoCloseable
            public void close() throws IOException {
                if (this.residualSource != null) {
                    this.residualSource.close();
                }
            }

            @Override // com.google.cloud.dataflow.sdk.io.Source.Reader
            public T getCurrent() throws NoSuchElementException {
                if (this.residualElements.hasCurrent()) {
                    return this.residualElements.getCurrent();
                }
                if (this.residualSource != null) {
                    return this.residualSource.getCurrent();
                }
                throw new NoSuchElementException();
            }

            @Override // com.google.cloud.dataflow.sdk.io.Source.Reader
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                if (this.residualElements.hasCurrent()) {
                    return this.residualElements.getCurrentTimestamp();
                }
                if (this.residualSource != null) {
                    return this.residualSource.getCurrentTimestamp();
                }
                throw new NoSuchElementException();
            }

            @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader
            public Instant getWatermark() {
                return this.done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
            }

            @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader
            public Checkpoint<T> getCheckpointMark() {
                Checkpoint<T> checkpointMark;
                if (this.residualElements.done()) {
                    checkpointMark = this.residualSource != null ? this.residualSource.getCheckpointMark() : new Checkpoint<>(null, null);
                } else {
                    checkpointMark = new Checkpoint<>(this.residualElements.getRestElements(), this.residualSource == null ? null : this.residualSource.getSource());
                }
                init(((Checkpoint) checkpointMark).residualElements, ((Checkpoint) checkpointMark).residualSource, this.options);
                return checkpointMark;
            }

            @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader, com.google.cloud.dataflow.sdk.io.Source.Reader
            public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
                return BoundedToUnboundedSourceAdapter.this;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualElements.class */
        public class ResidualElements {
            private final List<TimestampedValue<T>> elementsList;

            @Nullable
            private Iterator<TimestampedValue<T>> elementsIterator = null;

            @Nullable
            private TimestampedValue<T> currentT = null;
            private boolean hasCurrent = false;
            private boolean done = false;

            ResidualElements(List<TimestampedValue<T>> list) {
                this.elementsList = (List) Preconditions.checkNotNull(list, "residualElementsList");
            }

            public boolean advance() {
                if (this.elementsIterator == null) {
                    this.elementsIterator = this.elementsList.iterator();
                }
                if (this.elementsIterator.hasNext()) {
                    this.currentT = this.elementsIterator.next();
                    this.hasCurrent = true;
                    return true;
                }
                this.done = true;
                this.hasCurrent = false;
                return false;
            }

            boolean hasCurrent() {
                return this.hasCurrent;
            }

            boolean done() {
                return this.done;
            }

            TimestampedValue<T> getCurrentTimestampedValue() {
                if (this.hasCurrent) {
                    return this.currentT;
                }
                throw new NoSuchElementException();
            }

            T getCurrent() {
                return getCurrentTimestampedValue().getValue();
            }

            Instant getCurrentTimestamp() {
                return getCurrentTimestampedValue().getTimestamp();
            }

            List<TimestampedValue<T>> getRestElements() {
                if (this.elementsIterator == null) {
                    return this.elementsList;
                }
                ArrayList newArrayList = Lists.newArrayList();
                while (this.elementsIterator.hasNext()) {
                    newArrayList.add(this.elementsIterator.next());
                }
                return newArrayList;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.class */
        public class ResidualSource {
            private BoundedSource<T> residualSource;
            private PipelineOptions options;

            @Nullable
            private BoundedSource.BoundedReader<T> reader = null;
            private boolean closed = false;

            public ResidualSource(BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) {
                this.residualSource = (BoundedSource) Preconditions.checkNotNull(boundedSource, "residualSource");
                this.options = (PipelineOptions) Preconditions.checkNotNull(pipelineOptions, "options");
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean advance() throws IOException {
                if (this.reader != null || this.closed) {
                    return this.reader.advance();
                }
                this.reader = this.residualSource.createReader(this.options);
                return this.reader.start();
            }

            T getCurrent() throws NoSuchElementException {
                if (this.reader == null) {
                    throw new NoSuchElementException();
                }
                return this.reader.getCurrent();
            }

            Instant getCurrentTimestamp() throws NoSuchElementException {
                if (this.reader == null) {
                    throw new NoSuchElementException();
                }
                return this.reader.getCurrentTimestamp();
            }

            void close() throws IOException {
                if (this.reader != null) {
                    this.reader.close();
                    this.reader = null;
                }
                this.closed = true;
            }

            BoundedSource<T> getSource() {
                return this.residualSource;
            }

            Checkpoint<T> getCheckpointMark() {
                if (this.reader == null) {
                    return new Checkpoint<>(null, this.residualSource);
                }
                BoundedSource<T> boundedSource = null;
                Double fractionConsumed = this.reader.getFractionConsumed();
                if (fractionConsumed != null && 0.0d <= fractionConsumed.doubleValue() && fractionConsumed.doubleValue() <= 1.0d) {
                    double doubleValue = 1.0d - fractionConsumed.doubleValue();
                    for (int i = 0; i < 8 && boundedSource == null; i++) {
                        boundedSource = this.reader.splitAtFraction2(fractionConsumed.doubleValue() + ((doubleValue * i) / 8));
                    }
                }
                ArrayList newArrayList = Lists.newArrayList();
                while (advance()) {
                    try {
                        newArrayList.add(TimestampedValue.of(this.reader.getCurrent(), this.reader.getCurrentTimestamp()));
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to read elements from the bounded reader.", e);
                    }
                }
                return new Checkpoint<>(newArrayList, boundedSource);
            }
        }

        public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
            this.boundedSource = boundedSource;
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source
        public void validate() {
            this.boundedSource.validate();
        }

        @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource
        public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(int i, PipelineOptions pipelineOptions) throws Exception {
            try {
                long estimatedSizeBytes = this.boundedSource.getEstimatedSizeBytes(pipelineOptions) / i;
                if (estimatedSizeBytes <= 0) {
                    DataflowUnboundedReadFromBoundedSource.LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", this.boundedSource);
                    return ImmutableList.of(this);
                }
                List<? extends BoundedSource<T>> splitIntoBundles = this.boundedSource.splitIntoBundles(estimatedSizeBytes, pipelineOptions);
                if (splitIntoBundles != null) {
                    return Lists.transform(splitIntoBundles, new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { // from class: com.google.cloud.dataflow.sdk.runners.dataflow.DataflowUnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.1
                        @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
                        public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> boundedSource) {
                            return new BoundedToUnboundedSourceAdapter<>(boundedSource);
                        }
                    });
                }
                DataflowUnboundedReadFromBoundedSource.LOG.warn("BoundedSource cannot split {}, skips the initial splits.", this.boundedSource);
                return ImmutableList.of(this);
            } catch (Exception e) {
                DataflowUnboundedReadFromBoundedSource.LOG.warn("Exception while splitting {}, skips the initial splits.", this.boundedSource, e);
                return ImmutableList.of(this);
            }
        }

        @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource
        public BoundedToUnboundedSourceAdapter<T>.Reader createReader(PipelineOptions pipelineOptions, Checkpoint<T> checkpoint) {
            return checkpoint == null ? new Reader(null, this.boundedSource, pipelineOptions) : new Reader(((Checkpoint) checkpoint).residualElements, ((Checkpoint) checkpoint).residualSource, pipelineOptions);
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source
        public Coder<T> getDefaultOutputCoder() {
            return this.boundedSource.getDefaultOutputCoder();
        }

        @Override // com.google.cloud.dataflow.sdk.io.UnboundedSource
        public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
            return new CheckpointCoder(this.boundedSource.getDefaultOutputCoder());
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source, com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("source", this.boundedSource.getClass()));
            builder.include(this.boundedSource);
        }
    }

    public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> boundedSource) {
        this.source = boundedSource;
    }

    @Override // com.google.cloud.dataflow.sdk.transforms.PTransform
    public PCollection<T> apply(PInput pInput) {
        return (PCollection) pInput.getPipeline().apply(Read.from(new BoundedToUnboundedSourceAdapter(this.source)));
    }

    @Override // com.google.cloud.dataflow.sdk.transforms.PTransform
    protected Coder<T> getDefaultOutputCoder() {
        return this.source.getDefaultOutputCoder();
    }

    @Override // com.google.cloud.dataflow.sdk.transforms.PTransform
    public String getKindString() {
        return "Read(" + (this.source.getClass().isAnonymousClass() ? "AnonymousSource" : StringUtils.approximateSimpleName(this.source.getClass())) + ")";
    }

    @Override // com.google.cloud.dataflow.sdk.transforms.PTransform, com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        builder.add(DisplayData.item("source", this.source.getClass())).include(this.source);
    }
}
