/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source;

import java.io.Serializable;
import java.util.Comparator;
import java.util.Map;
import org.apache.flink.connector.pulsar.source.AbstractPartition;
import org.apache.flink.connector.pulsar.source.LastStopCondition;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;

public interface StopCondition
extends Serializable {
    public static final Comparator<MessageId> NON_BATCH_COMPARATOR = new Comparator<MessageId>(){
        final Comparator<MessageIdImpl> implComparator = Comparator.comparingLong(MessageIdImpl::getLedgerId).thenComparingLong(MessageIdImpl::getEntryId).thenComparingInt(MessageIdImpl::getPartitionIndex);

        @Override
        public int compare(MessageId o1, MessageId o2) {
            return this.implComparator.compare((MessageIdImpl)o1, (MessageIdImpl)o2);
        }
    };

    public StopResult shouldStop(AbstractPartition var1, Message<?> var2);

    default public void init(AbstractPartition partition, Consumer<byte[]> consumer) throws PulsarClientException {
    }

    public static StopCondition stopAtMessageId(MessageId id) {
        return (partition, message) -> StopCondition.hitMessageId(message, id) ? StopResult.STOP_BEFORE : StopResult.DONT_STOP;
    }

    public static boolean hitMessageId(Message<?> message, MessageId id) {
        return NON_BATCH_COMPARATOR.compare(message.getMessageId(), id) >= 0;
    }

    public static StopCondition stopAfterMessageId(MessageId id) {
        return (partition, message) -> StopCondition.hitMessageId(message, id) ? StopResult.STOP_AFTER : StopResult.DONT_STOP;
    }

    public static StopCondition stopAtMessageIds(Map<AbstractPartition, MessageId> ids) {
        return (partition, message) -> StopCondition.hitMessageId(message, (MessageId)ids.get(partition)) ? StopResult.STOP_BEFORE : StopResult.DONT_STOP;
    }

    public static StopCondition stopAfterMessageIds(Map<AbstractPartition, MessageId> ids) {
        return (partition, message) -> StopCondition.hitMessageId(message, (MessageId)ids.get(partition)) ? StopResult.STOP_AFTER : StopResult.DONT_STOP;
    }

    public static StopCondition stopAtTimestamp(long timestamp) {
        return (partition, message) -> message.getEventTime() >= timestamp ? StopResult.STOP_BEFORE : StopResult.DONT_STOP;
    }

    public static StopCondition stopAfterTimestamp(long timestamp) {
        return (partition, message) -> message.getEventTime() >= timestamp ? StopResult.STOP_AFTER : StopResult.DONT_STOP;
    }

    public static StopCondition stopAtLast() {
        return new LastStopCondition(){

            @Override
            public StopResult shouldStop(AbstractPartition partition, Message<?> message) {
                return this.lastId == null || StopCondition.hitMessageId(message, this.lastId) ? StopResult.STOP_BEFORE : StopResult.DONT_STOP;
            }
        };
    }

    public static StopCondition stopAfterLast() {
        return new LastStopCondition(){

            @Override
            public StopResult shouldStop(AbstractPartition partition, Message<?> message) {
                if (this.lastId == null) {
                    return StopResult.STOP_BEFORE;
                }
                return StopCondition.hitMessageId(message, this.lastId) ? StopResult.STOP_AFTER : StopResult.DONT_STOP;
            }
        };
    }

    public static StopCondition never() {
        return (partition, message) -> StopResult.DONT_STOP;
    }

    public static enum StopResult {
        STOP_BEFORE,
        STOP_AFTER,
        DONT_STOP;

    }
}

