package org.apache.beam.sdk.nexmark.sources;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.sources.generator.Generator;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.class */
public class UnboundedEventSource extends UnboundedSource<Event, GeneratorCheckpoint> {
    private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
    private final GeneratorConfig config;
    private final int numEventGenerators;
    private final long watermarkHoldbackSec;
    private final boolean isRateLimited;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/sources/UnboundedEventSource$EventReader.class */
    public class EventReader extends UnboundedSource.UnboundedReader<Event> {
        private final Generator generator;
        private long watermark;
        private long backlogDurationMs;

        @Nullable
        private Long backlogBytes;
        private long lastReportedBacklogWallclock;
        private long timestampAtLastReportedBacklogMs;

        @Nullable
        private TimestampedValue<Event> pendingEvent;
        private long pendingEventWallclockTime;

        @Nullable
        private TimestampedValue<Event> currentEvent;
        private final Queue<Generator.NextEvent> heldBackEvents;

        public EventReader(Generator generator) {
            this.heldBackEvents = new PriorityQueue();
            this.generator = generator;
            this.watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
            this.lastReportedBacklogWallclock = -1L;
            this.pendingEventWallclockTime = -1L;
            this.timestampAtLastReportedBacklogMs = -1L;
        }

        public EventReader(UnboundedEventSource unboundedEventSource, GeneratorConfig generatorConfig) {
            this(new Generator(generatorConfig));
        }

        public boolean start() {
            UnboundedEventSource.LOG.trace("starting unbounded generator {}", this.generator);
            return advance();
        }

        public boolean advance() {
            long currentTimeMillis = System.currentTimeMillis();
            while (this.pendingEvent == null) {
                if (!this.generator.hasNext() && this.heldBackEvents.isEmpty()) {
                    if (UnboundedEventSource.this.isRateLimited) {
                        updateBacklog(System.currentTimeMillis(), 0L);
                    }
                    if (this.watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                        return false;
                    }
                    this.watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
                    UnboundedEventSource.LOG.trace("stopped unbounded generator {}", this.generator);
                    return false;
                }
                Generator.NextEvent peek = this.heldBackEvents.peek();
                if (peek != null && peek.wallclockTimestamp <= currentTimeMillis) {
                    this.heldBackEvents.poll();
                    UnboundedEventSource.LOG.debug("replaying held-back event {}ms behind watermark", Long.valueOf(this.watermark - peek.eventTimestamp));
                } else {
                    if (!this.generator.hasNext()) {
                        if (!UnboundedEventSource.this.isRateLimited) {
                            return false;
                        }
                        updateBacklog(currentTimeMillis, 0L);
                        return false;
                    }
                    peek = this.generator.nextEvent();
                    if (UnboundedEventSource.this.isRateLimited && UnboundedEventSource.this.config.getProbDelayedEvent() > 0.0d && UnboundedEventSource.this.config.getOccasionalDelaySec() > 0 && ThreadLocalRandom.current().nextDouble() < UnboundedEventSource.this.config.getProbDelayedEvent()) {
                        long nextLong = ThreadLocalRandom.current().nextLong(UnboundedEventSource.this.config.getOccasionalDelaySec() * 1000) + 1;
                        UnboundedEventSource.LOG.debug("delaying event by {}ms", Long.valueOf(nextLong));
                        this.heldBackEvents.add(peek.withDelay(nextLong));
                    }
                }
                this.pendingEventWallclockTime = peek.wallclockTimestamp;
                this.pendingEvent = TimestampedValue.of(peek.event, new Instant(peek.eventTimestamp));
                long millis = peek.watermark - Duration.standardSeconds(UnboundedEventSource.this.watermarkHoldbackSec).getMillis();
                if (millis > this.watermark) {
                    this.watermark = millis;
                }
            }
            if (UnboundedEventSource.this.isRateLimited) {
                if (this.pendingEventWallclockTime > currentTimeMillis) {
                    updateBacklog(currentTimeMillis, 0L);
                    return false;
                }
                updateBacklog(currentTimeMillis, currentTimeMillis - this.pendingEventWallclockTime);
            }
            this.currentEvent = this.pendingEvent;
            this.pendingEvent = null;
            return true;
        }

        private void updateBacklog(long j, long j2) {
            this.backlogDurationMs = j2;
            long currentInterEventDelayUs = this.generator.currentInterEventDelayUs();
            if (currentInterEventDelayUs != 0) {
                this.backlogBytes = Long.valueOf(this.generator.getCurrentConfig().estimatedBytesForEvents((((this.backlogDurationMs * 1000) + currentInterEventDelayUs) - 1) / currentInterEventDelayUs));
            }
            if (this.lastReportedBacklogWallclock < 0 || j - this.lastReportedBacklogWallclock > UnboundedEventSource.BACKLOG_PERIOD.getMillis()) {
                double d = Double.NaN;
                if (this.pendingEvent != null && this.lastReportedBacklogWallclock >= 0 && this.timestampAtLastReportedBacklogMs >= 0) {
                    d = (this.pendingEvent.getTimestamp().getMillis() - this.timestampAtLastReportedBacklogMs) / (j - this.lastReportedBacklogWallclock);
                }
                UnboundedEventSource.LOG.debug("unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay with {} time dilation", new Object[]{Long.valueOf(this.backlogDurationMs), this.backlogBytes, Long.valueOf(currentInterEventDelayUs), Double.valueOf(d)});
                this.lastReportedBacklogWallclock = j;
                if (this.pendingEvent != null) {
                    this.timestampAtLastReportedBacklogMs = this.pendingEvent.getTimestamp().getMillis();
                }
            }
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public Event m68getCurrent() {
            if (this.currentEvent == null) {
                throw new NoSuchElementException();
            }
            return (Event) this.currentEvent.getValue();
        }

        public Instant getCurrentTimestamp() {
            if (this.currentEvent == null) {
                throw new NoSuchElementException();
            }
            return this.currentEvent.getTimestamp();
        }

        public void close() {
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public UnboundedEventSource m67getCurrentSource() {
            return UnboundedEventSource.this;
        }

        public Instant getWatermark() {
            return new Instant(this.watermark);
        }

        /* renamed from: getCheckpointMark, reason: merged with bridge method [inline-methods] */
        public GeneratorCheckpoint m66getCheckpointMark() {
            return this.generator.toCheckpoint();
        }

        public long getSplitBacklogBytes() {
            if (this.backlogBytes == null) {
                return -1L;
            }
            return this.backlogBytes.longValue();
        }

        public String toString() {
            return String.format("EventReader(%d, %d, %d)", Long.valueOf(this.generator.getCurrentConfig().getStartEventId()), Long.valueOf(this.generator.getNextEventId()), Long.valueOf(this.generator.getCurrentConfig().getStopEventId()));
        }
    }

    public UnboundedEventSource(GeneratorConfig generatorConfig, int i, long j, boolean z) {
        this.config = generatorConfig;
        this.numEventGenerators = i;
        this.watermarkHoldbackSec = j;
        this.isRateLimited = z;
    }

    public Coder<GeneratorCheckpoint> getCheckpointMarkCoder() {
        return GeneratorCheckpoint.CODER_INSTANCE;
    }

    public List<UnboundedEventSource> split(int i, PipelineOptions pipelineOptions) {
        LOG.trace("splitting unbounded source into {} sub-sources", Integer.valueOf(this.numEventGenerators));
        ArrayList arrayList = new ArrayList();
        Iterator<GeneratorConfig> it = this.config.split(this.numEventGenerators).iterator();
        while (it.hasNext()) {
            arrayList.add(new UnboundedEventSource(it.next(), 1, this.watermarkHoldbackSec, this.isRateLimited));
        }
        return arrayList;
    }

    public EventReader createReader(PipelineOptions pipelineOptions, @Nullable GeneratorCheckpoint generatorCheckpoint) {
        if (generatorCheckpoint == null) {
            LOG.trace("creating initial unbounded reader for {}", this.config);
            return new EventReader(this, this.config);
        }
        LOG.trace("resuming unbounded reader from {}", generatorCheckpoint);
        return new EventReader(generatorCheckpoint.toGenerator(this.config));
    }

    public void validate() {
    }

    public Coder<Event> getDefaultOutputCoder() {
        return Event.CODER;
    }

    public String toString() {
        return String.format("UnboundedEventSource(%d, %d)", Long.valueOf(this.config.getStartEventId()), Long.valueOf(this.config.getStopEventId()));
    }
}
