package ru.qatools.mongodb;

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ru/qatools/mongodb/MongoTailableQueue.class */
/**
 * {@link TailableQueue} implementation that stores its elements in a capped MongoDB
 * collection and reads them back through a tailable-await cursor.
 *
 * <p>{@link #init()} creates the capped collection when it is missing, {@link #add(Object)}
 * inserts one document per element, and {@link #poll(Consumer)} blocks the calling thread
 * while handing every document it reads to the supplied consumer.
 *
 * @param <T> type of the queued elements
 */
public class MongoTailableQueue<T> extends MongoAbstractStorage<T> implements TailableQueue<T> {
    /** Maximum number of documents used when the caller does not pass an explicit size. */
    public static final long DEFAULT_MAX_SIZE = 1000;
    /** Per-document byte budget; multiplied by the max size to get the capped collection's byte limit. */
    public static final long ASSUMED_MAX_DOC_SIZE = 1048576;
    /** Batch size requested from the polling cursor. */
    public static final int BATCH_SIZE = 100;
    /** Default pause, in milliseconds, after a failed cursor iteration. */
    public static final int SLEEP_BETWEEN_FAILURES_MS = 500;
    /** Default pause, in milliseconds, after the cursor ends without an error. */
    public static final int MIN_POLL_INTERVAL_MS = 100;
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoTailableQueue.class);
    final MongoClient mongo;
    final String dbName;
    final String queueName;
    final long maxSize;
    volatile boolean stopped;
    private int minPollIntervalMs;
    private int sleepBetweenFailuresMs;

    /**
     * Creates a queue limited to {@link #DEFAULT_MAX_SIZE} documents.
     *
     * @param cls         class of the queued elements
     * @param mongoClient client used to reach the database
     * @param str         database name
     * @param str2        name of the collection that backs the queue
     */
    public MongoTailableQueue(Class<T> cls, MongoClient mongoClient, String str, String str2) {
        this(cls, mongoClient, str, str2, DEFAULT_MAX_SIZE);
    }

    /**
     * Creates a queue with an explicit document limit.
     *
     * @param cls         class of the queued elements
     * @param mongoClient client used to reach the database
     * @param str         database name
     * @param str2        name of the collection that backs the queue
     * @param j           maximum number of documents of the capped collection created by {@link #init()}
     */
    public MongoTailableQueue(Class<T> cls, MongoClient mongoClient, String str, String str2, long j) {
        super(cls);
        this.stopped = false;
        this.minPollIntervalMs = MIN_POLL_INTERVAL_MS;
        this.sleepBetweenFailuresMs = SLEEP_BETWEEN_FAILURES_MS;
        this.mongo = mongoClient;
        this.dbName = str;
        this.queueName = str2;
        this.maxSize = j;
    }

    /** Drops the collection that backs this queue. */
    @Override // ru.qatools.mongodb.TailableQueue
    public void drop() {
        collection().drop();
    }

    /**
     * Creates the backing capped collection unless a collection with the queue name is
     * already listed in the database. The collection is limited to {@code maxSize}
     * documents and {@code ASSUMED_MAX_DOC_SIZE * maxSize} bytes.
     */
    @Override // ru.qatools.mongodb.TailableQueue
    public void init() {
        final boolean missing = StreamSupport.stream(db().listCollectionNames().spliterator(), false)
                .noneMatch(name -> name.equals(this.queueName));
        if (missing) {
            final CreateCollectionOptions options = new CreateCollectionOptions()
                    .capped(true)
                    .maxDocuments(this.maxSize)
                    .sizeInBytes(ASSUMED_MAX_DOC_SIZE * this.maxSize)
                    .autoIndex(true);
            db().createCollection(this.queueName, options);
        }
    }

    /** Asks a running {@link #poll(Consumer)} to finish; the flag is never reset by this class. */
    @Override // ru.qatools.mongodb.TailableQueue
    public void stop() {
        this.stopped = true;
    }

    /**
     * Blocks the calling thread and passes every document read from the queue collection,
     * converted to {@code T}, to {@code consumer}. Returns when {@link #stop()} has been
     * called or when the thread is interrupted while pausing between cursor runs; in the
     * latter case the interrupt flag is restored before returning.
     *
     * <p>A {@link MongoException} raised while iterating is logged and followed by a pause
     * of {@code sleepBetweenFailuresMs}; a cursor that ends normally is followed by a pause
     * of {@code minPollIntervalMs}. After either pause a fresh cursor is opened.
     * NOTE(review): every new cursor comes from an unfiltered {@code find()}, so documents
     * still present in the collection are presumably delivered again — confirm this is intended.
     *
     * <p>Exceptions thrown by {@code consumer} that are not {@link MongoException}s propagate
     * to the caller; the cursor is closed first.
     *
     * @param consumer callback invoked on the polling thread for each element
     */
    @Override // ru.qatools.mongodb.TailableQueue
    public void poll(Consumer<T> consumer) {
        if (this.stopped) {
            LOGGER.warn("Could not poll stopped queue {}.{}", this.dbName, this.queueName);
            return;
        }
        while (!this.stopped) {
            try {
                drain(consumer);
            } catch (MongoException e) {
                LOGGER.debug("Failed to iterate on queue cursor for {}.{}", this.dbName, this.queueName, e);
                if (!pause(this.sleepBetweenFailuresMs, "Poll sleeping after cursor failure was interrupted")) {
                    return;
                }
                continue;
            }
            if (this.stopped) {
                // drain() returned because of stop(), not because the cursor ended.
                return;
            }
            LOGGER.debug("Tailable cursor returned no value without await for {}.{}", this.dbName, this.queueName);
            if (!pause(this.minPollIntervalMs, "Poll sleeping after cursor returns was interrupted")) {
                return;
            }
        }
    }

    /**
     * Adds one element to the queue by serializing it and inserting the resulting document.
     *
     * @param t element to enqueue
     */
    @Override // ru.qatools.mongodb.TailableQueue
    public void add(T t) {
        final String json = this.serializer.toDBObject(t).toJson();
        collection().insertOne(Document.parse(json));
    }

    /**
     * @return number of documents currently reported by the backing collection
     */
    @Override // ru.qatools.mongodb.TailableQueue
    public long size() {
        return collection().count();
    }

    /**
     * @param i pause, in milliseconds, applied after the polling cursor ends without an error
     */
    public void setMinPollIntervalMs(int i) {
        this.minPollIntervalMs = i;
    }

    /**
     * @param i pause, in milliseconds, applied after a failed cursor iteration
     */
    public void setSleepBetweenFailuresMs(int i) {
        this.sleepBetweenFailuresMs = i;
    }

    /**
     * Opens one tailable-await cursor over the queue collection and feeds its documents to
     * {@code consumer} until the cursor ends or the queue is stopped. The cursor is always
     * closed on exit, because it is created with {@code noCursorTimeout(true)}.
     */
    private void drain(Consumer<T> consumer) {
        try (MongoCursor<Document> cursor = collection().find()
                .cursorType(CursorType.TailableAwait)
                .noCursorTimeout(true)
                .batchSize(BATCH_SIZE)
                .iterator()) {
            while (cursor.hasNext()) {
                // Checked after hasNext() so that a document arriving after stop() is not delivered.
                if (this.stopped) {
                    return;
                }
                consumer.accept(getObject(cursor.next(), this.entityClass));
            }
        }
    }

    /**
     * Sleeps for {@code millis} milliseconds.
     *
     * @return {@code true} if the sleep completed, {@code false} if the thread was
     *         interrupted (the interrupt flag is restored so callers can observe it)
     */
    private static boolean pause(long millis, String interruptedMessage) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            LOGGER.warn(interruptedMessage, e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Looks up the collection that backs this queue. */
    private MongoCollection<Document> collection() {
        return db().getCollection(this.queueName);
    }

    /** Looks up the database that holds the queue collection. */
    private MongoDatabase db() {
        return this.mongo.getDatabase(this.dbName);
    }
}
