/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.fastcast.impl;

import java.io.IOException;
import java.net.DatagramPacket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.nustaq.fastcast.api.FCPublisher;
import org.nustaq.fastcast.api.FCSubscriber;
import org.nustaq.fastcast.api.FastCast;
import org.nustaq.fastcast.config.PhysicalTransportConf;
import org.nustaq.fastcast.config.PublisherConf;
import org.nustaq.fastcast.config.SubscriberConf;
import org.nustaq.fastcast.impl.ControlPacket;
import org.nustaq.fastcast.impl.DataPacket;
import org.nustaq.fastcast.impl.Packet;
import org.nustaq.fastcast.impl.PacketReceiveBuffer;
import org.nustaq.fastcast.impl.PacketSendBuffer;
import org.nustaq.fastcast.impl.ReceiveBufferDispatcher;
import org.nustaq.fastcast.impl.RetransPacket;
import org.nustaq.fastcast.impl.Topic;
import org.nustaq.fastcast.transport.PhysicalTransport;
import org.nustaq.fastcast.util.FCLog;
import org.nustaq.offheap.structs.FSTStruct;
import org.nustaq.offheap.structs.FSTStructAllocator;
import org.nustaq.offheap.structs.structtypes.StructString;

public class TransportDriver {
    private static final boolean RETRANS_DEBUG = false;
    public static int MAX_NUM_TOPICS = 256;
    int spinIdleLoopMicros = 10000000;
    int idleParkMicros = 500;
    volatile PhysicalTransport trans;
    ReceiveBufferDispatcher[] receiver;
    PacketSendBuffer[] sender;
    long[] lastMsg;
    StructString nodeId;
    Thread receiverThread;
    Thread houseKeeping;
    FSTStructAllocator alloc = new FSTStructAllocator(1);
    long autoFlushMS;
    private ConcurrentHashMap<Integer, Topic> topics = new ConcurrentHashMap();
    int tCheckCounter = 0;
    volatile int terminationCounter = 0;
    long lastTimeoutCheck = System.currentTimeMillis();
    Packet receivedPacket;
    DataPacket tmpP;
    PhysicalTransport emptyTransport = new PhysicalTransport(){

        @Override
        public boolean receive(ByteBuffer pack) throws IOException {
            return false;
        }

        @Override
        public boolean receive(DatagramPacket pack) throws IOException {
            return false;
        }

        @Override
        public void send(DatagramPacket pack) throws IOException {
        }

        @Override
        public void send(byte[] bytes, int off, int len) throws IOException {
        }

        @Override
        public void send(ByteBuffer b) throws IOException {
        }

        @Override
        public void join() throws IOException {
        }

        @Override
        public PhysicalTransportConf getConf() {
            return null;
        }

        @Override
        public void close() {
        }

        @Override
        public boolean isBlocking() {
            return false;
        }
    };

    public TransportDriver(PhysicalTransport trans, String nodeId) {
        this.trans = trans;
        this.nodeId = (StructString)this.alloc.newStruct((FSTStruct)new StructString(nodeId));
        PhysicalTransportConf tconf = trans.getConf();
        this.autoFlushMS = tconf.getAutoFlushMS();
        this.spinIdleLoopMicros = tconf.getSpinLoopMicros();
        this.idleParkMicros = tconf.getIdleParkMicros();
        this.receiver = new ReceiveBufferDispatcher[MAX_NUM_TOPICS];
        this.sender = new PacketSendBuffer[MAX_NUM_TOPICS];
        this.lastMsg = new long[MAX_NUM_TOPICS];
        this.receiverThread = new Thread("trans receiver " + tconf.getName()){

            @Override
            public void run() {
                TransportDriver.this.receiveLoop();
            }
        };
        this.receiverThread.start();
        this.houseKeeping = new Thread("trans houseKeeping " + tconf.getName()){

            @Override
            public void run() {
                TransportDriver.this.houseKeepingLoop();
            }
        };
        this.houseKeeping.start();
    }

    private void installReceiver(Topic chan, FCSubscriber msgListener) {
        ReceiveBufferDispatcher receiveBufferDispatcher = new ReceiveBufferDispatcher(this.trans.getConf().getDgramsize(), this.nodeId.toString(), chan, msgListener);
        if (this.receiver[chan.getTopicId()] != null) {
            throw new RuntimeException("double usage of topic " + chan.getTopicId() + " on transport " + this.trans.getConf().getName());
        }
        this.receiver[chan.getTopicId()] = receiveBufferDispatcher;
    }

    public boolean hasReceiver(int topicId) {
        return this.receiver[topicId] != null;
    }

    public boolean hasSender(int topicId) {
        return this.sender[topicId] != null;
    }

    private PacketSendBuffer installSender(Topic topicEntry) {
        PacketSendBuffer packetSendBuffer;
        if (this.sender[topicEntry.getTopicId()] != null) {
            return this.sender[topicEntry.getTopicId()];
        }
        this.sender[topicEntry.getTopicId()] = packetSendBuffer = new PacketSendBuffer(this.trans, this.nodeId.toString(), topicEntry);
        topicEntry.setSender(packetSendBuffer);
        return packetSendBuffer;
    }

    private void houseKeepingLoop() {
        ArrayList<String> lostSenders = new ArrayList<String>();
        while (!this.isTerminated()) {
            try {
                int i;
                long now = System.currentTimeMillis();
                if (now - this.lastTimeoutCheck > 2000L) {
                    this.lastTimeoutCheck = now;
                    for (i = 0; i < this.receiver.length; ++i) {
                        ReceiveBufferDispatcher receiveBufferDispatcher = this.receiver[i];
                        if (receiveBufferDispatcher == null) continue;
                        Topic topicEntry = receiveBufferDispatcher.getTopicEntry();
                        lostSenders.clear();
                        List<String> timedOutSenders = topicEntry.getTimedOutSenders(lostSenders, now, topicEntry.getHbTimeoutMS());
                        if (timedOutSenders == null || timedOutSenders.size() <= 0 || this.isTerminated()) continue;
                        this.cleanup(timedOutSenders, i);
                    }
                }
                for (i = 0; i < this.sender.length; ++i) {
                    PacketSendBuffer packetSendBuffer = this.sender[i];
                    if (packetSendBuffer == null) continue;
                    long lastFlush = packetSendBuffer.lastMsgFlush;
                    if (this.lastMsg[i] == 0L) {
                        this.lastMsg[i] = lastFlush;
                        continue;
                    }
                    if (this.lastMsg[i] == lastFlush) {
                        if (this.isTerminated()) continue;
                        packetSendBuffer.flush();
                        continue;
                    }
                    this.lastMsg[i] = lastFlush;
                }
                try {
                    Thread.sleep(this.autoFlushMS);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        ++this.terminationCounter;
    }

    private boolean isTerminated() {
        return this.trans == this.emptyTransport;
    }

    private void receiveLoop() {
        byte[] receiveBuf = new byte[this.trans.getConf().getDgramsize()];
        DatagramPacket p = new DatagramPacket(receiveBuf, receiveBuf.length);
        ByteBuffer buff = ByteBuffer.wrap(p.getData(), p.getOffset(), p.getLength());
        this.receivedPacket = (Packet)this.alloc.newStruct((FSTStruct)new Packet());
        long idleNanos = System.nanoTime();
        this.receivedPacket.baseOn(receiveBuf, 0);
        while (true) {
            boolean idle = true;
            try {
                buff.position(0);
                if (this.receiveDatagram(buff, receiveBuf)) {
                    idleNanos = System.nanoTime();
                    idle = false;
                } else if (System.nanoTime() - idleNanos > (long)(this.spinIdleLoopMicros * 1000)) {
                    if (!this.trans.isBlocking() && this.idleParkMicros > 0) {
                        LockSupport.parkNanos(1000 * this.idleParkMicros);
                    }
                } else {
                    idle = false;
                    idleNanos = (ThreadLocalRandom.current().nextInt() & 1) == 0 ? ++idleNanos : --idleNanos;
                }
                ++this.tCheckCounter;
                if (this.tCheckCounter != 100000 && !idle && this.spinIdleLoopMicros != 0) continue;
                this.tCheckCounter = 0;
                if (!this.isTerminated()) continue;
            }
            catch (Throwable e) {
                FCLog.log(e);
                continue;
            }
            break;
        }
        while (this.terminationCounter < 1) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500L));
        }
        this.alloc.free();
        for (int i = 0; i < this.receiver.length; ++i) {
            ReceiveBufferDispatcher receiveBufferDispatcher = this.receiver[i];
            if (receiveBufferDispatcher == null) continue;
            receiveBufferDispatcher.cleanupTopic();
        }
    }

    public void terminate() {
        PhysicalTransport oldTrans = this.trans;
        this.trans = this.emptyTransport;
        oldTrans.close();
    }

    private boolean receiveDatagram(ByteBuffer p, byte[] wrappedArr) throws IOException {
        if (this.trans.receive(p)) {
            boolean selfSent = this.receivedPacket.getSender().equals((Object)this.nodeId);
            if (!selfSent) {
                int topic = this.receivedPacket.getTopic();
                if (topic > MAX_NUM_TOPICS || topic < 0) {
                    FCLog.get().warn("foreign traffic");
                    return true;
                }
                if (this.receiver[topic] == null && this.sender[topic] == null) {
                    return true;
                }
                Class type = this.receivedPacket.getPointedClass();
                StructString receivedPacketReceiver = this.receivedPacket.getReceiver();
                if (type == DataPacket.class) {
                    if (this.receiver[topic] == null) {
                        return true;
                    }
                    this.dispatchDataPacket(this.receivedPacket, topic);
                } else if (type == RetransPacket.class) {
                    if (this.sender[topic] == null) {
                        return true;
                    }
                    if (receivedPacketReceiver == null || !receivedPacketReceiver.equals((Object)this.nodeId)) {
                        return true;
                    }
                    this.dispatchRetransmissionRequest(this.receivedPacket, topic);
                } else if (type == ControlPacket.class) {
                    ReceiveBufferDispatcher receiveBufferDispatcher;
                    if (this.isForeignReceiver(receivedPacketReceiver)) {
                        return true;
                    }
                    ControlPacket control = (ControlPacket)this.receivedPacket.cast();
                    if (control.getType() == 0 && (receiveBufferDispatcher = this.receiver[topic]) != null) {
                        FCLog.get().warn(this.nodeId + " has been dropped by " + this.receivedPacket.getSender() + " on service " + receiveBufferDispatcher.getTopicEntry().getTopicId());
                        FCSubscriber service = receiveBufferDispatcher.getTopicEntry().getSubscriber();
                        if (service != null) {
                            if (service.dropped()) {
                                FCLog.get().warn("..resyncing..");
                                PacketReceiveBuffer buffer = receiveBufferDispatcher.getBuffer(this.receivedPacket.getSender());
                                if (buffer != null) {
                                    buffer.resync();
                                } else {
                                    FCLog.get().warn("unexpected null buffer");
                                }
                            } else {
                                this.receiver[topic] = null;
                                receiveBufferDispatcher.cleanupTopic();
                            }
                        }
                    }
                }
                return true;
            }
            return false;
        }
        return false;
    }

    private boolean isForeignReceiver(StructString receivedPacketReceiver) {
        return receivedPacketReceiver != null && receivedPacketReceiver.getLen() > 0 && !receivedPacketReceiver.equals((Object)this.nodeId);
    }

    private void dispatchDataPacket(Packet receivedPacket, int topic) throws IOException {
        PacketReceiveBuffer buffer = this.receiver[topic].getBuffer(receivedPacket.getSender());
        this.tmpP = (DataPacket)receivedPacket.cast().detachTo((FSTStruct)this.tmpP);
        RetransPacket retransPacket = buffer.receivePacket(this.tmpP);
        if (retransPacket != null) {
            if (PacketSendBuffer.RETRANSDEBUG) {
                FCLog.get().info("send retrans request " + (Object)((Object)retransPacket) + " " + retransPacket.getClzId());
            }
            this.trans.send(new DatagramPacket(retransPacket.getBase().toBytes(retransPacket.getOffset(), retransPacket.getByteSize()), 0, retransPacket.getByteSize()));
        }
    }

    private void dispatchRetransmissionRequest(Packet receivedPacket, int topic) throws IOException {
        RetransPacket retransPacket = (RetransPacket)receivedPacket.cast().detach();
        this.sender[topic].addRetransmissionRequest(retransPacket, this.trans);
    }

    void cleanup(List<String> timedOutSenders, int topic) {
        for (int i = 0; i < timedOutSenders.size(); ++i) {
            String s = timedOutSenders.get(i);
            ReceiveBufferDispatcher receiveBufferDispatcher = this.receiver[topic];
            FCLog.get().info("stopped receiving heartbeats from " + s);
            if (receiveBufferDispatcher == null) continue;
            receiveBufferDispatcher.cleanup(s);
        }
    }

    public void subscribe(String subsConf, FCSubscriber subscriber) {
        this.subscribe(FastCast.getFastCast().getSubscriberConf(subsConf), subscriber);
    }

    public void subscribe(SubscriberConf subsConf, FCSubscriber subscriber) {
        Topic topicEntry = this.topics.get(subsConf.getTopicId());
        if (topicEntry == null) {
            topicEntry = new Topic(null, null);
        }
        if (topicEntry.getPublisherConf() != null) {
            throw new RuntimeException("already a sender registered at " + subsConf.getTopicId());
        }
        topicEntry.setSubscriberConf(subsConf);
        topicEntry.setChannelDispatcher(this);
        topicEntry.setSubscriber(subscriber);
        this.installReceiver(topicEntry, subscriber);
    }

    public FCPublisher publish(String pubConf) {
        return this.publish(FastCast.getFastCast().getPublisherConf(pubConf));
    }

    public FCPublisher publish(PublisherConf pubConf) {
        Topic topicEntry = this.topics.get(pubConf.getTopicId());
        if (topicEntry == null) {
            topicEntry = new Topic(null, null);
        }
        if (topicEntry.getPublisherConf() != null) {
            throw new RuntimeException("already a sender registered at " + pubConf.getTopicId());
        }
        topicEntry.setChannelDispatcher(this);
        topicEntry.setPublisherConf(pubConf);
        this.topics.put(pubConf.getTopicId(), topicEntry);
        PacketSendBuffer packetSendBuffer = this.installSender(topicEntry);
        return packetSendBuffer;
    }

    public ReceiveBufferDispatcher getReceiver(int topicId) {
        return this.receiver[topicId];
    }
}

