package org.apache.carbondata.core.dictionary.client;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/* loaded from: input_file:org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.class */
public class DictionaryClientHandler extends SimpleChannelHandler {
    private static final LogService LOGGER = LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
    private ChannelHandlerContext ctx;
    final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap();
    private Object lock = new Object();

    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        this.ctx = channelHandlerContext;
        LOGGER.audit("Connected " + channelHandlerContext.getHandler());
        super.channelConnected(channelHandlerContext, channelStateEvent);
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        DictionaryKey deserialize = KryoRegister.deserialize((byte[]) messageEvent.getMessage());
        this.dictKeyQueueMap.get(deserialize.getThreadNo()).offer(deserialize);
        super.messageReceived(channelHandlerContext, messageEvent);
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        LOGGER.error("exceptionCaught");
        exceptionEvent.getCause().printStackTrace();
        channelHandlerContext.getChannel().close();
    }

    public DictionaryKey getDictionary(DictionaryKey dictionaryKey) {
        DictionaryKey take;
        BlockingQueue<DictionaryKey> blockingQueue = null;
        try {
            synchronized (this.lock) {
                blockingQueue = this.dictKeyQueueMap.get(dictionaryKey.getThreadNo());
                if (blockingQueue == null) {
                    blockingQueue = new LinkedBlockingQueue();
                    this.dictKeyQueueMap.put(dictionaryKey.getThreadNo(), blockingQueue);
                }
            }
            this.ctx.getChannel().write(KryoRegister.serialize(dictionaryKey));
        } catch (Exception e) {
            LOGGER.error("Error while send request to server " + e.getMessage());
            this.ctx.getChannel().close();
        }
        boolean z = false;
        while (true) {
            try {
                take = blockingQueue.take();
                break;
            } catch (InterruptedException e2) {
                z = true;
            } catch (Throwable th) {
                if (z) {
                    Thread.currentThread().interrupt();
                }
                throw th;
            }
        }
        if (z) {
            Thread.currentThread().interrupt();
        }
        return take;
    }
}
