package org.apache.flink.streaming.connectors.kinesis.util;

import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.class */
public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
    public static final int DEFAULT_QUEUE_CAPACITY = 100;
    private final int queueCapacity;
    private final ConcurrentHashMap<Integer, RecordEmitter<T>.AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<RecordEmitter<T>.AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
    private final PriorityQueue<RecordEmitter<T>.AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
    private volatile boolean running = true;
    private volatile long maxEmitTimestamp = Long.MAX_VALUE;
    private long maxLookaheadMillis = 60000;
    private long idleSleepMillis = 100;
    private final Object condition = new Object();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter$AsyncRecordQueue.class */
    public class AsyncRecordQueue<T> implements RecordQueue<T> {
        private final ArrayBlockingQueue<T> queue;
        private final int queueId;
        long headTimestamp;

        private AsyncRecordQueue(int i) {
            this.queue = new ArrayBlockingQueue<>(RecordEmitter.this.queueCapacity);
            this.queueId = i;
            this.headTimestamp = Long.MAX_VALUE;
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
        public void put(T t) throws InterruptedException {
            this.queue.put(t);
            synchronized (RecordEmitter.this.condition) {
                RecordEmitter.this.condition.notify();
            }
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
        public int getSize() {
            return this.queue.size();
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
        public T peek() {
            return this.queue.peek();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter$RecordQueue.class */
    public interface RecordQueue<T> {
        void put(T t) throws InterruptedException;

        int getSize();

        T peek();
    }

    public RecordEmitter(int i) {
        this.queueCapacity = i;
    }

    private int compareHeadElement(AsyncRecordQueue asyncRecordQueue, AsyncRecordQueue asyncRecordQueue2) {
        return Long.compare(asyncRecordQueue.headTimestamp, asyncRecordQueue2.headTimestamp);
    }

    public RecordQueue<T> getQueue(int i) {
        return this.queues.computeIfAbsent(Integer.valueOf(i), num -> {
            RecordEmitter<T>.AsyncRecordQueue<T> asyncRecordQueue = new AsyncRecordQueue<>(i);
            this.emptyQueues.put(asyncRecordQueue, false);
            return asyncRecordQueue;
        });
    }

    public void setMaxLookaheadMillis(long j) {
        this.maxLookaheadMillis = j;
        LOG.info("[setMaxLookaheadMillis] Max lookahead millis set to {}", Long.valueOf(j));
    }

    public void setCurrentWatermark(long j) {
        this.maxEmitTimestamp = j + this.maxLookaheadMillis;
        synchronized (this.condition) {
            this.condition.notify();
        }
        LOG.info("[setCurrentWatermark] Current watermark set to {}, maxEmitTimestamp = {}", Long.valueOf(j), Long.valueOf(this.maxEmitTimestamp));
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting emitter with maxLookaheadMillis: {}", Long.valueOf(this.maxLookaheadMillis));
        emitRecords();
    }

    public void stop() {
        this.running = false;
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected void emitRecords() {
        TimestampedValue timestampedValue;
        RecordEmitter<T>.AsyncRecordQueue<T> poll = this.heads.poll();
        while (this.running) {
            while (poll == null) {
                if (!this.emptyQueues.isEmpty()) {
                    Iterator it = this.emptyQueues.keySet().iterator();
                    while (it.hasNext()) {
                        RecordEmitter<T>.AsyncRecordQueue<T> asyncRecordQueue = (AsyncRecordQueue) it.next();
                        if (!((AsyncRecordQueue) asyncRecordQueue).queue.isEmpty()) {
                            this.emptyQueues.remove(asyncRecordQueue);
                            asyncRecordQueue.headTimestamp = ((TimestampedValue) ((AsyncRecordQueue) asyncRecordQueue).queue.peek()).getTimestamp();
                            this.heads.offer(asyncRecordQueue);
                        }
                    }
                }
                poll = this.heads.poll();
                if (poll == null) {
                    synchronized (this.condition) {
                        try {
                            this.condition.wait(this.idleSleepMillis);
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }
            while (true) {
                if (poll.headTimestamp > this.maxEmitTimestamp) {
                    synchronized (this.condition) {
                        try {
                            this.condition.wait(this.idleSleepMillis);
                            if (poll.headTimestamp > this.maxEmitTimestamp && !this.emptyQueues.isEmpty()) {
                                this.heads.offer(poll);
                                poll = null;
                            }
                        } catch (InterruptedException e2) {
                        }
                    }
                    break;
                }
                RecordEmitter<T>.AsyncRecordQueue<T> poll2 = this.heads.poll();
                int i = 0;
                while (true) {
                    timestampedValue = (TimestampedValue) ((AsyncRecordQueue) poll).queue.poll();
                    if (timestampedValue == null) {
                        break;
                    }
                    emit(timestampedValue, poll);
                    poll.headTimestamp = timestampedValue.getTimestamp();
                    if ((poll2 != null && poll.headTimestamp > poll2.headTimestamp) || poll.headTimestamp > this.maxEmitTimestamp) {
                        break;
                    }
                    int i2 = i;
                    i++;
                    if (i2 > this.queueCapacity && !this.emptyQueues.isEmpty()) {
                        break;
                    }
                }
                if (timestampedValue == null) {
                    this.emptyQueues.put(poll, true);
                } else if (poll2 == null || poll2.headTimestamp <= poll.headTimestamp) {
                    this.heads.offer(poll);
                } else {
                    this.heads.offer(poll2);
                    poll2 = poll;
                }
                poll = poll2;
            }
        }
    }

    protected abstract void emit(T t, RecordQueue<T> recordQueue);

    public String printInfo() {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append(String.format("queues: %d, empty: %d", Integer.valueOf(this.queues.size()), Integer.valueOf(this.emptyQueues.size())));
        for (Map.Entry<Integer, RecordEmitter<T>.AsyncRecordQueue<T>> entry : this.queues.entrySet()) {
            RecordEmitter<T>.AsyncRecordQueue<T> value = entry.getValue();
            stringBuffer.append(String.format("\n%d timestamp: %s size: %d", Integer.valueOf(((AsyncRecordQueue) entry.getValue()).queueId), Long.valueOf(value.headTimestamp), Integer.valueOf(((AsyncRecordQueue) value).queue.size())));
        }
        return stringBuffer.toString();
    }
}
