/*
 * Decompiled with CFR 0.152.
 */
package tech.smartboot.mqtt.common;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.smartboot.socket.timer.Timer;
import tech.smartboot.mqtt.common.AbstractSession;
import tech.smartboot.mqtt.common.AsyncTask;
import tech.smartboot.mqtt.common.InflightMessage;
import tech.smartboot.mqtt.common.enums.MqttMessageType;
import tech.smartboot.mqtt.common.enums.MqttVersion;
import tech.smartboot.mqtt.common.exception.MqttException;
import tech.smartboot.mqtt.common.message.MessageBuilder;
import tech.smartboot.mqtt.common.message.MqttFixedHeader;
import tech.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import tech.smartboot.mqtt.common.message.MqttPubRelMessage;
import tech.smartboot.mqtt.common.message.MqttPublishMessage;
import tech.smartboot.mqtt.common.message.MqttSubscribeMessage;
import tech.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttSubscribeVariableHeader;
import tech.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import tech.smartboot.mqtt.common.util.ValidateUtils;

public class InflightQueue {
    public static final Runnable EMPTY_RUNNABLE = () -> {};
    private static final int TIMEOUT = 30;
    private final InflightMessage[] queue;
    private int takeIndex;
    private int putIndex;
    private volatile int count;
    private int packetId = 0;
    private final AbstractSession session;
    private final Timer timer;

    public InflightQueue(AbstractSession session, int size, Timer timer) {
        ValidateUtils.isTrue(size > 0, "inflight must >0");
        this.queue = new InflightMessage[size];
        this.session = session;
        this.timer = timer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> put(MessageBuilder publishBuilder) {
        boolean flush;
        InflightMessage inflightMessage;
        try {
            InflightQueue inflightQueue = this;
            synchronized (inflightQueue) {
                while (this.count == this.queue.length) {
                    this.wait();
                }
                inflightMessage = this.enqueue(publishBuilder);
                flush = this.count == this.queue.length;
            }
        }
        catch (Exception e) {
            throw new MqttException("put message into inflight queue exception", e);
        }
        this.session.write(inflightMessage.getOriginalMessage(), flush);
        return inflightMessage.getFuture();
    }

    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> offer(MessageBuilder publishBuilder) {
        return this.offer(publishBuilder, EMPTY_RUNNABLE);
    }

    public int available() {
        return this.queue.length - this.count;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> offer(MessageBuilder publishBuilder, Runnable runnable) {
        boolean flush;
        InflightMessage inflightMessage = null;
        InflightQueue inflightQueue = this;
        synchronized (inflightQueue) {
            if (this.count == this.queue.length) {
                int i = this.putIndex - 1;
                if (i < 0) {
                    i = this.queue.length - 1;
                }
                this.queue[i].getFuture().thenRun(runnable);
            } else {
                inflightMessage = this.enqueue(publishBuilder);
            }
            flush = this.count == this.queue.length;
        }
        if (inflightMessage != null) {
            this.session.write(inflightMessage.getOriginalMessage(), flush);
            return inflightMessage.getFuture();
        }
        return null;
    }

    private InflightMessage enqueue(MessageBuilder publishBuilder) {
        int id;
        if ((id = ++this.packetId) > 65535) {
            this.packetId = id = id % this.queue.length + this.queue.length;
        }
        Object mqttMessage = publishBuilder.packetId(id).build();
        InflightMessage inflightMessage = new InflightMessage(id, (MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>)mqttMessage);
        this.queue[this.putIndex++] = inflightMessage;
        if (this.putIndex == this.queue.length) {
            this.putIndex = 0;
        }
        ++this.count;
        if (this.count == 1) {
            this.retry(inflightMessage);
        }
        return inflightMessage;
    }

    private void retry(final InflightMessage inflightMessage) {
        if (inflightMessage.isCommit() || this.session.isDisconnect()) {
            return;
        }
        this.timer.schedule((Runnable)new AsyncTask(){

            @Override
            public void execute() {
                if (inflightMessage.isCommit()) {
                    return;
                }
                if (((InflightQueue)InflightQueue.this).session.disconnect || ((InflightQueue)InflightQueue.this).session.session.isInvalid()) {
                    return;
                }
                long delay = TimeUnit.SECONDS.toMillis(30L) - System.currentTimeMillis() + inflightMessage.getLatestTime();
                if (delay > 0L) {
                    InflightQueue.this.timer.schedule((Runnable)this, delay, TimeUnit.MILLISECONDS);
                    return;
                }
                inflightMessage.setLatestTime(System.currentTimeMillis());
                switch (inflightMessage.getExpectMessageType()) {
                    case PUBACK: 
                    case PUBREC: {
                        MqttPublishMessage mqttMessage = (MqttPublishMessage)inflightMessage.getOriginalMessage();
                        MqttFixedHeader mqttFixedHeader = MqttFixedHeader.getInstance(mqttMessage.getFixedHeader().getMessageType(), true, mqttMessage.getFixedHeader().getQosLevel().value(), mqttMessage.getFixedHeader().isRetain());
                        MqttPublishMessage dupMessage = new MqttPublishMessage(mqttFixedHeader, (MqttPublishVariableHeader)mqttMessage.getVariableHeader(), mqttMessage.getPayload().getPayload());
                        InflightQueue.this.session.write(dupMessage);
                        break;
                    }
                    case PUBCOMP: {
                        ReasonProperties properties = null;
                        if (inflightMessage.getOriginalMessage().getVersion() == MqttVersion.MQTT_5) {
                            properties = new ReasonProperties();
                        }
                        MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader> message = inflightMessage.getOriginalMessage();
                        MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(((MqttPacketIdVariableHeader)message.getVariableHeader()).getPacketId(), properties);
                        MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER_DUP, variableHeader);
                        InflightQueue.this.session.write(pubRelMessage);
                        break;
                    }
                    case SUBACK: {
                        MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage)inflightMessage.getOriginalMessage();
                        MqttSubscribeMessage dupSubscribeMessage = new MqttSubscribeMessage(MqttFixedHeader.SUBSCRIBE_HEADER_DUP, (MqttSubscribeVariableHeader)subscribeMessage.getVariableHeader(), subscribeMessage.getPayload());
                        InflightQueue.this.session.write(dupSubscribeMessage);
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException("invalid message type: " + (Object)((Object)inflightMessage.getExpectMessageType()));
                    }
                }
                inflightMessage.setRetryCount(inflightMessage.getRetryCount() + 1);
                InflightQueue.this.timer.schedule((Runnable)this, 30L, TimeUnit.SECONDS);
            }
        }, TimeUnit.SECONDS.toMillis(30L) - (System.currentTimeMillis() - inflightMessage.getLatestTime()), TimeUnit.MILLISECONDS);
    }

    public void notify(MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader> message) {
        InflightMessage inflightMessage = this.queue[(((MqttPacketIdVariableHeader)message.getVariableHeader()).getPacketId() - 1) % this.queue.length];
        if (inflightMessage == null) {
            return;
        }
        switch (message.getFixedHeader().getMessageType()) {
            case PUBACK: 
            case PUBCOMP: 
            case SUBACK: 
            case UNSUBACK: {
                if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || ((MqttPacketIdVariableHeader)message.getVariableHeader()).getPacketId() != inflightMessage.getAssignedPacketId()) break;
                inflightMessage.setResponseMessage(message);
                inflightMessage.setLatestTime(System.currentTimeMillis());
                this.commit(inflightMessage);
                break;
            }
            case PUBREC: {
                if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || ((MqttPacketIdVariableHeader)message.getVariableHeader()).getPacketId() != inflightMessage.getAssignedPacketId()) break;
                inflightMessage.setResponseMessage(message);
                inflightMessage.setLatestTime(System.currentTimeMillis());
                inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP);
                ReasonProperties properties = null;
                if (message.getVersion() == MqttVersion.MQTT_5) {
                    properties = new ReasonProperties();
                }
                MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(((MqttPacketIdVariableHeader)message.getVariableHeader()).getPacketId(), properties);
                MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER, variableHeader);
                this.session.write(pubRelMessage, false);
                break;
            }
            default: {
                throw new RuntimeException(message.toString());
            }
        }
    }

    private synchronized void commit(InflightMessage inflightMessage) {
        MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader> originalMessage = inflightMessage.getOriginalMessage();
        ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || ((MqttPacketIdVariableHeader)originalMessage.getVariableHeader()).getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message");
        inflightMessage.setCommit(true);
        if ((inflightMessage.getAssignedPacketId() - 1) % this.queue.length != this.takeIndex) {
            return;
        }
        if (this.count < this.queue.length) {
            this.notifyAll();
        }
        this.queue[this.takeIndex++] = null;
        --this.count;
        if (this.takeIndex == this.queue.length) {
            this.takeIndex = 0;
        }
        inflightMessage.getFuture().complete(inflightMessage.getResponseMessage());
        while (this.count > 0 && this.queue[this.takeIndex].isCommit()) {
            inflightMessage = this.queue[this.takeIndex];
            this.queue[this.takeIndex++] = null;
            if (this.takeIndex == this.queue.length) {
                this.takeIndex = 0;
            }
            --this.count;
            inflightMessage.getFuture().complete(inflightMessage.getResponseMessage());
        }
        if (this.count > 0) {
            InflightMessage monitorMessage = this.queue[this.takeIndex];
            this.session.retryRunnable = () -> this.session.getInflightQueue().retry(monitorMessage);
        }
    }
}

