package ru.infon.queuebox;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ru/infon/queuebox/QueueConsumerThread.class */
/**
 * Polling worker for a single {@link QueueConsumer}.
 *
 * <p>The worker repeatedly asks the {@link QueuePacketHolder} for messages addressed to the
 * consumer and hands each fetched message to {@link QueueConsumer#onPacket} on the supplied
 * executor. When a fetch yields nothing, the next fetch is postponed by the configured fetch delay
 * using a dedicated {@link Timer}; otherwise the next fetch is started immediately after all
 * fetched messages have been dispatched.
 *
 * <p>The number of messages that may be in flight at once is bounded by a semaphore sized with
 * {@code queuePacketHolder.getFetchLimit()}.
 *
 * @param <T> payload type carried by the message containers
 */
public class QueueConsumerThread<T> {
    private static final Log LOG = LogFactory.getLog(QueueConsumerThread.class);
    private static final int DEFAULT_FETCH_DELAY_MILLS = 100;

    private final ExecutorService executor;
    private final QueueConsumer<T> consumer;
    private final QueuePacketHolder<T> packetHolder;
    /** Bounds the number of messages handed to the executor but not yet finished. */
    private final Semaphore semaphore;
    /** Drives the delayed re-fetch after an empty fetch. */
    private final Timer timer;
    private final int fetchDelayMills;

    /** Adapter that lets a {@link Runnable} (typically a lambda) be scheduled on a {@link Timer}. */
    public static class LambdaTimerTask extends TimerTask {
        private final Runnable runnable;

        LambdaTimerTask(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            this.runnable.run();
        }
    }

    /**
     * Creates a worker; no work is started until {@link #start()} is called.
     *
     * @param properties configuration; {@code QueueBox.PROPERTY_FETCH_DELAY_MILLS} is read as the
     *     delay in milliseconds between fetches after an empty fetch. May be {@code null}; a
     *     missing, non-numeric or negative value falls back to the default of
     *     {@value #DEFAULT_FETCH_DELAY_MILLS} ms.
     * @param queueConsumer consumer that receives the messages
     * @param queuePacketHolder source of messages; also receives ack/reset callbacks
     * @param executorService executor used both for fetching and for message processing
     */
    public QueueConsumerThread(Properties properties, QueueConsumer<T> queueConsumer, QueuePacketHolder<T> queuePacketHolder, ExecutorService executorService) {
        this.executor = executorService;
        this.consumer = queueConsumer;
        this.packetHolder = queuePacketHolder;
        this.fetchDelayMills = resolveFetchDelay(properties);
        this.semaphore = new Semaphore(queuePacketHolder.getFetchLimit());
        this.timer = new Timer("QCT_timer_" + queueConsumer.getConsumerId());
    }

    /** Reads the fetch delay from the properties, falling back to the default when unusable. */
    private static int resolveFetchDelay(Properties properties) {
        if (properties == null) {
            return DEFAULT_FETCH_DELAY_MILLS;
        }
        String raw = properties.getProperty(QueueBox.PROPERTY_FETCH_DELAY_MILLS);
        if (raw == null) {
            return DEFAULT_FETCH_DELAY_MILLS;
        }
        try {
            int parsed = Integer.parseInt(raw.trim());
            if (parsed >= 0) {
                return parsed;
            }
            // Timer.schedule rejects negative delays, which would break the polling loop later.
            LOG.warn(String.format("negative fetch delay \"%s\" ignored, using default %d ms", raw, DEFAULT_FETCH_DELAY_MILLS));
        } catch (NumberFormatException e) {
            LOG.warn(String.format("invalid fetch delay \"%s\" ignored, using default %d ms", raw, DEFAULT_FETCH_DELAY_MILLS));
        }
        return DEFAULT_FETCH_DELAY_MILLS;
    }

    /** Submits the first fetch to the executor, which starts the polling loop. */
    public void start() {
        LOG.info(String.format("starting QueueConsumerThread for %s", this.consumer));
        this.executor.execute(() -> runTask(this::payload));
    }

    /**
     * Fetches the next batch of messages for the consumer.
     *
     * @return the fetched messages, or an empty list when the fetch failed or returned
     *     {@code null}
     */
    private Collection<MessageContainer<T>> payload() {
        try {
            Collection<MessageContainer<T>> fetched = this.packetHolder.fetch(this.consumer);
            if (fetched != null) {
                return fetched;
            }
        } catch (Throwable th) {
            // Deliberately broad: a failed fetch is treated as "nothing fetched" so that the
            // polling loop keeps running and retries after the fetch delay.
            LOG.debug(String.format("fetch failed for consumer %s", this.consumer.getConsumerId()), th);
        }
        return Collections.emptyList();
    }

    /**
     * Handles the result of a fetch: schedules a delayed re-fetch when nothing was fetched,
     * otherwise dispatches every message to the executor and then starts the next fetch.
     *
     * <p>Blocks the calling thread while waiting for a semaphore permit. Each message is removed
     * from {@code collection} as it is handled. A message that cannot be dispatched (the wait was
     * interrupted or the executor rejected the task) is handed back via
     * {@code packetHolder.reset}.
     */
    private void onComplete(Collection<MessageContainer<T>> collection) {
        if (collection.isEmpty()) {
            schedule(() -> runTask(this::payload), this.fetchDelayMills);
            return;
        }
        LOG.info(String.format("worker received %d events for consumer %s", collection.size(), this.consumer.getConsumerId()));

        boolean interrupted = false;
        try {
            Iterator<MessageContainer<T>> it = collection.iterator();
            while (it.hasNext()) {
                MessageContainer<T> message = it.next();
                it.remove();
                try {
                    this.semaphore.acquire();
                } catch (InterruptedException e) {
                    // Remember the interrupt; the flag is restored once this batch is handled.
                    interrupted = true;
                    LOG.warn(String.format("task {%s} cannot be executed due to threads policy ... trying again later", message.getId()));
                    this.packetHolder.reset(message);
                    continue;
                }
                try {
                    this.executor.execute(() -> process(message));
                } catch (RejectedExecutionException e) {
                    // The task will never run, so the permit acquired above must be returned here.
                    this.semaphore.release();
                    LOG.warn(String.format("task {%s} was rejected by threadpool ... trying again later", message.getId()));
                    this.packetHolder.reset(message);
                }
            }
            LOG.info(String.format("processing events done for %s", this.consumer));
            runTask(this::payload);
        } finally {
            if (interrupted) {
                // Restored last so that the logging and submission above run with a clear flag.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Delivers one message to the consumer. Runs on the executor; the semaphore permit taken for
     * the message is always released, even when the consumer throws.
     */
    private void process(MessageContainer<T> message) {
        try {
            LOG.debug(String.format("processing message %s with data: \"%s\"", message.getId(), message.getMessage()));
            Consumer<MessageContainer<T>> ack = this.packetHolder::ack;
            message.setCallback(ack, this.packetHolder::reset);
            this.consumer.onPacket(message);
        } finally {
            this.semaphore.release();
        }
    }

    /**
     * Runs {@code supplier} on the executor and passes its result to {@link #onComplete}.
     * A failure while handling the result is logged; without this it would be captured by the
     * future and lost. The polling loop is not restarted after such a failure.
     */
    private void runTask(Supplier<Collection<MessageContainer<T>>> supplier) {
        CompletableFuture.supplyAsync(supplier, this.executor)
                .thenAccept(this::onComplete)
                .exceptionally(th -> {
                    LOG.error(String.format("polling stopped for consumer %s", this.consumer.getConsumerId()), th);
                    return null;
                });
    }

    /** Runs {@code runnable} once on the timer thread after {@code j} milliseconds. */
    private void schedule(Runnable runnable, long j) {
        this.timer.schedule(new LambdaTimerTask(runnable), j);
    }
}
