package com.google.cloud.dataflow.sdk.testing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DurationCoder;
import com.google.cloud.dataflow.sdk.coders.InstantCoder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.VarInt;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream.class */
public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
    private final List<Event<T>> events;
    private final Coder<T> coder;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$Builder.class */
    public static class Builder<T> {
        private final Coder<T> coder;
        private final ImmutableList<Event<T>> events;
        private final Instant currentWatermark;

        private Builder(Coder<T> coder) {
            this(coder, ImmutableList.of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        private Builder(Coder<T> coder, ImmutableList<Event<T>> immutableList, Instant instant) {
            this.coder = coder;
            this.events = immutableList;
            this.currentWatermark = instant;
        }

        @SafeVarargs
        public final Builder<T> addElements(T t, T... tArr) {
            TimestampedValue<T> of = TimestampedValue.of(t, this.currentWatermark);
            TimestampedValue<T>[] timestampedValueArr = new TimestampedValue[tArr.length];
            for (int i = 0; i < tArr.length; i++) {
                timestampedValueArr[i] = TimestampedValue.of(tArr[i], this.currentWatermark);
            }
            return addElements((TimestampedValue) of, (TimestampedValue[]) timestampedValueArr);
        }

        @SafeVarargs
        public final Builder<T> addElements(TimestampedValue<T> timestampedValue, TimestampedValue<T>... timestampedValueArr) {
            Preconditions.checkArgument(timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "Elements must have timestamps before %s. Got: %s", BoundedWindow.TIMESTAMP_MAX_VALUE, timestampedValue.getTimestamp());
            for (TimestampedValue<T> timestampedValue2 : timestampedValueArr) {
                Preconditions.checkArgument(timestampedValue2.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "Elements must have timestamps before %s. Got: %s", BoundedWindow.TIMESTAMP_MAX_VALUE, timestampedValue2.getTimestamp());
            }
            return new Builder<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) ElementEvent.add(timestampedValue, timestampedValueArr)).build(), this.currentWatermark);
        }

        public Builder<T> advanceWatermarkTo(Instant instant) {
            Preconditions.checkArgument(instant.isAfter(this.currentWatermark), "The watermark must monotonically advance");
            Preconditions.checkArgument(instant.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s", instant, BoundedWindow.TIMESTAMP_MAX_VALUE);
            return new Builder<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) WatermarkEvent.advanceTo(instant)).build(), instant);
        }

        public Builder<T> advanceProcessingTime(Duration duration) {
            Preconditions.checkArgument(duration.getMillis() > 0, "Must advance the processing time by a positive amount. Got: ", duration);
            return new Builder<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) ProcessingTimeEvent.advanceBy(duration)).build(), this.currentWatermark);
        }

        public TestStream<T> advanceWatermarkToInfinity() {
            return new TestStream<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) WatermarkEvent.advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE)).build());
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$ElementEvent.class */
    public static abstract class ElementEvent<T> implements Event<T> {
        public abstract Iterable<TimestampedValue<T>> getElements();

        @SafeVarargs
        static <T> Event<T> add(TimestampedValue<T> timestampedValue, TimestampedValue<T>... timestampedValueArr) {
            return add(ImmutableList.builder().add((ImmutableList.Builder) timestampedValue).add((Object[]) timestampedValueArr).build());
        }

        static <T> Event<T> add(Iterable<TimestampedValue<T>> iterable) {
            return new AutoValue_TestStream_ElementEvent(EventType.ELEMENT, iterable);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$Event.class */
    public interface Event<T> {
        EventType getType();
    }

    @VisibleForTesting
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$EventCoder.class */
    static final class EventCoder<T> extends StandardCoder<Event<T>> {
        private static final Coder<ReadableDuration> DURATION_CODER = DurationCoder.of();
        private static final Coder<Instant> INSTANT_CODER = InstantCoder.of();
        private final Coder<T> valueCoder;
        private final Coder<Iterable<TimestampedValue<T>>> elementCoder;

        public static <T> EventCoder<T> of(Coder<T> coder) {
            return new EventCoder<>(coder);
        }

        @JsonCreator
        public static <T> EventCoder<T> of(@JsonProperty("component_encodings") List<? extends Coder<?>> list) {
            Preconditions.checkArgument(list.size() == 1, "Was expecting exactly one component coder, got %s", Integer.valueOf(list.size()));
            return new EventCoder<>(list.get(0));
        }

        private EventCoder(Coder<T> coder) {
            this.valueCoder = coder;
            this.elementCoder = IterableCoder.of(TimestampedValue.TimestampedValueCoder.of(coder));
        }

        @Override // com.google.cloud.dataflow.sdk.coders.Coder
        public void encode(Event<T> event, OutputStream outputStream, Coder.Context context) throws IOException {
            VarInt.encode(event.getType().ordinal(), outputStream);
            switch (event.getType()) {
                case ELEMENT:
                    this.elementCoder.encode(((ElementEvent) event).getElements(), outputStream, context);
                    return;
                case WATERMARK:
                    INSTANT_CODER.encode(((WatermarkEvent) event).getWatermark(), outputStream, context);
                    return;
                case PROCESSING_TIME:
                    DURATION_CODER.encode(((ProcessingTimeEvent) event).getProcessingTimeAdvance(), outputStream, context);
                    return;
                default:
                    throw new AssertionError("Unreachable: Unsupported Event Type " + event.getType());
            }
        }

        @Override // com.google.cloud.dataflow.sdk.coders.Coder
        public Event<T> decode(InputStream inputStream, Coder.Context context) throws IOException {
            EventType eventType = EventType.values()[VarInt.decodeInt(inputStream)];
            switch (eventType) {
                case ELEMENT:
                    return ElementEvent.add(this.elementCoder.decode(inputStream, context));
                case WATERMARK:
                    return WatermarkEvent.advanceTo(INSTANT_CODER.decode(inputStream, context));
                case PROCESSING_TIME:
                    return ProcessingTimeEvent.advanceBy(DURATION_CODER.decode(inputStream, context).toDuration());
                default:
                    throw new AssertionError("Unreachable: Unsupported Event Type " + eventType);
            }
        }

        @Override // com.google.cloud.dataflow.sdk.coders.Coder
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.singletonList(this.valueCoder);
        }

        @Override // com.google.cloud.dataflow.sdk.coders.Coder
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.elementCoder.verifyDeterministic();
            DURATION_CODER.verifyDeterministic();
            INSTANT_CODER.verifyDeterministic();
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$EventType.class */
    public enum EventType {
        ELEMENT,
        WATERMARK,
        PROCESSING_TIME
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$ProcessingTimeEvent.class */
    public static abstract class ProcessingTimeEvent<T> implements Event<T> {
        public abstract Duration getProcessingTimeAdvance();

        static <T> Event<T> advanceBy(Duration duration) {
            return new AutoValue_TestStream_ProcessingTimeEvent(EventType.PROCESSING_TIME, duration);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestStream$WatermarkEvent.class */
    public static abstract class WatermarkEvent<T> implements Event<T> {
        public abstract Instant getWatermark();

        static <T> Event<T> advanceTo(Instant instant) {
            return new AutoValue_TestStream_WatermarkEvent(EventType.WATERMARK, instant);
        }
    }

    public static <T> Builder<T> create(Coder<T> coder) {
        return new Builder<>(coder);
    }

    private TestStream(Coder<T> coder, List<Event<T>> list) {
        this.coder = coder;
        this.events = (List) Preconditions.checkNotNull(list);
    }

    @Override // com.google.cloud.dataflow.sdk.transforms.PTransform
    public PCollection<T> apply(PBegin pBegin) {
        throw new IllegalStateException(String.format("Pipeline Runner %s does not support %s", pBegin.getPipeline().getRunner().getClass().getSimpleName(), getClass().getSimpleName()));
    }

    public Coder<T> getValueCoder() {
        return this.coder;
    }

    public Coder<Event<T>> getEventCoder() {
        return EventCoder.of(this.coder);
    }

    public List<Event<T>> getEvents() {
        return this.events;
    }
}
