package org.apache.beam.runners.spark.io;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Queue;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/spark/io/CreateStream.class */
/**
 * A {@link PTransform} that produces an unbounded {@link PCollection} from a pre-configured
 * sequence of element batches, together with a parallel sequence of watermark / synchronized
 * processing time advancements.
 *
 * <p>Batches are queued with {@link #nextBatch} / {@link #emptyBatch()}; watermark advancements
 * are queued with {@link #advanceWatermarkForNextBatch(Instant)} /
 * {@link #advanceNextBatchWatermarkToInfinity()}. All configuration methods return {@code this}
 * so calls can be chained.
 *
 * <p>This class only records the configuration; {@link #expand(PBegin)} merely declares the
 * output collection. The queued data is exposed through {@link #getBatches()} and
 * {@link #getTimes()}.
 *
 * @param <T> the type of the elements in the produced collection
 */
public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
    /** URN string identifying this transform. */
    public static final String TRANSFORM_URN = "beam:transform:spark:createstream:v1";

    /** Amount by which the synchronized processing time moves on every watermark advancement. */
    private final Duration batchDuration;

    /** Coder assigned to the output collection in {@link #expand(PBegin)}. */
    private final Coder<T> coder;

    /** Processing time used as the starting point for the first queued advancement. */
    private Instant initialSystemTime;

    private final boolean forceWatermarkSync;

    /** Queued element batches, in insertion order. */
    private final Queue<Iterable<TimestampedValue<T>>> batches = new ArrayDeque<>();

    /** Queued watermark advancements, in insertion order. */
    private final Deque<GlobalWatermarkHolder.SparkWatermarks> times = new ArrayDeque<>();

    /** The most recently queued watermark; starts at the minimum timestamp. */
    private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    private CreateStream(Duration duration, Instant instant, Coder<T> coder, boolean z) {
        this.batchDuration = duration;
        this.initialSystemTime = instant;
        this.coder = coder;
        this.forceWatermarkSync = z;
    }

    /**
     * Creates an empty stream whose initial system time is the epoch ({@code new Instant(0L)}).
     *
     * @param coder coder for the elements of the output collection
     * @param duration the batch duration
     * @param z value later reported by {@link #isForceWatermarkSync()}
     * @return a new, empty {@code CreateStream}
     */
    public static <T> CreateStream<T> of(Coder<T> coder, Duration duration, boolean z) {
        return new CreateStream<>(duration, new Instant(0L), coder, z);
    }

    /**
     * Creates an empty stream with {@link #isForceWatermarkSync()} set to {@code true}.
     *
     * @param coder coder for the elements of the output collection
     * @param duration the batch duration
     * @return a new, empty {@code CreateStream}
     */
    public static <T> CreateStream<T> of(Coder<T> coder, Duration duration) {
        return of(coder, duration, true);
    }

    /**
     * Queues a batch of explicitly timestamped elements.
     *
     * @param timestampedValueArr the elements of the batch; each timestamp must be strictly
     *     before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     * @return this stream, for chaining
     * @throws IllegalArgumentException if any element's timestamp is not before the maximum
     */
    @SafeVarargs
    public final CreateStream<T> nextBatch(TimestampedValue<T>... timestampedValueArr) {
        // Snapshot the caller's array before validating: Arrays.asList alone is a live view, so
        // a caller holding the array could otherwise alter the batch after it passed validation.
        final ArrayList<TimestampedValue<T>> batch =
            new ArrayList<>(Arrays.asList(timestampedValueArr));
        for (TimestampedValue<T> element : batch) {
            Preconditions.checkArgument(
                element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
                "Elements must have timestamps before %s. Got: %s",
                BoundedWindow.TIMESTAMP_MAX_VALUE,
                element.getTimestamp());
        }
        this.batches.offer(batch);
        return this;
    }

    /**
     * Queues a batch of plain elements, each wrapped via
     * {@link TimestampedValue#atMinimumTimestamp}.
     *
     * @param tArr the elements of the batch
     * @return this stream, for chaining
     */
    @SafeVarargs
    public final CreateStream<T> nextBatch(T... tArr) {
        final ArrayList<TimestampedValue<T>> batch = Lists.newArrayListWithCapacity(tArr.length);
        for (T element : tArr) {
            batch.add(TimestampedValue.atMinimumTimestamp(element));
        }
        this.batches.offer(batch);
        return this;
    }

    /**
     * Queues a batch containing no elements.
     *
     * @return this stream, for chaining
     */
    public CreateStream<T> emptyBatch() {
        this.batches.offer(Collections.emptyList());
        return this;
    }

    /**
     * Sets the initial system time.
     *
     * <p>The value is only consulted while no watermark advancement has been queued yet, so it
     * has no effect on advancements that were already recorded.
     *
     * @param instant the new initial system time
     * @return this stream, for chaining
     */
    public CreateStream<T> initialSystemTimeAt(Instant instant) {
        this.initialSystemTime = instant;
        return this;
    }

    /**
     * Queues a watermark advancement to the given instant.
     *
     * @param instant the new watermark; must not be before the previously queued watermark and
     *     must be strictly before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     * @return this stream, for chaining
     * @throws IllegalArgumentException if the watermark would decrease or reach the maximum
     */
    public CreateStream<T> advanceWatermarkForNextBatch(Instant instant) {
        Preconditions.checkArgument(
            !instant.isBefore(this.lowWatermark), "The watermark is not allowed to decrease!");
        Preconditions.checkArgument(
            instant.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
            "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
            instant,
            BoundedWindow.TIMESTAMP_MAX_VALUE);
        return advance(instant);
    }

    /**
     * Queues a watermark advancement to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     *
     * <p>After this call the recorded watermark equals the maximum, so every later call to
     * {@link #advanceWatermarkForNextBatch(Instant)} is rejected by its argument checks.
     *
     * @return this stream, for chaining
     */
    public CreateStream<T> advanceNextBatchWatermarkToInfinity() {
        return advance(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    /**
     * Records one advancement: moves the synchronized processing time forward by the batch
     * duration and stores it together with the previous and the new watermark.
     *
     * @throws IllegalArgumentException if adding the batch duration does not move the
     *     synchronized processing time forward
     */
    private CreateStream<T> advance(Instant instant) {
        // Continue from the last queued advancement, or from the initial system time if none.
        final GlobalWatermarkHolder.SparkWatermarks previous = this.times.peekLast();
        final Instant currentProcessingTime =
            previous == null ? this.initialSystemTime : previous.getSynchronizedProcessingTime();
        final Instant nextProcessingTime = currentProcessingTime.plus(this.batchDuration);
        Preconditions.checkArgument(
            nextProcessingTime.isAfter(currentProcessingTime),
            "Synchronized processing time must always advance.");
        this.times.offer(
            new GlobalWatermarkHolder.SparkWatermarks(
                this.lowWatermark, instant, nextProcessingTime));
        this.lowWatermark = instant;
        return this;
    }

    /** Returns the batch duration in milliseconds. */
    public long getBatchDuration() {
        return this.batchDuration.getMillis();
    }

    /** Returns the queue of batches; this is the internal queue itself, not a copy. */
    public Queue<Iterable<TimestampedValue<T>>> getBatches() {
        return this.batches;
    }

    /** Returns the queued watermark advancements; this is the internal queue, not a copy. */
    public Queue<GlobalWatermarkHolder.SparkWatermarks> getTimes() {
        return this.times;
    }

    /** Returns the flag supplied at construction ({@code true} for the two-argument factory). */
    public boolean isForceWatermarkSync() {
        return this.forceWatermarkSync;
    }

    /**
     * Declares the output of this transform: an unbounded collection in the pipeline of
     * {@code pBegin}, using the global default windowing strategy and this stream's coder.
     */
    @Override
    public PCollection<T> expand(PBegin pBegin) {
        return PCollection.createPrimitiveOutputInternal(
            pBegin.getPipeline(),
            WindowingStrategy.globalDefault(),
            PCollection.IsBounded.UNBOUNDED,
            this.coder);
    }
}
