/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.handlers;

import java.util.TimerTask;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.MessageConsumeData;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.VarArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Callback invoked once the client's message handler has finished (or failed)
 * processing a delivered message.
 *
 * <p>The context object passed to both callback methods is cast to
 * {@link MessageConsumeData}; it carries the topic subscriber and the message
 * the outcome refers to.
 *
 * <ul>
 *   <li>On success, the {@link SubscribeResponseHandler} currently registered
 *       for the topic subscriber is told that the message was consumed.</li>
 *   <li>On failure, a {@link MessageConsumeRetryTask} is scheduled through the
 *       {@link HChannelManager} after the configured retry wait time.</li>
 * </ul>
 *
 * <p>In both paths, if no handler is registered for the topic subscriber, or
 * the handler reports no subscription for it, a warning is logged and nothing
 * else happens.
 */
public class MessageConsumeCallback
implements Callback<Void> {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumeCallback.class);
    private final HChannelManager channelManager;
    /**
     * Delay handed to {@code channelManager.schedule(...)} before a retry;
     * read from {@code ClientConfiguration.getMessageConsumeRetryWaitTime()}.
     */
    private final long consumeRetryWaitTime;

    /**
     * @param cfg            client configuration supplying the consume-retry wait time
     * @param channelManager used to look up subscribe response handlers and to
     *                       schedule retry tasks
     */
    public MessageConsumeCallback(ClientConfiguration cfg, HChannelManager channelManager) {
        this.channelManager = channelManager;
        this.consumeRetryWaitTime = cfg.getMessageConsumeRetryWaitTime();
    }

    /**
     * Reports a successfully consumed message to the subscribe response handler
     * of its topic subscriber.
     *
     * @param ctx               the {@link MessageConsumeData} describing the message
     * @param resultOfOperation unused
     */
    @Override
    public void operationFinished(Object ctx, Void resultOfOperation) {
        MessageConsumeData messageConsumeData = (MessageConsumeData)ctx;
        SubscribeResponseHandler handler = this.findSubscribedHandler(messageConsumeData);
        if (handler == null) {
            logger.warn("No subscription {} found to consume message {}.", VarArgs.va(messageConsumeData.topicSubscriber, MessageConsumeCallback.readableMsgId(messageConsumeData)));
            return;
        }
        handler.messageConsumed(messageConsumeData.topicSubscriber, messageConsumeData.msg);
    }

    /**
     * Logs the failure, including its cause, and schedules a retry task after
     * {@code consumeRetryWaitTime}.
     *
     * @param ctx       the {@link MessageConsumeData} describing the message
     * @param exception the reason the message handler failed
     */
    @Override
    public void operationFailed(Object ctx, PubSubException exception) {
        MessageConsumeData messageConsumeData = (MessageConsumeData)ctx;
        // The exception is passed as the trailing argument so SLF4J records it as
        // the throwable (with stack trace) instead of silently dropping the cause.
        logger.error("Message was not consumed successfully by client MessageHandler: {}", messageConsumeData, exception);
        this.channelManager.schedule(new MessageConsumeRetryTask(messageConsumeData), this.consumeRetryWaitTime);
    }

    /**
     * Looks up the subscribe response handler for the data's topic subscriber.
     *
     * @return the handler, or {@code null} when none is registered or the
     *         registered handler has no subscription for that topic subscriber
     */
    private SubscribeResponseHandler findSubscribedHandler(MessageConsumeData data) {
        SubscribeResponseHandler handler = this.channelManager.getSubscribeResponseHandler(data.topicSubscriber);
        if (null == handler || !handler.hasSubscription(data.topicSubscriber)) {
            return null;
        }
        return handler;
    }

    /** Renders the message id of {@code data.msg} for log output. */
    private static String readableMsgId(MessageConsumeData data) {
        return MessageIdUtils.msgIdToReadableString((PubSubProtocol.MessageSeqId)data.msg.getMsgId());
    }

    /**
     * Timer task that hands a message whose consumption failed back to the
     * subscribe response handler via {@code asyncMessageDeliver}.
     */
    class MessageConsumeRetryTask
    extends TimerTask {
        private final MessageConsumeData messageConsumeData;

        public MessageConsumeRetryTask(MessageConsumeData messageConsumeData) {
            this.messageConsumeData = messageConsumeData;
        }

        /**
         * Re-delivers the message if a subscribed handler still exists for its
         * topic subscriber; otherwise logs a warning and gives up.
         */
        @Override
        public void run() {
            SubscribeResponseHandler handler = MessageConsumeCallback.this.findSubscribedHandler(this.messageConsumeData);
            if (handler == null) {
                logger.warn("No subscription {} found to retry delivering message {}.", VarArgs.va(this.messageConsumeData.topicSubscriber, MessageConsumeCallback.readableMsgId(this.messageConsumeData)));
                return;
            }
            handler.asyncMessageDeliver(this.messageConsumeData.topicSubscriber, this.messageConsumeData.msg);
        }
    }
}

