package org.apache.storm.kafka.spout;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.class */
/**
 * Unit tests for {@link KafkaSpoutRetryExponentialBackoff}.
 *
 * <p>Tests that depend on the passage of time run inside a {@link Time.SimulatedTime} scope and
 * move the clock explicitly with {@code Time.advanceTime} / {@code Time.advanceTimeSecs}. The
 * scope is managed with try-with-resources so it is closed exactly once, and so that a failure
 * while closing never hides the assertion failure that caused the test to abort.
 */
public class KafkaSpoutRetryExponentialBackoffTest {
    private final TopicPartition testTopic = new TopicPartition("topic", 0);
    private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0);

    /**
     * Builds a retry service from second-granularity settings.
     *
     * <p>The arguments are forwarded, in order, to the four-argument
     * {@link KafkaSpoutRetryExponentialBackoff} constructor. The parameter names follow how the
     * tests in this class use each position (see {@code testExponentialBackoff},
     * {@code testMaxRetries} and {@code testMaxDelay}).
     *
     * @param initialDelaySecs first constructor argument, in seconds
     * @param delayPeriodSecs second constructor argument, in seconds
     * @param maxRetries third constructor argument
     * @param maxDelaySecs fourth constructor argument, in seconds
     * @return a freshly constructed retry service
     */
    private static KafkaSpoutRetryExponentialBackoff newRetryService(
            long initialDelaySecs, long delayPeriodSecs, int maxRetries, long maxDelaySecs) {
        return new KafkaSpoutRetryExponentialBackoff(
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialDelaySecs),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(delayPeriodSecs),
                maxRetries,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(maxDelaySecs));
    }

    /** Creates a service whose time intervals are all zero seconds. */
    private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() {
        return newRetryService(0L, 0L, 1, 0L);
    }

    /** Creates a service whose first and last time intervals are both one second. */
    private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() {
        return newRetryService(1L, 0L, 1, 1L);
    }

    /** A failed message scheduled on a zero-delay service is immediately scheduled and ready. */
    @Test
    public void testCanScheduleRetry() {
        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
        KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
        msgId.incrementNumFails();

        MatcherAssert.assertThat("The service must schedule the message for retry",
                retryService.schedule(msgId), CoreMatchers.is(true));
        MatcherAssert.assertThat(
                "The service should return the original message id when asked for the same tp/offset twice",
                retryService.getMessageId(testTopic, 0L), CoreMatchers.sameInstance(msgId));
        MatcherAssert.assertThat(retryService.isScheduled(msgId), CoreMatchers.is(true));
        MatcherAssert.assertThat(retryService.isReady(msgId), CoreMatchers.is(true));
        MatcherAssert.assertThat(retryService.readyMessageCount(), CoreMatchers.is(1));
        MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                CoreMatchers.is(Collections.singletonMap(testTopic, msgId.offset())));
    }

    /** Scheduling an already scheduled id again restarts its one-second wait. */
    @Test
    public void testCanRescheduleRetry() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
            msgId.incrementNumFails();

            retryService.schedule(msgId);
            Time.advanceTime(500L);
            MatcherAssert.assertThat("The service must be able to reschedule an already scheduled id",
                    retryService.schedule(msgId), CoreMatchers.is(true));

            // 1000 ms since the first schedule, but only 500 ms since the reschedule.
            Time.advanceTime(500L);
            MatcherAssert.assertThat("The message should not be ready for retry yet since it was rescheduled",
                    retryService.isReady(msgId), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.isScheduled(msgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.emptyMap()));
            MatcherAssert.assertThat(retryService.readyMessageCount(), CoreMatchers.is(0));

            // A full second has now elapsed since the reschedule.
            Time.advanceTime(500L);
            MatcherAssert.assertThat("The message should be ready for retry once the full delay has passed",
                    retryService.isReady(msgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isScheduled(msgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.singletonMap(testTopic, msgId.offset())));
            MatcherAssert.assertThat(retryService.readyMessageCount(), CoreMatchers.is(1));
        }
    }

    /** Removing an id that was scheduled twice leaves no schedule behind for it. */
    @Test
    public void testCannotContainMultipleSchedulesForId() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
            msgId.incrementNumFails();

            retryService.schedule(msgId);
            Time.advanceTime(500L);
            retryService.schedule(msgId);

            retryService.remove(msgId);
            MatcherAssert.assertThat("The message should no longer be scheduled",
                    retryService.isScheduled(msgId), CoreMatchers.is(false));

            // Were the first schedule still present, it would come due at this point.
            Time.advanceTime(500L);
            MatcherAssert.assertThat("The message should not be ready for retry because it isn't scheduled",
                    retryService.isReady(msgId), CoreMatchers.is(false));
        }
    }

    /** A removed message is neither scheduled nor ready and no longer shows up in the offsets. */
    @Test
    public void testCanRemoveRetry() {
        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
        KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
        msgId.incrementNumFails();
        retryService.schedule(msgId);

        MatcherAssert.assertThat(retryService.remove(msgId), CoreMatchers.is(true));
        MatcherAssert.assertThat(retryService.isScheduled(msgId), CoreMatchers.is(false));
        MatcherAssert.assertThat(retryService.isReady(msgId), CoreMatchers.is(false));
        MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                CoreMatchers.is(Collections.emptyMap()));
        MatcherAssert.assertThat(retryService.readyMessageCount(), CoreMatchers.is(0));
    }

    /** Messages on different topic partitions are tracked separately, and retainAll drops the rest. */
    @Test
    public void testCanHandleMultipleTopics() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
            KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(testTopic, 0L);
            KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(testTopic2, 0L);
            msgIdTp1.incrementNumFails();
            msgIdTp2.incrementNumFails();

            // The second message is scheduled 500 ms after the first.
            boolean scheduledOne = retryService.schedule(msgIdTp1);
            Time.advanceTime(500L);
            boolean scheduledTwo = retryService.schedule(msgIdTp2);

            MatcherAssert.assertThat(scheduledOne, CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isScheduled(msgIdTp1), CoreMatchers.is(true));
            MatcherAssert.assertThat(scheduledTwo, CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isScheduled(msgIdTp2), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isReady(msgIdTp1), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.isReady(msgIdTp2), CoreMatchers.is(false));

            Time.advanceTime(500L);
            MatcherAssert.assertThat(retryService.isReady(msgIdTp1), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isReady(msgIdTp2), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.singletonMap(testTopic, 0L)));

            Time.advanceTime(500L);
            MatcherAssert.assertThat(retryService.isReady(msgIdTp2), CoreMatchers.is(true));
            Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
            expectedOffsets.put(testTopic, 0L);
            expectedOffsets.put(testTopic2, 0L);
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(expectedOffsets));

            // Keep only the second partition; the first one's schedule must disappear.
            retryService.retainAll(Collections.singleton(testTopic2));
            MatcherAssert.assertThat(retryService.isScheduled(msgIdTp1), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.isScheduled(msgIdTp2), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isReady(msgIdTp1), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.isReady(msgIdTp2), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.singletonMap(testTopic2, 0L)));
        }
    }

    /** With several ready messages on one partition, the reported offset is the lowest one. */
    @Test
    public void testCanHandleMultipleMessagesOnPartition() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
            KafkaSpoutMessageId firstMsgId = retryService.getMessageId(testTopic, 0L);
            KafkaSpoutMessageId secondMsgId = retryService.getMessageId(testTopic, 1L);
            firstMsgId.incrementNumFails();
            secondMsgId.incrementNumFails();

            // The second message is scheduled 500 ms after the first.
            retryService.schedule(firstMsgId);
            Time.advanceTime(500L);
            retryService.schedule(secondMsgId);
            MatcherAssert.assertThat(retryService.isScheduled(firstMsgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isScheduled(secondMsgId), CoreMatchers.is(true));

            Time.advanceTime(500L);
            MatcherAssert.assertThat(retryService.isReady(firstMsgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isReady(secondMsgId), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.singletonMap(testTopic, firstMsgId.offset())));

            Time.advanceTime(500L);
            MatcherAssert.assertThat(retryService.isReady(firstMsgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isReady(secondMsgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.singletonMap(testTopic, firstMsgId.offset())));

            // Once the first message is gone, the second becomes the earliest on the partition.
            retryService.remove(firstMsgId);
            MatcherAssert.assertThat(retryService.earliestRetriableOffsets(),
                    CoreMatchers.is(Collections.singletonMap(testTopic, secondMsgId.offset())));
        }
    }

    /** A message may be scheduled with three failures but is rejected after a fourth (limit 3). */
    @Test
    public void testMaxRetries() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            final int maxRetries = 3;
            KafkaSpoutRetryExponentialBackoff retryService = newRetryService(0L, 0L, maxRetries, 0L);
            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
            for (int failure = 0; failure < maxRetries; failure++) {
                msgId.incrementNumFails();
            }

            // Exactly at the limit: still accepted.
            MatcherAssert.assertThat(retryService.schedule(msgId), CoreMatchers.is(true));
            MatcherAssert.assertThat(retryService.isScheduled(msgId), CoreMatchers.is(true));

            // One failure past the limit: rejected.
            retryService.remove(msgId);
            msgId.incrementNumFails();
            MatcherAssert.assertThat("The message should not be allowed to retry once the limit is reached",
                    retryService.schedule(msgId), CoreMatchers.is(false));
            MatcherAssert.assertThat(retryService.isScheduled(msgId), CoreMatchers.is(false));
        }
    }

    /** A 500 second first interval is capped at two seconds by the fourth constructor argument. */
    @Test
    public void testMaxDelay() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            final long maxDelaySecs = 2L;
            KafkaSpoutRetryExponentialBackoff retryService = newRetryService(500L, 0L, 1, maxDelaySecs);
            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
            msgId.incrementNumFails();

            retryService.schedule(msgId);
            Time.advanceTimeSecs(maxDelaySecs);
            MatcherAssert.assertThat("The message should be ready for retry after the max delay",
                    retryService.isReady(msgId), CoreMatchers.is(true));
        }
    }

    /**
     * With a four second delay period and a message that has already failed twice, successive
     * retries become ready after exactly 8, 16 and 32 seconds.
     */
    @Test
    public void testExponentialBackoff() {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpoutRetryExponentialBackoff retryService =
                    newRetryService(0L, 4L, Integer.MAX_VALUE, Integer.MAX_VALUE);
            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, 0L);
            msgId.incrementNumFails();
            msgId.incrementNumFails();

            for (int backoffSecs : new int[] {8, 16, 32}) {
                retryService.schedule(msgId);

                // One second short of the expected backoff: must still be waiting.
                Time.advanceTimeSecs(backoffSecs - 1);
                MatcherAssert.assertThat(
                        "The message should not be ready for retry until backoff " + backoffSecs + " has expired",
                        retryService.isReady(msgId), CoreMatchers.is(false));

                Time.advanceTimeSecs(1L);
                MatcherAssert.assertThat(
                        "The message should be ready for retry once backoff " + backoffSecs + " has expired",
                        retryService.isReady(msgId), CoreMatchers.is(true));

                // Record another failure so the next iteration sees a longer backoff.
                msgId.incrementNumFails();
                retryService.remove(msgId);
            }
        }
    }
}
