package org.apache.rocketmq.test.offset;

import com.google.common.truth.Truth;
import java.util.Iterator;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rocketmq/test/offset/OffsetNotFoundIT.class */
public class OffsetNotFoundIT extends BaseConf {
    private OffsetRpcHook offsetRpcHook = new OffsetRpcHook();

    /* loaded from: input_file:org/apache/rocketmq/test/offset/OffsetNotFoundIT$OffsetRpcHook.class */
    static class OffsetRpcHook implements RPCHook {
        private boolean throwException = false;
        private boolean addSetZeroOfNotFound = false;

        OffsetRpcHook() {
        }

        public void doBeforeRequest(String str, RemotingCommand remotingCommand) {
            if (remotingCommand.getCode() == 14) {
                if (this.throwException) {
                    throw new RuntimeException("Stop by rpc hook");
                }
                if (this.addSetZeroOfNotFound) {
                    remotingCommand.getExtFields().put("setZeroIfNotFound", "false");
                }
            }
        }

        public void doAfterResponse(String str, RemotingCommand remotingCommand, RemotingCommand remotingCommand2) {
        }
    }

    @Before
    public void setUp() {
        Iterator<BrokerController> it = brokerControllerList.iterator();
        while (it.hasNext()) {
            it.next().registerServerRPCHook(this.offsetRpcHook);
        }
    }

    @After
    public void tearDown() {
        BaseConf.shutdown();
    }

    @Test
    public void testConsumeStopAndResume() {
        String initTopic = initTopic();
        RMQNormalProducer producer = getProducer(NAMESRV_ADDR, initTopic);
        producer.send(10);
        Assert.assertEquals("Not all sent succeeded", 10, producer.getAllUndupMsgBody().size());
        try {
            this.offsetRpcHook.throwException = true;
            RMQNormalConsumer consumer = getConsumer(NAMESRV_ADDR, initTopic, "*", new RMQNormalListener());
            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 15000);
            Assert.assertEquals(0L, consumer.getListener().getAllMsgBody().size());
            consumer.shutdown();
            this.offsetRpcHook.throwException = false;
            RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, initTopic, "*", new RMQNormalListener());
            consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), 15000);
            Assert.assertEquals(producer.getAllMsgBody().size(), consumer2.getListener().getAllMsgBody().size());
            Truth.assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer2.getListener().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
            consumer2.shutdown();
        } catch (Throwable th) {
            this.offsetRpcHook.throwException = false;
            throw th;
        }
    }

    @Test
    public void testOffsetNotFoundException() {
        String initTopic = initTopic();
        String initConsumerGroup = initConsumerGroup();
        RMQNormalProducer producer = getProducer(NAMESRV_ADDR, initTopic);
        producer.send(10);
        Assert.assertEquals("Not all sent succeeded", 10, producer.getAllUndupMsgBody().size());
        try {
            this.offsetRpcHook.addSetZeroOfNotFound = true;
            RMQNormalConsumer rMQNormalConsumer = new RMQNormalConsumer(NAMESRV_ADDR, initTopic, "*", initConsumerGroup, new RMQNormalListener());
            rMQNormalConsumer.create(false);
            rMQNormalConsumer.getConsumer().setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            rMQNormalConsumer.start();
            rMQNormalConsumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 15000);
            Assert.assertEquals(producer.getAllMsgBody().size(), rMQNormalConsumer.getListener().getAllMsgBody().size());
            Truth.assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), rMQNormalConsumer.getListener().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
            rMQNormalConsumer.shutdown();
            this.offsetRpcHook.addSetZeroOfNotFound = false;
        } catch (Throwable th) {
            this.offsetRpcHook.addSetZeroOfNotFound = false;
            throw th;
        }
    }
}
