/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.fastcast.impl;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.DatagramPacket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.nustaq.fastcast.api.FCPublisher;
import org.nustaq.fastcast.config.PhysicalTransportConf;
import org.nustaq.fastcast.config.PublisherConf;
import org.nustaq.fastcast.impl.BatchingController;
import org.nustaq.fastcast.impl.ControlPacket;
import org.nustaq.fastcast.impl.DataPacket;
import org.nustaq.fastcast.impl.RetransEntry;
import org.nustaq.fastcast.impl.RetransPacket;
import org.nustaq.fastcast.impl.Topic;
import org.nustaq.fastcast.transport.PhysicalTransport;
import org.nustaq.fastcast.util.FCLog;
import org.nustaq.offheap.bytez.ByteSource;
import org.nustaq.offheap.bytez.Bytez;
import org.nustaq.offheap.bytez.BytezAllocator;
import org.nustaq.offheap.bytez.malloc.MallocBytez;
import org.nustaq.offheap.bytez.malloc.MallocBytezAllocator;
import org.nustaq.offheap.bytez.onheap.HeapBytez;
import org.nustaq.offheap.structs.FSTStruct;
import org.nustaq.offheap.structs.FSTStructAllocator;
import org.nustaq.offheap.structs.structtypes.StructArray;
import org.nustaq.serialization.util.FSTUtil;

public class PacketSendBuffer
implements FCPublisher {
    public static boolean RETRANSDEBUG = false;
    public static final String KEEP_SUBS_NODEID = "KEEPRECEIVER";
    public static boolean DEBUG_LAT = false;
    public static int RETRANS_MEM = 10000;
    private static final int TAG_BUFF = 4;
    final PhysicalTransport trans;
    FSTStructAllocator offheapAllocator;
    FSTStructAllocator heapAllocator;
    ConcurrentLinkedQueue<RetransPacket> retransRequests = new ConcurrentLinkedQueue();
    String nodeId;
    int payMaxLen;
    int currentAvail;
    int topic;
    FSTStruct currentPacketBytePointer;
    long currentSequence = 1L;
    long nextSendMsg = 1L;
    StructArray<DataPacket> history;
    int historySize;
    ControlPacket dropMsg;
    DataPacket template;
    ByteBuffer tmpSend;
    Topic topicEntry;
    boolean isUnordered;
    BatchingController batchController;
    String currentReceiver = null;
    Bytez heartbeat;
    boolean batchOnLimit = true;
    long[] sentRetransSeq = new long[RETRANS_MEM];
    long[] sentRetransTimes = new long[RETRANS_MEM];
    int maxRetransAge = 0;
    int suppressedRetransCount = 0;
    ThreadLocal<byte[]> msgBytes = new ThreadLocal();
    AtomicBoolean sendLock = new AtomicBoolean(false);
    long hbInvtervalMS = 1000L;
    public volatile long lastMsgFlush = System.currentTimeMillis();
    HeapBytez hp = new HeapBytez(null, 0L, 0L);

    public PacketSendBuffer(PhysicalTransport trans, String nodeId, Topic entry) {
        this.trans = trans;
        this.topic = entry.getTopicId();
        this.topicEntry = entry;
        this.nodeId = nodeId;
        this.hbInvtervalMS = entry.getPublisherConf().getHeartbeatInterval();
        FCLog.log("init send buffer for topic " + entry.getTopicId());
        PhysicalTransportConf conf = trans.getConf();
        this.template = DataPacket.getTemplate(conf.getDgramsize());
        this.payMaxLen = this.template.data.length;
        this.template.getSender().setString(nodeId);
        this.template.setTopic(this.topic);
        this.offheapAllocator = new FSTStructAllocator(0, (BytezAllocator)new MallocBytezAllocator());
        this.heapAllocator = new FSTStructAllocator(0);
        PublisherConf publisherConf = this.topicEntry.getPublisherConf();
        int hSize = publisherConf.getNumPacketHistory();
        if ((long)hSize * (long)conf.getDgramsize() > (long)(Integer.MAX_VALUE - 2 * conf.getDgramsize())) {
            int newHist = (Integer.MAX_VALUE - 2 * conf.getDgramsize()) / conf.getDgramsize();
            publisherConf.numPacketHistory(newHist);
            FCLog.get().warn("int overflow, degrading history size from " + hSize + " to " + newHist);
            hSize = newHist;
        }
        this.history = this.offheapAllocator.newArray(hSize, (FSTStruct)this.template);
        FCLog.log("allocating send buffer for topic " + this.topicEntry.getTopicId() + " of " + this.history.getByteSize() / 1024 / 1024 + " MByte");
        this.historySize = this.history.size();
        this.setUnordered(this.topicEntry.isUnordered());
        this.initDropMsgPacket(nodeId);
        DataPacket curP = this.getPacketAt(this.currentSequence);
        this.currentPacketBytePointer = curP.detach();
        curP.setSeqNo(this.currentSequence);
        curP.dataPointer(this.currentPacketBytePointer);
        this.currentAvail = this.payMaxLen - 4;
        try {
            this.initTmpBBuf();
        }
        catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
        catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        this.setPacketRateLimit(publisherConf.getPps());
        this.heartbeat = new HeapBytez(new byte[]{1});
        this.flush();
    }

    private void initTmpBBuf() throws NoSuchFieldException, IllegalAccessException {
        this.tmpSend = ByteBuffer.allocateDirect(0);
        Field address = null;
        Field capacity = null;
        ArrayList fields = new ArrayList();
        FSTUtil.getAllFields(fields, this.tmpSend.getClass());
        for (int i = 0; i < fields.size(); ++i) {
            Field field = (Field)fields.get(i);
            if (field.getName().equals("address")) {
                address = field;
                continue;
            }
            if (!field.getName().equals("capacity")) continue;
            capacity = field;
        }
        address.setAccessible(true);
        capacity.setAccessible(true);
        MallocBytez base = (MallocBytez)this.history.getBase();
        address.setLong(this.tmpSend, base.getBaseAdress() + this.history.getOffset());
        capacity.setInt(this.tmpSend, this.history.getByteSize());
    }

    private void moveBuff(DataPacket packet) {
        this.tmpSend.limit((int)(packet.getOffset() + (long)packet.getDGramSize()));
        this.tmpSend.position((int)packet.getOffset());
    }

    protected void initDropMsgPacket(String nodeId) {
        this.dropMsg = new ControlPacket();
        this.dropMsg.getSender().setString(nodeId);
        this.dropMsg.setTopic(this.topic);
        this.dropMsg.setType((short)0);
        this.dropMsg = (ControlPacket)this.heapAllocator.newStruct((FSTStruct)this.dropMsg);
    }

    public void free() {
        this.offheapAllocator.free();
    }

    public Topic getTopicEntry() {
        return this.topicEntry;
    }

    public boolean isUnordered() {
        return this.isUnordered;
    }

    public void setUnordered(boolean unordered) {
        this.isUnordered = unordered;
    }

    private DataPacket getPacketAt(long seq) {
        return (DataPacket)this.history.get(this.getIndexFromSequence(seq));
    }

    private int getIndexFromSequence(long seq) {
        return (int)(seq % (long)this.historySize);
    }

    private boolean putMessage(int tag, ByteSource b, long offset, int len, boolean tryPut) {
        this.putMessageRecursive(tag, b, offset, len);
        return true;
    }

    private void putMessageRecursive(int tag, ByteSource b, long offset, int len) {
        while (true) {
            if (this.currentAvail > len + 4 + 2) {
                this.putInternal(tag, (short)1, b, offset, len);
                return;
            }
            if (this.isUnordered()) {
                if (len > this.payMaxLen - 4 - 2) {
                    throw new RuntimeException("unordered message size must not exceed packet size");
                }
                this.fire();
                this.putMessageRecursive(tag, b, offset, len);
                return;
            }
            int sendlen = this.currentAvail - 4 - 8;
            if (sendlen <= 8) {
                this.fire();
                continue;
            }
            this.putInternal(tag, (short)2, b, offset, sendlen);
            this.fire();
            tag = -1;
            offset += (long)sendlen;
            len -= sendlen;
        }
    }

    private void putInternal(int tag, short code, ByteSource b, long offset, int len) {
        int off = 0;
        if (tag >= 0) {
            off = 1;
        }
        this.currentPacketBytePointer.setShort(code);
        this.currentPacketBytePointer.next(2);
        this.currentPacketBytePointer.setShort((short)(len + off));
        this.currentPacketBytePointer.next(2);
        if (tag >= 0) {
            this.currentPacketBytePointer.setByte((byte)tag);
            this.currentPacketBytePointer.next(off);
        }
        this.currentPacketBytePointer.setBytes(b, offset, len);
        this.currentPacketBytePointer.next(len);
        this.currentAvail -= len + off + 4;
    }

    private void fire() {
        if (DEBUG_LAT) {
            FCLog.get().debug("fire " + System.currentTimeMillis());
        }
        if (this.isCurrentPacketEmpty()) {
            return;
        }
        this.currentPacketBytePointer.setShort((short)3);
        long curSeq = this.currentSequence++;
        this.currentAvail -= 2;
        if (this.currentAvail < 0) {
            throw new RuntimeException("negative bytes left " + this.currentAvail);
        }
        this.getPacketAt(curSeq).setBytesLeft(this.currentAvail);
        long newSeq = curSeq + 1L;
        DataPacket newPack = this.getPacketAt(newSeq);
        newPack.dataPointer(this.currentPacketBytePointer);
        newPack.setSeqNo(newSeq);
        this.currentAvail = this.payMaxLen - 4;
        newPack.getReceiver().setString(this.currentReceiver);
        newPack.setRetrans(false);
    }

    private boolean isCurrentPacketEmpty() {
        return this.currentAvail == this.payMaxLen - 4;
    }

    private boolean sendPendingPackets() throws IOException {
        if (this.currentSequence <= this.nextSendMsg) {
            return false;
        }
        this.sendPackets(this.nextSendMsg, this.currentSequence, false, 0L);
        return true;
    }

    private boolean sendPendingRetrans() throws IOException {
        if (this.retransRequests.peek() != null) {
            ArrayList<RetransPacket> curRetrans = new ArrayList<RetransPacket>();
            RetransPacket poll = null;
            do {
                poll = this.retransRequests.poll();
                curRetrans.add(poll);
            } while (poll != null);
            this.mergeRetransmissions(curRetrans);
            return true;
        }
        return false;
    }

    private void putRetransSent(long sequence, long tim) {
        int index = (int)(sequence % (long)RETRANS_MEM);
        this.sentRetransSeq[index] = sequence;
        this.sentRetransTimes[index] = tim;
    }

    private long getLastRetransmitted(long sequence) {
        int index = (int)(sequence % (long)RETRANS_MEM);
        if (this.sentRetransSeq[index] == sequence) {
            return this.sentRetransTimes[index];
        }
        return 0L;
    }

    private void mergeRetransmissions(ArrayList<RetransPacket> curRetrans) throws IOException {
        long now = System.currentTimeMillis();
        long maxTo = 0L;
        for (int i = 0; i < curRetrans.size(); ++i) {
            RetransPacket retransPacket = curRetrans.get(i);
            if (retransPacket == null || retransPacket.retransEntriesLen() <= 0) continue;
            maxTo = this.sendRetransmissionResponse(maxTo, retransPacket, now);
        }
    }

    private long sendRetransmissionResponse(long maxTo, RetransPacket retransPacket, long now) throws IOException {
        for (int ii = 0; ii < retransPacket.getRetransIndex(); ++ii) {
            RetransEntry en = retransPacket.retransEntries(ii);
            if (en == null) {
                System.out.println("unexpected empty retrans entry");
                continue;
            }
            long fromSeqNo = this.getPacketAt(en.getFrom()).getSeqNo();
            if (fromSeqNo != en.getFrom()) {
                fromSeqNo = en.getFrom();
                if (this.currentSequence - fromSeqNo > (long)this.maxRetransAge) {
                    this.maxRetransAge = (int)(this.currentSequence - fromSeqNo);
                    FCLog.get().warn("old retransmission from " + retransPacket.getSender() + " age " + this.maxRetransAge + " requested:" + fromSeqNo + " curseq " + this.currentSequence + " topic " + this.topicEntry.getTopicId());
                }
                this.dropMsg.setReceiver(retransPacket.getSender());
                this.dropMsg.setSeqNo(en.getFrom());
                FCLog.get().warn("Sending Drop " + (Object)((Object)this.dropMsg) + " requestedSeq " + fromSeqNo + " on service " + this.getTopicEntry().getTopicId() + " currentSeq " + this.currentSequence + " age: " + (this.currentSequence - en.getFrom()));
                this.batchController.countPacket();
                this.trans.send(new DatagramPacket(this.dropMsg.getBase().toBytes((long)((int)this.dropMsg.getOffset()), this.dropMsg.getByteSize()), 0, this.dropMsg.getByteSize()));
                continue;
            }
            long from = en.getFrom();
            long to = en.getTo();
            if (from < maxTo && to <= maxTo) continue;
            from = Math.max(from, maxTo);
            maxTo = Math.max(to, maxTo);
            this.sendPackets(from, to, true, now);
        }
        return maxTo;
    }

    private void sendPackets(long sendStart, long sendEnd, boolean retrans, long now) throws IOException {
        long len = sendEnd - sendStart;
        for (long i = sendStart; i < sendEnd; ++i) {
            DataPacket dataPacket = this.getPacketAt(i);
            if (retrans) {
                this.putRetransSent(i, now);
            } else if (dataPacket.getSeqNo() != i) {
                FCLog.get().fatal("FATAL error, current seq:" + this.currentSequence + " expected Seq:[" + i + "] real read:" + dataPacket.getSeqNo());
                FCLog.get().fatal("current put seq " + this.currentSequence);
                FCLog.get().fatal("current send seq " + this.nextSendMsg);
                FCLog.get().fatal("current pointer and currentpackpointer " + dataPacket.___offset + " " + this.currentPacketBytePointer.___offset);
                FCLog.get().fatal(null, new Exception("stack trace"));
                for (int ii = 0; ii < 20; ++ii) {
                    FCLog.get().fatal("  =>" + this.getPacketAt(i + (long)ii).getSeqNo());
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    FCLog.log(e);
                }
                System.exit(2);
            }
            dataPacket.setRetrans(retrans);
            this.moveBuff(dataPacket);
            this.batchController.countPacket();
            this.trans.send(this.tmpSend);
            while (this.batchController.getAction() == BatchingController.Action.BLOCK) {
            }
        }
        if (!retrans) {
            this.nextSendMsg = sendEnd;
        }
    }

    void addRetransmissionRequest(RetransPacket retransPacket, PhysicalTransport trans) throws IOException {
        RetransPacket copy = (RetransPacket)retransPacket.createCopy();
        if (RETRANSDEBUG) {
            FCLog.get().info("received retrans request and add to Q " + (Object)((Object)copy));
        }
        this.retransRequests.add(copy);
        this.flush();
    }

    private void lock() {
        while (!this.sendLock.compareAndSet(false, true)) {
        }
    }

    private void unlock() {
        this.sendLock.set(false);
    }

    protected boolean offerNoLock(String receiverNodeId, ByteSource msg, long start, int len, boolean doFlush) {
        long now = System.nanoTime();
        if (receiverNodeId != KEEP_SUBS_NODEID) {
            this.setReceiver(receiverNodeId);
        }
        if (msg != null) {
            BatchingController.Action action = this.batchController.getAction();
            if (action == BatchingController.Action.BLOCK) {
                return false;
            }
            if (action == BatchingController.Action.BATCH) {
                if (!this.batchOnLimit) {
                    return false;
                }
                doFlush = false;
            } else if (action != BatchingController.Action.NONE) {
                throw new RuntimeException("unexpected batchcontroller state");
            }
        }
        boolean res = msg == null ? true : this.putMessage(-1, msg, start, len, true);
        if (now - this.lastMsgFlush > this.hbInvtervalMS * 1000L * 1000L) {
            long prevFlush = this.lastMsgFlush;
            this.lastMsgFlush = now;
            if (!this.offerNoLock(null, (ByteSource)this.heartbeat, 0L, 1, false)) {
                this.lastMsgFlush = prevFlush;
            }
            return res;
        }
        if (doFlush && !this.isCurrentPacketEmpty()) {
            this.lastMsgFlush = now;
            this.fire();
        }
        try {
            if (!this.sendPendingRetrans() && this.sendPendingPackets()) {
                this.lastMsgFlush = now;
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return res;
    }

    @Override
    public boolean offer(String receiverNodeId, byte[] b, int start, int len, boolean doFlush) {
        this.hp.setBase(b, (long)start, (long)len);
        return this.offer(receiverNodeId, (ByteSource)this.hp, doFlush);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean offer(String receiverNodeId, ByteSource msg, long start, int len, boolean doFlush) {
        try {
            this.lock();
            boolean bl = this.offerNoLock(receiverNodeId, msg, start, len, doFlush);
            return bl;
        }
        finally {
            this.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean offer(String subscriberNodeId, ByteSource msg, boolean doFlush) {
        try {
            this.lock();
            boolean bl = this.offerNoLock(subscriberNodeId, msg, 0L, (int)msg.length(), doFlush);
            return bl;
        }
        finally {
            this.unlock();
        }
    }

    @Override
    public int getTopicId() {
        return this.topicEntry.getTopicId();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void setReceiver(String receiverNodeId) {
        if (receiverNodeId == null) {
            if (this.currentReceiver == null) return;
            this.currentReceiver = null;
            this.updateCurrentReceiver();
            return;
        } else {
            if (receiverNodeId.equals(this.currentReceiver)) {
                return;
            }
            this.currentReceiver = receiverNodeId;
            this.updateCurrentReceiver();
        }
    }

    private void updateCurrentReceiver() {
        if (this.isCurrentPacketEmpty()) {
            this.getPacketAt(this.currentSequence).getReceiver().setString(this.currentReceiver);
        } else {
            this.fire();
            this.updateCurrentReceiver();
        }
    }

    @Override
    public void setPacketRateLimit(int limit) {
        this.batchController = new BatchingController(limit);
    }

    @Override
    public int getPacketRateLimit() {
        return this.batchController.getRatePerSecond();
    }

    @Override
    public FCPublisher batchOnLimit(boolean doBatch) {
        this.batchOnLimit = doBatch;
        return this;
    }

    @Override
    public boolean isBatchOnLimit(boolean doBatch) {
        return doBatch;
    }

    @Override
    public void flush() {
        this.offer(KEEP_SUBS_NODEID, (ByteSource)null, 0L, 0, true);
    }
}

