/*
 * Decompiled with CFR 0.152.
 */
package ru.infon.queuebox;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import ru.infon.queuebox.MessageContainer;
import ru.infon.queuebox.QueueConsumer;
import ru.infon.queuebox.QueuePacketHolder;

class QueueConsumerThread<T> {
    private static final Log LOG = LogFactory.getLog(QueueConsumerThread.class);
    private static final int DEFAULT_FETCH_DELAY_MILLS = 100;
    private final ExecutorService executor;
    private final QueueConsumer<T> consumer;
    private final QueuePacketHolder<T> packetHolder;
    private final Semaphore semaphore;
    private final Timer timer;
    private final AtomicBoolean runningFlag = new AtomicBoolean(false);
    private int fetchDelayMills = 100;

    QueueConsumerThread(Properties properties, QueueConsumer<T> consumer, QueuePacketHolder<T> packetHolder, ExecutorService executor) {
        this.executor = executor;
        this.consumer = consumer;
        this.packetHolder = packetHolder;
        try {
            this.fetchDelayMills = Integer.parseInt(properties.getProperty("queue.fetch.delay.mills"));
        }
        catch (NullPointerException | NumberFormatException runtimeException) {
            // empty catch block
        }
        this.semaphore = new Semaphore(packetHolder.getFetchLimit());
        this.timer = new Timer("QCT_timer_" + consumer.getConsumerId());
    }

    void start() {
        LOG.info((Object)String.format("starting QueueConsumerThread for %s", this.consumer));
        this.runningFlag.set(true);
        this.executor.execute(() -> this.runTask(this::payload));
    }

    void stop() {
        this.runningFlag.set(false);
    }

    private Collection<MessageContainer<T>> payload() {
        try {
            return this.packetHolder.fetch(this.consumer);
        }
        catch (Throwable e) {
            LOG.debug((Object)e);
            return Collections.EMPTY_LIST;
        }
    }

    private void onComplete(Collection<MessageContainer<T>> result) {
        if (result.size() > 0) {
            LOG.info((Object)String.format("worker received %d events for consumer %s", result.size(), this.consumer.getConsumerId()));
        }
        if (result.size() == 0 && this.runningFlag.get()) {
            this.schedule(() -> this.runTask(this::payload), this.fetchDelayMills);
        } else {
            Iterator<MessageContainer<T>> it = result.iterator();
            while (!result.isEmpty()) {
                if (!it.hasNext()) {
                    it = result.iterator();
                }
                MessageContainer<T> packet = it.next();
                try {
                    this.semaphore.acquire();
                    this.executor.execute(() -> {
                        LOG.debug((Object)String.format("processing message %s with data: \"%s\"", packet.getId(), packet.getMessage()));
                        packet.setCallback(this.packetHolder::ack, this.packetHolder::reset);
                        if (!this.safePacketHandling(packet)) {
                            this.packetHolder.reset(packet);
                        }
                        this.semaphore.release();
                    });
                    it.remove();
                }
                catch (RejectedExecutionException rejected) {
                    LOG.warn((Object)String.format("task {%s} was rejected by threadpool ... will try again later", packet.getId()));
                    this.packetHolder.reset(packet);
                    it.remove();
                }
                catch (InterruptedException interrupted) {
                    LOG.warn((Object)String.format("task {%s} cannot be executed due to threads policy ... will try again later", packet.getId()));
                    this.packetHolder.reset(packet);
                    it.remove();
                }
            }
            LOG.info((Object)String.format("processing events done for %s", this.consumer));
            this.runTask(this::payload);
        }
    }

    private boolean safePacketHandling(MessageContainer<T> packet) {
        try {
            this.consumer.onPacket(packet);
            return true;
        }
        catch (Throwable t) {
            LOG.error((Object)String.format("task {%s} handling failed to to exception in consumer: %s... will try again later", packet.getId(), t.getMessage()), t);
            return false;
        }
    }

    private void runTask(Supplier<Collection<MessageContainer<T>>> payload) {
        if (this.runningFlag.get()) {
            CompletableFuture.supplyAsync(payload, this.executor).thenAccept(this::onComplete);
        }
    }

    private void schedule(Runnable runnable, long delay) {
        this.timer.schedule((TimerTask)new LambdaTimerTask(runnable), delay);
    }

    private static class LambdaTimerTask
    extends TimerTask {
        private final Runnable runnable;

        LambdaTimerTask(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            this.runnable.run();
        }
    }
}

