package net.hycube.transport;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import net.hycube.configuration.GlobalConstants;
import net.hycube.core.InitializationException;
import net.hycube.core.NodeParameterSet;
import net.hycube.environment.Environment;
import net.hycube.environment.NodeProperties;
import net.hycube.eventprocessing.Event;
import net.hycube.eventprocessing.EventCategory;
import net.hycube.eventprocessing.ProcessEventProxy;
import net.hycube.hidden.org.apache.commons.logging.Log;
import net.hycube.logging.LogHelper;
import net.hycube.messaging.messages.Message;
import net.hycube.messaging.messages.MessageByteConversionException;
import net.hycube.messaging.messages.MessageFactory;
import net.hycube.utils.ClassInstanceLoadException;
import net.hycube.utils.ClassInstanceLoader;

/* loaded from: input_file:hycube-1.0.2-shaded.jar:net/hycube/transport/UDPSelectorMessageReceiver.class */
/**
 * Message receiver that reads UDP datagrams from one or more {@link DatagramChannel}s
 * multiplexed by a single {@link Selector}.
 *
 * <p>Each call to {@link #receiveMessage()} performs one timed {@code select()}, drains all
 * readable channels, hands every successfully decoded {@link Message} to the
 * {@link NetworkAdapter} registered for the local socket address, and finally enqueues a new
 * receive event on the event queue so that receiving continues.
 *
 * <p>Locking overview (as implemented in this class):
 * <ul>
 *   <li>the instance monitor serialises initialize/register/unregister/discard;</li>
 *   <li>{@code selectLock} is held for the whole of a {@code receiveMessage()} call and while
 *       the set of registered adapters is modified;</li>
 *   <li>{@code holdLock} guards {@code hold}, {@code wasHeld} and {@code wasHeldNum};</li>
 *   <li>{@code wakeableLock} guards {@code wakeable}.</li>
 * </ul>
 * Locks are always acquired in the order selectLock, holdLock, wakeableLock.
 */
public class UDPSelectorMessageReceiver implements MessageReceiver {
    private static final Log userLog = LogHelper.getUserLog();
    private static final Log devLog = LogHelper.getDevLog(UDPSelectorMessageReceiver.class);
    private static final Log msgLog = LogHelper.getMessagesLog();

    /** Size in bytes of the buffer a datagram is received into. */
    public static final int RECEIVE_BUFFER_SIZE = 65535;

    /** Timeout in milliseconds of the blocking {@code select()} call. */
    public static final int SELECTOR_SELECT_TIMEOUT = 1000;

    protected NodeProperties properties;
    protected Selector selector;

    /** Registered adapters, keyed by the adapter's public address string. */
    protected HashMap<String, NetworkAdapter> networkAdapters;
    protected List<DatagramChannel> channels;
    protected List<String> addresses;

    /** Selection keys of the registered channels, keyed by the adapter's public address string. */
    protected HashMap<String, SelectionKey> selectionKeys;
    protected BlockingQueue<Event> receiveEventQueue;
    protected Environment environment;
    protected MessageFactory messageFactory;

    /** True while a receive event is pending or a select() may be in progress; guarded by wakeableLock. */
    protected boolean wakeable;
    protected MessageReceiverProcessEventProxy messageReceiverProcessEventProxy;
    protected int currentSocketIndex = 0;
    protected Object selectLock = new Object();
    protected boolean initialized = false;

    /** Backing array of {@link #buff}; reused for every received datagram. */
    byte[] byteArray = new byte[RECEIVE_BUFFER_SIZE];
    protected ByteBuffer buff = ByteBuffer.wrap(this.byteArray);

    /** Set while the adapter set is being modified; makes receiveMessage() return immediately. */
    protected boolean hold = false;

    /** True if at least one receiveMessage() call was skipped while {@code hold} was set. */
    protected boolean wasHeld = false;

    /** Number of receiveMessage() calls skipped while {@code hold} was set. */
    protected int wasHeldNum = 0;
    protected Object holdLock = new Object();
    protected Object wakeableLock = new Object();

    /**
     * @return {@code true} once {@code initialize} completed successfully and until {@link #discard()}
     */
    @Override // net.hycube.transport.MessageReceiver
    public boolean isInitialized() {
        return this.initialized;
    }

    /**
     * Opens the selector, creates and initializes the configured message factory and resets
     * the receiver state.
     *
     * @param environment    environment providing the time source used for receive events
     * @param blockingQueue  queue on which receive events are enqueued
     * @param nodeProperties configuration holding the message factory settings
     * @throws IllegalArgumentException  if {@code blockingQueue} or {@code environment} is null
     * @throws MessageReceiverException  if the selector cannot be opened
     * @throws InitializationException   if the message factory is not configured or cannot be created
     */
    @Override // net.hycube.transport.MessageReceiver
    public synchronized void initialize(Environment environment, BlockingQueue<Event> blockingQueue, NodeProperties nodeProperties) throws MessageReceiverException, InitializationException {
        if (devLog.isInfoEnabled()) {
            devLog.info("Initializing message receiver.");
        }
        if (blockingQueue == null) {
            throw new IllegalArgumentException("receiveEventQueue is null.");
        }
        if (environment == null) {
            throw new IllegalArgumentException("environment is null.");
        }

        final Selector openedSelector;
        try {
            openedSelector = Selector.open();
        } catch (IOException e) {
            throw new MessageReceiverException("An exception thrown while opening the Selector.", e);
        }

        boolean succeeded = false;
        try {
            this.selector = openedSelector;
            this.properties = nodeProperties;
            this.networkAdapters = new HashMap<>();
            this.addresses = new ArrayList<>();
            this.channels = new ArrayList<>();
            this.selectionKeys = new HashMap<>();
            this.receiveEventQueue = blockingQueue;
            this.environment = environment;
            this.messageFactory = createMessageFactory(nodeProperties);
            // Also clears wasHeldNum so that counts left over from a previous life cycle
            // do not produce surplus receive events on the next unhold().
            resetHoldState();
            this.messageReceiverProcessEventProxy = new MessageReceiverProcessEventProxy(this);
            this.initialized = true;
            succeeded = true;
        } finally {
            if (!succeeded) {
                // Do not leak the selector when initialization fails half-way.
                this.selector = null;
                try {
                    openedSelector.close();
                } catch (IOException ignored) {
                    // The original failure is already propagating; a close failure adds nothing.
                }
            }
        }

        if (userLog.isInfoEnabled()) {
            userLog.info("Initialized message receiver.");
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Initialized message receiver.");
        }
    }

    /**
     * Instantiates and initializes the message factory named by
     * {@link NodeParameterSet#PROP_KEY_MESSAGE_FACTORY}.
     */
    private MessageFactory createMessageFactory(NodeProperties nodeProperties) throws InitializationException {
        try {
            String factoryKey = nodeProperties.getProperty(NodeParameterSet.PROP_KEY_MESSAGE_FACTORY);
            if (factoryKey == null || factoryKey.trim().isEmpty()) {
                throw new InitializationException(InitializationException.Error.INVALID_PARAMETER_VALUE, nodeProperties.getAbsoluteKey(NodeParameterSet.PROP_KEY_MESSAGE_FACTORY), "Invalid parameter value: " + nodeProperties.getAbsoluteKey(NodeParameterSet.PROP_KEY_MESSAGE_FACTORY));
            }
            NodeProperties factoryProperties = nodeProperties.getNestedProperty(NodeParameterSet.PROP_KEY_MESSAGE_FACTORY, factoryKey);
            MessageFactory factory = (MessageFactory) ClassInstanceLoader.newInstance(factoryProperties.getProperty(GlobalConstants.PROP_KEY_CLASS), (Class<?>) MessageFactory.class);
            factory.initialize(factoryProperties);
            return factory;
        } catch (ClassInstanceLoadException e) {
            throw new InitializationException(InitializationException.Error.CLASS_INSTANTIATION_ERROR, e.getLoadedClassName(), "Unable to create message factory instance.", e);
        }
    }

    /**
     * Registers a network adapter; the adapter must be a {@link UDPSelectorNetworkAdapter}.
     *
     * @throws IllegalArgumentException if the adapter is of any other type (or null)
     * @throws MessageReceiverException if the adapter's channel cannot be registered with the selector
     */
    @Override // net.hycube.transport.MessageReceiver
    public synchronized void registerNetworkAdapter(NetworkAdapter networkAdapter) throws MessageReceiverException {
        if (!(networkAdapter instanceof UDPSelectorNetworkAdapter)) {
            throw new IllegalArgumentException("The network adapter should be an instance of UDPSelectorNetworkAdapter class.");
        }
        registerNetworkAdapter((UDPSelectorNetworkAdapter) networkAdapter);
    }

    /**
     * Registers the adapter's channel with the selector for read readiness and records the
     * adapter under its public address string.
     *
     * @throws MessageReceiverRuntimeException if the receiver is not initialized or an adapter
     *                                         with the same address is already registered
     * @throws MessageReceiverException        if the channel is closed
     */
    public synchronized void registerNetworkAdapter(UDPSelectorNetworkAdapter uDPSelectorNetworkAdapter) throws MessageReceiverException {
        if (devLog.isInfoEnabled()) {
            devLog.info("Registering new network adapter.");
        }
        if (!this.initialized) {
            throw new MessageReceiverRuntimeException("The message receiver is not initialized.");
        }
        // Make the receiving thread leave select() and skip further receive calls
        // until the lock below has been obtained.
        hold();
        wakeup();
        synchronized (this.selectLock) {
            unhold();
            String address = uDPSelectorNetworkAdapter.getPublicAddressString();
            if (this.networkAdapters.containsKey(address)) {
                throw new MessageReceiverRuntimeException("The message receiver already registered a network adapter with the same network address.");
            }
            DatagramChannel channel = uDPSelectorNetworkAdapter.getChannel();
            // Register with the selector first: if this fails, no bookkeeping has been
            // touched and the receiver is not left with a half-registered adapter.
            SelectionKey key;
            try {
                key = channel.register(this.selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                throw new MessageReceiverException("An exception thrown while registering the channel with the selector.", e);
            }
            this.networkAdapters.put(address, uDPSelectorNetworkAdapter);
            this.channels.add(channel);
            this.addresses.add(address);
            this.selectionKeys.put(address, key);
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Registered new network adapter. Network address: " + uDPSelectorNetworkAdapter.getPublicAddressString());
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Registered new network adapter. Network address: " + uDPSelectorNetworkAdapter.getPublicAddressString());
        }
    }

    /**
     * Unregisters a network adapter; the adapter must be a {@link UDPSelectorNetworkAdapter}.
     *
     * @throws IllegalArgumentException if the adapter is of any other type (or null)
     */
    @Override // net.hycube.transport.MessageReceiver
    public synchronized void unregisterNetworkAdapter(NetworkAdapter networkAdapter) {
        if (!(networkAdapter instanceof UDPSelectorNetworkAdapter)) {
            throw new IllegalArgumentException("The network adapter should be an instance of UDPSelectorNetworkAdapter class.");
        }
        unregisterNetworkAdapter((UDPSelectorNetworkAdapter) networkAdapter);
    }

    /**
     * Removes the adapter registered under the given adapter's public address string and
     * cancels its selection key. Does nothing if no such adapter is registered.
     *
     * @throws MessageReceiverRuntimeException if the receiver is not initialized
     */
    public synchronized void unregisterNetworkAdapter(UDPSelectorNetworkAdapter uDPSelectorNetworkAdapter) {
        if (devLog.isInfoEnabled()) {
            devLog.info("Unregistering new network adapter.");
        }
        if (!this.initialized) {
            throw new MessageReceiverRuntimeException("The message receiver is not initialized.");
        }
        hold();
        wakeup();
        synchronized (this.selectLock) {
            unhold();
            String address = uDPSelectorNetworkAdapter.getPublicAddressString();
            if (!this.networkAdapters.containsKey(address)) {
                return;
            }
            this.networkAdapters.remove(address);
            this.channels.remove(uDPSelectorNetworkAdapter.getChannel());
            this.addresses.remove(address);
            // Remove the key from the map as well, otherwise a cancelled key stays referenced.
            SelectionKey key = this.selectionKeys.remove(address);
            if (key != null) {
                key.cancel();
            }
            if (userLog.isInfoEnabled()) {
                userLog.info("Unregistered new network adapter. Network address: " + address);
            }
            if (devLog.isInfoEnabled()) {
                devLog.info("Unregistered new network adapter. Network address: " + address);
            }
        }
    }

    /**
     * Waits up to {@link #SELECTOR_SELECT_TIMEOUT} ms for readable channels, processes every
     * pending datagram and enqueues the next receive event.
     *
     * <p>Returns without selecting when the receiver is on hold (the skipped call is counted
     * and re-enqueued by {@link #unhold()}) or when it was discarded while this call was
     * waiting for the select lock.
     *
     * @throws MessageReceiverRuntimeException if the receiver is not initialized on entry
     * @throws MessageReceiverException        if select, selectNow or the channel receive fails
     */
    @Override // net.hycube.transport.MessageReceiver
    public void receiveMessage() throws MessageReceiverException {
        if (devLog.isTraceEnabled()) {
            devLog.trace("receiveMessage() called.");
        }
        if (!this.initialized) {
            throw new MessageReceiverRuntimeException("The message receiver is not initialized.");
        }
        synchronized (this.selectLock) {
            if (!isInitialized()) {
                // Discarded while this thread was waiting for the lock.
                return;
            }
            if (checkHoldAndSetHeld()) {
                return;
            }
            if (devLog.isDebugEnabled()) {
                devLog.debug("Checking for message - calling select().");
            }
            try {
                this.selector.select(SELECTOR_SELECT_TIMEOUT);
            } catch (IOException e) {
                throw new MessageReceiverException("An exception thrown during the select() call.", e);
            } finally {
                // select() is over (normally or not): a wakeup() is pointless from here on.
                synchronized (this.wakeableLock) {
                    this.wakeable = false;
                }
            }
            // Keep draining as long as more channels became readable in the meantime.
            do {
                processSelectedKeys();
            } while (selectNowChecked() > 0);
            enqueueMessageReceiverEvent();
        }
    }

    /** Consumes the selector's selected-key set, receiving one datagram per readable key. */
    private void processSelectedKeys() throws MessageReceiverException {
        Iterator<SelectionKey> keyIterator = this.selector.selectedKeys().iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            keyIterator.remove();
            if (key.isValid() && key.isReadable()) {
                receiveFromChannel((DatagramChannel) key.channel());
            }
        }
    }

    /** Non-blocking select that converts an {@link IOException} into a {@link MessageReceiverException}. */
    private int selectNowChecked() throws MessageReceiverException {
        try {
            return this.selector.selectNow();
        } catch (IOException e) {
            throw new MessageReceiverException("An exception thrown during the selectNow() call.", e);
        }
    }

    /**
     * Receives a single datagram from the channel, decodes it and passes the resulting message
     * to the adapter registered for the channel's local address. Undecodable datagrams are
     * logged at debug level and dropped.
     */
    private void receiveFromChannel(DatagramChannel datagramChannel) throws MessageReceiverException {
        if (devLog.isDebugEnabled()) {
            devLog.debug("Receiving packet from the socket...");
        }
        DatagramSocket socket = datagramChannel.socket();
        InetAddress localAddress = socket.getLocalAddress();
        int localPort = socket.getLocalPort();
        if (localAddress == null || localPort == -1) {
            return;
        }
        // NOTE(review): adapters are looked up by this "host:port" string but registered under
        // getPublicAddressString(); presumably both use the same format - confirm.
        String localAddressString = localAddress.getHostAddress() + ":" + localPort;

        this.buff.rewind();
        SocketAddress sender;
        try {
            sender = datagramChannel.receive(this.buff);
        } catch (ClosedByInterruptException e) {
            // The channel was closed because the receiving thread was interrupted;
            // there is nothing to deliver.
            return;
        } catch (IOException e) {
            throw new MessageReceiverException("An exception thrown during channel.receive() call.", e);
        }
        if (sender == null) {
            // No datagram was actually available.
            return;
        }
        InetSocketAddress senderSocketAddress = (InetSocketAddress) sender;
        String senderAddressString = senderSocketAddress.getAddress().getHostAddress() + ":" + senderSocketAddress.getPort();
        if (devLog.isDebugEnabled()) {
            devLog.debug("Packet was received from the socket...");
            devLog.debug("Converting received packet to a message object...");
        }

        Message message;
        try {
            // NOTE(review): the whole shared backing array is handed over, not only the bytes
            // of this datagram; bytes of an earlier, longer datagram may follow the payload.
            // Presumably the factory relies on a length field - confirm before trimming.
            message = this.messageFactory.fromBytes(this.buff.array());
        } catch (MessageByteConversionException e) {
            if (msgLog.isDebugEnabled()) {
                msgLog.debug("Invalid message - could not convert to the Message object. Message discarded.", e);
            }
            if (devLog.isDebugEnabled()) {
                devLog.debug("Invalid message - could not convert to the Message object. Message discarded.", e);
            }
            return;
        }
        if (message == null) {
            return;
        }

        NetworkAdapter networkAdapter = this.networkAdapters.get(localAddressString);
        if (networkAdapter == null) {
            if (devLog.isDebugEnabled()) {
                devLog.debug("Message receiver received a message for the network address for which the networkAdapter was not registered.");
            }
            return;
        }
        if (devLog.isDebugEnabled()) {
            devLog.debug("Received message: " + message.getSerialNoAndSenderString());
        }
        if (msgLog.isDebugEnabled()) {
            msgLog.debug("Received message: " + message.getSerialNoAndSenderString());
        }
        if (devLog.isDebugEnabled()) {
            devLog.debug("Passing the received message to the network adapter.");
        }
        networkAdapter.messageReceived(message, networkAdapter.createNetworkNodePointer(senderAddressString));
    }

    /** Starts receiving by enqueueing a single receive event. */
    @Override // net.hycube.transport.MessageReceiver
    public void startMessageReceiver() {
        enqueueMessageReceiverEvent();
    }

    /**
     * Starts receiving by enqueueing {@code i} receive events.
     *
     * @param i number of events to enqueue; must be positive
     * @throws IllegalArgumentException if {@code i <= 0}
     */
    @Override // net.hycube.transport.MessageReceiver
    public void startMessageReceiver(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Illegal number of events to be enqueued.");
        }
        for (int enqueued = 0; enqueued < i; enqueued++) {
            enqueueMessageReceiverEvent();
        }
    }

    /**
     * Marks the receiver as wakeable and puts a receive event on the event queue, retrying
     * until the (possibly blocking) {@code put} succeeds.
     */
    protected void enqueueMessageReceiverEvent() {
        synchronized (this.wakeableLock) {
            this.wakeable = true;
        }
        boolean enqueued = false;
        while (!enqueued) {
            try {
                this.receiveEventQueue.put(new Event(this.environment.getTimeProvider().getCurrentTime(), EventCategory.receiveMessageEvent, (ProcessEventProxy) this.messageReceiverProcessEventProxy, (Object[]) null));
                enqueued = true;
            } catch (InterruptedException ignored) {
                // Retried on purpose: without this event no further receive call is scheduled.
                // NOTE(review): the interrupt status is not restored. Restoring it would make the
                // next interruptible channel operation on this thread close that channel, so the
                // policy should be confirmed before changing it.
            }
        }
    }

    /**
     * Stops receiving, closes the selector and clears all state. Safe to call on a receiver
     * that was never initialized or was already discarded.
     *
     * @throws MessageReceiverException if closing the selector fails (state is cleared anyway)
     */
    @Override // net.hycube.transport.MessageReceiver
    public synchronized void discard() throws MessageReceiverException {
        if (devLog.isInfoEnabled()) {
            devLog.info("Discarding the message receiver.");
        }
        hold();
        wakeup();
        synchronized (this.selectLock) {
            Selector selectorToClose = this.selector;
            this.initialized = false;
            this.networkAdapters = null;
            this.channels = null;
            this.addresses = null;
            this.selectionKeys = null;
            this.receiveEventQueue = null;
            this.currentSocketIndex = 0;
            this.environment = null;
            this.selector = null;
            this.messageReceiverProcessEventProxy = null;
            this.properties = null;
            resetHoldState();
            synchronized (this.wakeableLock) {
                this.wakeable = false;
            }
            if (selectorToClose != null) {
                try {
                    selectorToClose.close();
                } catch (IOException e) {
                    throw new MessageReceiverException("An exception thrown while closing the selector.", e);
                }
            }
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Discarded the message receiver.");
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Discarded the message receiver.");
        }
    }

    /** Wakes the selector up if a select() may currently be in progress. */
    protected void wakeup() {
        synchronized (this.wakeableLock) {
            if (this.wakeable && this.selector != null) {
                this.selector.wakeup();
            }
        }
    }

    /** Puts the receiver on hold: subsequent receiveMessage() calls return without selecting. */
    protected void hold() {
        synchronized (this.holdLock) {
            this.hold = true;
        }
    }

    /**
     * Releases the hold and re-enqueues one receive event for every receiveMessage() call
     * that was skipped while the hold was active.
     */
    protected void unhold() {
        synchronized (this.holdLock) {
            this.hold = false;
            if (this.wasHeld) {
                this.wasHeld = false;
                while (this.wasHeldNum > 0) {
                    enqueueMessageReceiverEvent();
                    this.wasHeldNum--;
                }
            }
        }
    }

    /**
     * Checks whether the receiver is on hold and, if so, records the skipped call.
     *
     * @return {@code true} if the caller must skip receiving
     */
    protected boolean checkHoldAndSetHeld() {
        synchronized (this.holdLock) {
            if (!this.hold) {
                return false;
            }
            this.wasHeld = true;
            this.wasHeldNum++;
            synchronized (this.wakeableLock) {
                this.wakeable = false;
            }
            return true;
        }
    }

    /** Clears the hold flag together with the record of skipped receive calls. */
    private void resetHoldState() {
        synchronized (this.holdLock) {
            this.hold = false;
            this.wasHeld = false;
            this.wasHeldNum = 0;
        }
    }
}
