package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.TransportEvent;

/* loaded from: input_file:org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.class */
/**
 * Base implementation of {@link NettyEventListener} shared by concrete listeners.
 *
 * <p>It forwards non-empty inbound byte-array messages to a stage as
 * {@link TransportEvent}s, reports caught exceptions to an optional error
 * handler, and removes a channel's link reference from the address map when the
 * channel becomes inactive. Subclasses supply the event construction
 * ({@link #getTransportEvent}) and the exception cleanup
 * ({@link #exceptionCleanup}).
 */
abstract class AbstractNettyEventListener implements NettyEventListener {
    protected static final Logger LOG = Logger.getLogger(AbstractNettyEventListener.class.getName());

    /** Link references keyed by remote socket address; entries are removed by {@link #closeChannel}. */
    protected final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap;

    /** Stage that receives the transport events built from inbound messages. */
    protected final EStage<TransportEvent> stage;

    /** Handler notified by {@link #exceptionCaught}; {@code null} until one is registered. */
    protected EventHandler<Exception> exceptionHandler;

    /**
     * Creates a listener bound to the given address map and dispatch stage.
     *
     * @param concurrentMap map from remote address to link reference, stored as-is (not copied)
     * @param eStage stage that receives the transport events produced by {@link #channelRead}
     */
    public AbstractNettyEventListener(ConcurrentMap<SocketAddress, LinkReference> concurrentMap, EStage<TransportEvent> eStage) {
        this.addrToLinkRefMap = concurrentMap;
        this.stage = eStage;
    }

    /**
     * Sets the handler that {@link #exceptionCaught} notifies, replacing any previous one.
     *
     * @param eventHandler the new error handler; {@code null} disables notification
     */
    public void registerErrorHandler(EventHandler<Exception> eventHandler) {
        LOG.log(Level.FINE, "Set error handler {0}", eventHandler);
        this.exceptionHandler = eventHandler;
    }

    /**
     * Handles an inbound message by forwarding it to the stage as a transport event.
     * Zero-length messages are logged (at FINEST) but not forwarded.
     *
     * @param channelHandlerContext context whose channel received the message
     * @param obj the message; it is cast to {@code byte[]} unconditionally
     * @throws ClassCastException if {@code obj} is not a {@code byte[]}
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        final Channel channel = channelHandlerContext.channel();
        final byte[] message = (byte[]) obj;

        if (LOG.isLoggable(Level.FINEST)) {
            LOG.log(Level.FINEST, "MessageEvent: local: {0} remote: {1} :: {2}",
                new Object[]{channel.localAddress(), channel.remoteAddress(), message});
        }

        if (message.length > 0) {
            this.stage.onNext(getTransportEvent(message, channel));
        }
    }

    /**
     * Logs the failure, runs the subclass cleanup, then notifies the registered
     * error handler, if any. A {@link Throwable} that is not an {@link Exception}
     * is wrapped in a new {@code Exception} before being handed to the handler.
     *
     * @param channelHandlerContext context of the channel on which the failure occurred
     * @param th the failure that was caught
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        final Channel channel = channelHandlerContext.channel();
        LOG.log(Level.WARNING, "ExceptionEvent: local: {0} remote: {1} :: {2}",
            new Object[]{channel.localAddress(), channel.remoteAddress(), th});

        exceptionCleanup(channelHandlerContext, th);

        // Read the mutable field exactly once so that the null check and the call
        // operate on the same handler even if registerErrorHandler runs concurrently.
        final EventHandler<Exception> handler = this.exceptionHandler;
        if (handler != null) {
            handler.onNext(th instanceof Exception ? (Exception) th : new Exception(th));
        }
    }

    /**
     * Treats an inactive channel as closed and drops its link reference.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        closeChannel(channelHandlerContext.channel());
    }

    /**
     * Builds the transport event that {@link #channelRead} dispatches to the stage.
     *
     * @param bArr the non-empty message payload
     * @param channel the channel the payload arrived on
     * @return the event to dispatch
     */
    protected abstract TransportEvent getTransportEvent(byte[] bArr, Channel channel);

    /**
     * Subclass hook invoked by {@link #exceptionCaught} after logging and before
     * the error handler is notified.
     *
     * @param channelHandlerContext context of the failing channel
     * @param th the failure that was caught
     */
    protected abstract void exceptionCleanup(ChannelHandlerContext channelHandlerContext, Throwable th);

    /**
     * Removes the link reference registered under the channel's remote address,
     * if there is one, and logs (at FINER) whether a reference was removed.
     * A {@code null} channel or a {@code null} remote address leaves the map untouched.
     *
     * @param channel the channel being closed; may be {@code null}
     */
    public void closeChannel(Channel channel) {
        // Fetch the address once so the null check and the map key are the same value.
        final SocketAddress remoteAddress = channel == null ? null : channel.remoteAddress();
        final LinkReference removed =
            remoteAddress == null ? null : this.addrToLinkRefMap.remove(remoteAddress);

        LOG.log(Level.FINER, "Channel closed: {0}. Link ref found and removed: {1}",
            new Object[]{channel, removed != null});
    }
}
