/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source;

import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.flink.connector.pulsar.source.StopCondition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionReader
implements Comparable<PartitionReader>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionReader.class);
    private static final long MAX_BACKOFF = 0x40000000L;
    private final PulsarPartitionSplit split;
    private final ConsumerImpl<byte[]> consumer;
    private final StopCondition stopCondition;
    @Nullable
    private Message lastMessage;
    private long backOff = 1L;
    private boolean stopped;

    public PartitionReader(PulsarPartitionSplit split, ConsumerImpl<byte[]> consumer, StopCondition stopCondition) {
        this.split = split;
        this.consumer = consumer;
        this.stopCondition = stopCondition;
    }

    public PulsarPartitionSplit getSplit() {
        return this.split;
    }

    @Nullable
    public Message getLastMessage() {
        return this.lastMessage;
    }

    public void setLastMessage(@Nullable Message lastMessage) {
        this.lastMessage = lastMessage;
    }

    public Iterator<Message<?>> nextBatch() throws PulsarClientException {
        Messages messages;
        Iterator messageIterator;
        if (this.consumer.hasMessageAvailable() && (messageIterator = (messages = this.consumer.batchReceive()).iterator()).hasNext()) {
            this.backOff = 1L;
            return new Iterator<Message<?>>(){
                @Nullable
                Message<byte[]> next = this.initNext();

                @Override
                public boolean hasNext() {
                    return this.next != null;
                }

                @Override
                public Message<?> next() {
                    PartitionReader.this.lastMessage = this.next;
                    this.next = this.initNext();
                    return PartitionReader.this.lastMessage;
                }

                @Nullable
                private Message<byte[]> initNext() {
                    if (!messageIterator.hasNext()) {
                        return null;
                    }
                    Message next = (Message)messageIterator.next();
                    switch (PartitionReader.this.stopCondition.shouldStop(PartitionReader.this.split.getPartition(), next)) {
                        case STOP_BEFORE: {
                            PartitionReader.this.stopped = true;
                            return null;
                        }
                        case STOP_AFTER: {
                            PartitionReader.this.stopped = true;
                            return next;
                        }
                    }
                    return next;
                }
            };
        }
        if (this.backOff < 0x40000000L) {
            this.backOff <<= 1;
        }
        return Collections.emptyIterator();
    }

    public boolean isStopped() {
        return this.stopped;
    }

    @Override
    public int compareTo(PartitionReader o) {
        return Long.compare(this.getOrder(), o.getOrder());
    }

    private long getOrder() {
        return this.lastMessage == null ? this.backOff : this.lastMessage.getEventTime() + this.backOff;
    }

    @Override
    public void close() {
        this.consumer.closeAsync().whenComplete((dummy, e) -> {
            if (e != null) {
                LOG.warn("Error while closing reader for " + this.split, e);
            }
        });
    }
}

