/*
 * Decompiled with CFR 0.152.
 */
package ru.infon.queuebox.mongo;

import com.mongodb.client.MongoCollection;
import gaillard.mongo.MongoQueueCore;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import net.c0f3.queuebox.QueueBoxContext;
import org.bson.Document;
import ru.infon.queuebox.MessageContainer;
import ru.infon.queuebox.QueueBehave;
import ru.infon.queuebox.QueueConsumer;
import ru.infon.queuebox.QueueSerializer;
import ru.infon.queuebox.RoutedMessage;
import ru.infon.queuebox.common.PropertiesBox;
import ru.infon.queuebox.mongo.MongoJacksonSerializer;

/**
 * {@link QueueBehave} implementation that stores {@link RoutedMessage}s in a
 * MongoDB collection through {@link MongoQueueCore}.
 *
 * <p>Each stored document is the serialized message plus two routing fields,
 * {@code source} and {@code destination}. Consumers fetch messages by matching
 * {@code destination} against their consumer id.
 *
 * @param <T> concrete routed message type handled by this queue
 */
public class MongoRoutedQueueBehave<T extends RoutedMessage>
implements QueueBehave<T> {
    private static final String FIELD_SOURCE = "source";
    private static final String FIELD_DESTINATION = "destination";
    private static final String FIELD_ID = "id";
    /** Property name for the maximum number of messages fetched by one {@link #find} call. */
    public static final String PROPERTY_FETCH_LIMIT = "queue.fetch.limit";
    /** Property name for the reset timeout handed to {@link MongoQueueCore} on every fetch. */
    public static final String PROPERTY_RESET_TIMEOUT = "queue.message.timeout";
    private static final int DEFAULT_FETCH_LIMIT = 100;
    private static final int DEFAULT_RESET_TIMEOUT_SEC = 300;
    // NOTE(review): mapped to the last (long) argument of MongoQueueCore.get, presumably the
    // poll interval - confirm against the MongoQueueCore API.
    private static final int RETRY_IMMEDIATELY = 0;
    // NOTE(review): mapped to the third (int) argument of MongoQueueCore.get, presumably the
    // wait duration - confirm against the MongoQueueCore API.
    private static final int NO_WAIT_FOR_RETRY = 0;
    /** Name of the statistic incremented once per fetch attempt made by {@link #find}. */
    public static final String STAT_FIND_COUNTER = "finds-counter";
    private final QueueSerializer<T> serializer;
    private final MongoQueueCore mongoQueueCore;
    private QueueBoxContext context;
    private final int fetchLimit;
    private final int resetTimeout;

    /**
     * Creates the queue behaviour on top of the given collection and registers an
     * index on the {@code destination} field via {@link MongoQueueCore#ensureGetIndex}.
     *
     * @param collection  MongoDB collection backing the queue
     * @param properties  configuration source; {@link #PROPERTY_FETCH_LIMIT} and
     *                    {@link #PROPERTY_RESET_TIMEOUT} are read from it, falling back
     *                    to 100 and 300 respectively
     * @param packetClass message class passed to the {@link MongoJacksonSerializer}
     */
    public MongoRoutedQueueBehave(MongoCollection<Document> collection, PropertiesBox properties, Class<T> packetClass) {
        this.serializer = new MongoJacksonSerializer<T>(packetClass);
        this.mongoQueueCore = new MongoQueueCore(collection);
        Document indexDocument = new Document();
        indexDocument.append(FIELD_DESTINATION, 1);
        this.mongoQueueCore.ensureGetIndex(indexDocument);
        this.fetchLimit = properties.tryGetIntProperty(PROPERTY_FETCH_LIMIT, DEFAULT_FETCH_LIMIT);
        this.resetTimeout = properties.tryGetIntProperty(PROPERTY_RESET_TIMEOUT, DEFAULT_RESET_TIMEOUT_SEC);
    }

    /**
     * Stores the context whose statistics are updated by {@link #find}.
     *
     * @param context queue box context
     */
    @Override
    public void setContext(QueueBoxContext context) {
        this.context = context;
    }

    /**
     * @return the maximum number of messages a single {@link #find} call returns
     */
    @Override
    public int getFetchLimit() {
        return this.fetchLimit;
    }

    /**
     * Serializes the message, adds its routing fields and sends it to the queue
     * with the current time and the container's priority.
     *
     * @param event container holding the message to enqueue
     */
    @Override
    public void put(MessageContainer<T> event) {
        T message = event.getMessage();
        Document queueMessage = this.serializer.serialize(message);
        queueMessage.append(FIELD_SOURCE, message.getSource());
        queueMessage.append(FIELD_DESTINATION, message.getDestination());
        // Send the enriched document: re-serializing the message here would drop the
        // routing fields that find() filters on.
        this.mongoQueueCore.send(queueMessage, new Date(), event.getPriority());
    }

    /**
     * Fetches up to {@link #getFetchLimit()} messages whose {@code destination}
     * equals the consumer's id. Fetching stops early as soon as the queue returns
     * no message.
     *
     * <p>{@link #STAT_FIND_COUNTER} is incremented for every fetch attempt,
     * including the final one that returns nothing. The counter is skipped when no
     * context has been set.
     *
     * @param consumer consumer whose id is used as the destination filter
     * @return fetched messages in retrieval order; empty when nothing is available
     */
    @Override
    public Collection<MessageContainer<T>> find(QueueConsumer<T> consumer) {
        Document query = new Document();
        query.append(FIELD_DESTINATION, consumer.getConsumerId());
        LinkedList<MessageContainer<T>> resultList = new LinkedList<MessageContainer<T>>();
        for (int fetched = 0; fetched < this.fetchLimit; fetched++) {
            Document queueMessage = this.mongoQueueCore.get(query, this.resetTimeout, NO_WAIT_FOR_RETRY, RETRY_IMMEDIATELY);
            // Statistics are auxiliary; do not fail the fetch if the context was never injected.
            if (this.context != null) {
                this.context.getStatistic().increment(STAT_FIND_COUNTER);
            }
            if (queueMessage == null) {
                break;
            }
            resultList.add(this.toContainer(queueMessage));
        }
        return resultList;
    }

    /**
     * Acknowledges the message identified by the container's id.
     *
     * @param packet container whose id identifies the message to acknowledge
     */
    @Override
    public void remove(MessageContainer<T> packet) {
        Document query = new Document();
        query.append(FIELD_ID, packet.getId());
        this.mongoQueueCore.ack(query);
    }

    /**
     * Re-serializes the message with its routing fields and id and hands it to
     * {@link MongoQueueCore#requeue}.
     *
     * @param event container holding the message and the id it was fetched with
     */
    @Override
    public void reset(MessageContainer<T> event) {
        T message = event.getMessage();
        Document queueMessage = this.serializer.serialize(message);
        queueMessage.append(FIELD_SOURCE, message.getSource());
        queueMessage.append(FIELD_DESTINATION, message.getDestination());
        queueMessage.append(FIELD_ID, event.getId());
        this.mongoQueueCore.requeue(queueMessage);
    }

    /**
     * Converts a fetched queue document into a message container: strips the id
     * and routing fields, deserializes the remainder, then restores the routing
     * values on the message and the id on the container.
     */
    private MessageContainer<T> toContainer(Document queueMessage) {
        Object id = queueMessage.get(FIELD_ID);
        queueMessage.remove(FIELD_ID);
        String destination = queueMessage.getString(FIELD_DESTINATION);
        String source = queueMessage.getString(FIELD_SOURCE);
        queueMessage.remove(FIELD_DESTINATION);
        queueMessage.remove(FIELD_SOURCE);
        T message = this.serializer.deserialize(queueMessage);
        message.setSource(source);
        message.setDestination(destination);
        MessageContainer<T> messageContainer = new MessageContainer<T>(message);
        messageContainer.setId(id);
        return messageContainer;
    }
}

