/*
 * Decompiled with CFR 0.152.
 */
package net.morimekta.providence.mio;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.PServiceCall;
import net.morimekta.providence.descriptor.PField;
import net.morimekta.providence.mio.MessageWriter;
import net.morimekta.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueuedMessageWriter
implements MessageWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueuedMessageWriter.class);
    private static final int DEFAULT_MAX_QUEUE_LEN = 65536;
    private final Queue<PMessage> messageQueue;
    private final Queue<PServiceCall> callQueue;
    private final ExecutorService executor;
    private final MessageWriter writer;
    private final int maxQueueLength;

    public QueuedMessageWriter(MessageWriter writer) {
        this(writer, Executors.newSingleThreadExecutor((ThreadFactory)NamedThreadFactory.builder().setDaemon(true).setNameFormat("providence-queued-writer").build()));
    }

    public QueuedMessageWriter(MessageWriter writer, ExecutorService executor) {
        this(writer, executor, 65536);
    }

    public QueuedMessageWriter(MessageWriter writer, ExecutorService executor, int maxQueueLength) {
        this.writer = writer;
        this.executor = executor;
        this.maxQueueLength = maxQueueLength;
        this.messageQueue = new ConcurrentLinkedQueue<PMessage>();
        this.callQueue = new ConcurrentLinkedQueue<PServiceCall>();
        this.executor.submit(this::writeLoop);
    }

    public int size() {
        return this.callQueue.size() + this.messageQueue.size();
    }

    @Override
    public <Message extends PMessage<Message, Field>, Field extends PField> int write(Message message) throws IOException {
        if (this.maxQueueLength <= 0 || this.size() < this.maxQueueLength) {
            this.messageQueue.offer(message);
            return 1;
        }
        return 0;
    }

    @Override
    public <Message extends PMessage<Message, Field>, Field extends PField> int write(PServiceCall<Message, Field> call) throws IOException {
        if (this.maxQueueLength <= 0 || this.size() < this.maxQueueLength) {
            this.callQueue.offer(call);
            return 1;
        }
        return 0;
    }

    @Override
    public int separator() throws IOException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (!this.executor.isShutdown()) {
            try {
                this.executor.shutdown();
                if (!this.executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                LOGGER.error("Interrupted while stopping writer loop thread", (Throwable)e);
                throw new RuntimeException(e.getMessage(), e);
            }
            finally {
                try {
                    while (this.messageQueue.size() > 0) {
                        this.writer.write(this.messageQueue.poll());
                        this.writer.separator();
                    }
                    while (this.callQueue.size() > 0) {
                        this.writer.write(this.callQueue.poll());
                        this.writer.separator();
                    }
                }
                catch (IOException e) {
                    LOGGER.error("Unable to write messages on close", (Throwable)e);
                }
                this.writer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeLoop() {
        try {
            long failDelay = 137L;
            while (!this.executor.isShutdown()) {
                try {
                    while (this.messageQueue.size() > 0) {
                        this.writer.write(this.messageQueue.poll());
                        failDelay = 137L;
                    }
                    while (this.callQueue.size() > 0) {
                        this.writer.write(this.callQueue.poll());
                        failDelay = 137L;
                    }
                    this.sleep(3L);
                }
                catch (IOException e) {
                    if (failDelay >= 10000L) {
                        LOGGER.error("Unable to write message, sleeping {}s", (Object)(failDelay / 1000L), (Object)e);
                    } else {
                        LOGGER.error("Unable to write message, sleeping {}ms", (Object)failDelay, (Object)e);
                    }
                    try {
                        this.sleep(failDelay);
                    }
                    finally {
                        failDelay = Math.min(TimeUnit.MINUTES.toMillis(10L), (long)((double)failDelay * 1.66666667));
                    }
                }
            }
        }
        catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }
    }

    protected void sleep(long ms) throws InterruptedException {
        Thread.sleep(ms);
    }
}

