package org.apache.druid.testing.utils;

import java.util.List;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/testing/utils/SyntheticStreamGenerator.class */
/**
 * Base class for stream generators that emit a fixed number of synthetic events per one-second cycle.
 *
 * <p>Every event written during a cycle is created with the same timestamp; the timestamp advances by
 * one second per cycle. Writes inside a cycle are spaced out so that the cycle aims to finish
 * {@code cyclePaddingMs} milliseconds before the end of its wall-clock second. If the target has already
 * passed, the remaining events of the cycle are written back-to-back without sleeping.
 *
 * <p>Subclasses supply the event payload through {@link #newEvent(int, DateTime)}.
 */
public abstract class SyntheticStreamGenerator implements StreamGenerator {
    private static final Logger LOG = new Logger(SyntheticStreamGenerator.class);
    private final EventSerializer serializer;
    private final int eventsPerSecond;
    private final long cyclePaddingMs;

    /**
     * @param serializer      turns each generated event into the payload handed to the writer
     * @param eventsPerSecond number of events written in every one-second cycle
     * @param cyclePaddingMs  milliseconds subtracted from the end of each cycle's second to obtain the
     *                        target completion time for that cycle's writes
     */
    public SyntheticStreamGenerator(EventSerializer serializer, int eventsPerSecond, long cyclePaddingMs) {
        this.serializer = serializer;
        this.eventsPerSecond = eventsPerSecond;
        this.cyclePaddingMs = cyclePaddingMs;
    }

    /**
     * Builds a single event.
     *
     * @param row       1-based position of the event within the current cycle
     * @param timestamp timestamp shared by all events of the current cycle
     * @return the event as a list of name/value pairs, passed to the serializer
     */
    abstract List<Pair<String, Object>> newEvent(int row, DateTime timestamp);

    /**
     * Same as {@link #run(String, StreamEventWriter, int, DateTime)} with no override for the first
     * event timestamp.
     */
    @Override // org.apache.druid.testing.utils.StreamGenerator
    public long run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds) {
        return run(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
    }

    /**
     * Writes {@code eventsPerSecond} events per cycle until {@code totalNumberOfSeconds} cycles are done,
     * then flushes the writer.
     *
     * <p>The cycle count is checked after a cycle completes, so at least one cycle is always written,
     * even when {@code totalNumberOfSeconds} is zero or negative.
     *
     * @param streamTopic            value passed to {@code serializer.initialize} and used as the first
     *                               argument of every {@code streamEventWriter.write} call
     * @param streamEventWriter      destination for the serialized events
     * @param totalNumberOfSeconds   number of one-second cycles to write
     * @param overrideFirstEventTime timestamp for the events of the first cycle, or {@code null} to use
     *                               the current UTC time rounded up to the next whole second
     * @return total number of events written
     * @throws RuntimeException if anything in the generation loop fails; when the cause is an
     *                          {@link InterruptedException} the thread's interrupt status is restored
     *                          before the exception is thrown
     */
    @Override // org.apache.druid.testing.utils.StreamGenerator
    public long run(
            String streamTopic,
            StreamEventWriter streamEventWriter,
            int totalNumberOfSeconds,
            DateTime overrideFirstEventTime
    ) {
        this.serializer.initialize(streamTopic);

        // Wall-clock instant at which the next cycle may begin; starts at the next whole second.
        DateTime cycleStart = DateTimes.nowUtc().secondOfDay().roundCeilingCopy();
        // Timestamp stamped on the events; advances in lock-step with cycleStart but may begin elsewhere.
        DateTime eventTimestamp = overrideFirstEventTime == null ? cycleStart : overrideFirstEventTime;
        int secondsWritten = 0;
        long eventsWritten = 0;

        while (true) {
            try {
                long waitMs = cycleStart.getMillis() - DateTimes.nowUtc().getMillis();
                if (waitMs > 0) {
                    LOG.info("Waiting %s ms for next run cycle (at %s)", waitMs, cycleStart);
                    Thread.sleep(waitMs);
                    // Re-evaluate the clock: the sleep may have returned slightly early.
                    continue;
                }

                LOG.info(
                        "Beginning run cycle with %s events, target completion time: %s",
                        this.eventsPerSecond,
                        cycleStart.plusSeconds(1).minus(this.cyclePaddingMs)
                );

                if (isTransactional(streamEventWriter)) {
                    streamEventWriter.initTransaction();
                }

                for (int row = 1; row <= this.eventsPerSecond; row++) {
                    streamEventWriter.write(streamTopic, this.serializer.serialize(newEvent(row, eventTimestamp)));
                    eventsWritten++;

                    long sleepMs = calculateSleepTimeMs(this.eventsPerSecond - row, cycleStart);
                    // Log every 10th event for the first 100, then every 100th, to keep output bounded.
                    if ((row <= 100 && row % 10 == 0) || row % 100 == 0) {
                        LOG.info("Event: %s/%s, sleep time: %s ms", row, this.eventsPerSecond, sleepMs);
                    }
                    if (sleepMs > 0) {
                        Thread.sleep(sleepMs);
                    }
                }

                if (isTransactional(streamEventWriter)) {
                    streamEventWriter.commitTransaction();
                }

                cycleStart = cycleStart.plusSeconds(1);
                eventTimestamp = eventTimestamp.plusSeconds(1);
                secondsWritten++;

                LOG.info(
                        "Finished writing %s events, current time: %s - updating next timestamp to: %s",
                        this.eventsPerSecond,
                        DateTimes.nowUtc(),
                        cycleStart
                );

                if (secondsWritten >= totalNumberOfSeconds) {
                    streamEventWriter.flush();
                    LOG.info("Finished writing %s seconds", secondsWritten);
                    return eventsWritten;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe the
                // cancellation request; the exception type and message seen by callers are unchanged.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Exception in event generation loop", e);
            } catch (Exception e) {
                throw new RuntimeException("Exception in event generation loop", e);
            }
        }
    }

    /** Returns true when the writer both supports transactions and has them enabled. */
    private static boolean isTransactional(StreamEventWriter streamEventWriter) {
        return streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled();
    }

    /**
     * Computes how long to pause after a write so that the remaining events of the cycle are spread
     * evenly over the time left until the cycle's target completion time.
     *
     * @param remainingEvents number of events still to be written in this cycle
     * @param cycleStart      wall-clock start of the current cycle
     * @return pause in milliseconds; 0 when nothing remains or the target time has already passed
     */
    private long calculateSleepTimeMs(long remainingEvents, DateTime cycleStart) {
        if (remainingEvents == 0) {
            return 0L;
        }
        DateTime now = DateTimes.nowUtc();
        DateTime targetCompletion = cycleStart.plusSeconds(1).minus(this.cyclePaddingMs);
        if (targetCompletion.isBefore(now)) {
            return 0L;
        }
        return (targetCompletion.getMillis() - now.getMillis()) / remainingEvents;
    }
}
