/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.offset;

import com.google.common.truth.Truth;
import java.util.Collection;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration tests that tamper with broker requests carrying code
 * {@value #INTERCEPTED_REQUEST_CODE} through a server-side {@link RPCHook}
 * registered on every broker in {@code brokerControllerList}.
 *
 * <p>The hook can either reject those requests outright or rewrite their
 * {@code setZeroIfNotFound} ext field; each test enables one of these modes and
 * checks what a consumer ends up receiving.
 */
public class OffsetNotFoundIT extends BaseConf {
    /**
     * Request code the hook reacts to.
     * NOTE(review): presumably RequestCode.QUERY_CONSUMER_OFFSET - confirm against the broker's request codes.
     */
    private static final int INTERCEPTED_REQUEST_CODE = 14;

    /** Number of messages produced by each test. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Timeout handed to {@code waitForMessageConsume}.
     * NOTE(review): assumed to be milliseconds - confirm against the listener API.
     */
    private static final int CONSUME_WAIT_TIMEOUT = 15000;

    private final OffsetRpcHook offsetRpcHook = new OffsetRpcHook();

    /** Registers the tampering hook on every broker controller of the test cluster. */
    @Before
    public void setUp() {
        for (BrokerController brokerController : brokerControllerList) {
            brokerController.registerServerRPCHook(this.offsetRpcHook);
        }
    }

    /** Delegates cleanup to {@link BaseConf#shutdown()}. */
    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    /**
     * While the hook rejects the intercepted requests, a consumer must not
     * receive any of the produced messages; once the hook stops rejecting, a
     * newly obtained consumer must receive all of them.
     */
    @Test
    public void testConsumeStopAndResume() {
        String topic = initTopic();
        RMQNormalProducer producer = sendMessages(topic);

        RMQNormalConsumer blockedConsumer = null;
        try {
            this.offsetRpcHook.throwException = true;
            blockedConsumer = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
            blockedConsumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_WAIT_TIMEOUT);
            Assert.assertEquals(0, blockedConsumer.getListener().getAllMsgBody().size());
        } finally {
            // Stop the consumer even when the assertion above fails, and always
            // restore the hook so the second phase is not affected.
            try {
                shutdownIfCreated(blockedConsumer);
            } finally {
                this.offsetRpcHook.throwException = false;
            }
        }

        RMQNormalConsumer resumedConsumer = null;
        try {
            resumedConsumer = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
            assertAllMessagesConsumed(producer, resumedConsumer);
        } finally {
            shutdownIfCreated(resumedConsumer);
        }
    }

    /**
     * With the hook forcing {@code setZeroIfNotFound} to {@code "false"} on the
     * intercepted requests, a consumer configured with
     * {@link ConsumeFromWhere#CONSUME_FROM_FIRST_OFFSET} must still receive every
     * produced message.
     */
    @Test
    public void testOffsetNotFoundException() {
        String topic = initTopic();
        String group = initConsumerGroup();
        RMQNormalProducer producer = sendMessages(topic);

        RMQNormalConsumer consumer = null;
        try {
            this.offsetRpcHook.addSetZeroOfNotFound = true;
            consumer = new RMQNormalConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener());
            consumer.create(false);
            consumer.getConsumer().setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.start();
            assertAllMessagesConsumed(producer, consumer);
        } finally {
            // This consumer is created directly rather than through the BaseConf
            // factory, so make sure it is stopped on every path.
            try {
                shutdownIfCreated(consumer);
            } finally {
                this.offsetRpcHook.addSetZeroOfNotFound = false;
            }
        }
    }

    /** Creates a producer for {@code topic}, sends {@link #MESSAGE_COUNT} messages and asserts all were sent. */
    private static RMQNormalProducer sendMessages(String topic) {
        RMQNormalProducer producer = getProducer(NAMESRV_ADDR, topic);
        producer.send(MESSAGE_COUNT);
        Assert.assertEquals("Not all sent succeeded", MESSAGE_COUNT, producer.getAllUndupMsgBody().size());
        return producer;
    }

    /** Waits for consumption and asserts the consumer's listener collected exactly the produced message bodies. */
    private static void assertAllMessagesConsumed(RMQNormalProducer producer, RMQNormalConsumer consumer) {
        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_WAIT_TIMEOUT);
        Assert.assertEquals(producer.getAllMsgBody().size(), consumer.getListener().getAllMsgBody().size());
        Truth.assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody()))
            .containsExactlyElementsIn(producer.getAllMsgBody());
    }

    /** Shuts the consumer down unless it was never created (creation itself failed). */
    private static void shutdownIfCreated(RMQNormalConsumer consumer) {
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    /**
     * Server-side hook that, for requests with code
     * {@value OffsetNotFoundIT#INTERCEPTED_REQUEST_CODE}, either throws or
     * overwrites the {@code setZeroIfNotFound} ext field, depending on which
     * flag the running test has enabled.
     */
    static class OffsetRpcHook implements RPCHook {
        // Both flags are toggled by the test thread. They are volatile because the
        // hook is presumably invoked on broker request-handling threads, which
        // would otherwise have no visibility guarantee for the updates.
        private volatile boolean throwException = false;
        private volatile boolean addSetZeroOfNotFound = false;

        OffsetRpcHook() {
        }

        /**
         * Rejects or rewrites intercepted requests according to the active flags;
         * requests with any other code pass through untouched.
         *
         * @param remoteAddr address of the requesting peer (unused)
         * @param request    incoming request; its ext fields may be modified
         * @throws RuntimeException when {@code throwException} is enabled and the
         *                          request carries the intercepted code
         */
        public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
            if (request.getCode() != INTERCEPTED_REQUEST_CODE) {
                return;
            }
            if (this.throwException) {
                throw new RuntimeException("Stop by rpc hook");
            }
            if (this.addSetZeroOfNotFound) {
                request.getExtFields().put("setZeroIfNotFound", "false");
            }
        }

        /** No post-processing is needed for these tests. */
        public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
        }
    }
}

