package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.class */
public class StoreChangelogReaderTest {

    // Nice mock handed to changelogReader.restore(...) in the tests below.
    @Mock(type = MockType.NICE)
    private RestoringTasks active;

    // Nice mock that the tests return from stubbed active.restoringTaskFor(...) calls.
    @Mock(type = MockType.NICE)
    private StreamTask task;
    // Listener wrapped by restoreListener; tests inspect its recorded `restored` entries and offsets.
    private final MockStateRestoreListener callback = new MockStateRestoreListener();
    // Composite listener built around `callback`; passed to most StateRestorer instances in this class.
    private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(this.callback);
    // Mock consumer (EARLIEST reset strategy) backing the shared changelogReader.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    // User-level listener given to the reader and attached to restoreListener in setUp().
    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    // Default partition used by the tests: partition 0 of "topic".
    private final TopicPartition topicPartition = new TopicPartition("topic", 0);
    private final LogContext logContext = new LogContext("test-reader ");
    // Reader under test, constructed with a zero Duration.
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.consumer, Duration.ZERO, this.stateRestoreListener, this.logContext);

    /**
     * Attaches the user-level restore listener to the composite listener before each test.
     */
    @Before
    public void setUp() {
        restoreListener.setUserRestoreListener(stateRestoreListener);
    }

    /**
     * Uses a consumer whose {@code listTopics()} always throws a {@link TimeoutException}
     * and checks that {@code restore} still called it (and that the call returns normally).
     */
    @Test
    public void shouldRequestTopicsAndHandleTimeoutException() {
        final AtomicBoolean listTopicsCalled = new AtomicBoolean(false);
        // Consumer that records the topic lookup and then fails it with a timeout.
        final MockConsumer<byte[], byte[]> timingOutConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public Map<String, List<PartitionInfo>> listTopics() {
                listTopicsCalled.set(true);
                throw new TimeoutException("KABOOM!");
            }
        };
        final StoreChangelogReader reader =
            new StoreChangelogReader(timingOutConsumer, Duration.ZERO, stateRestoreListener, logContext);
        final Long noCheckpoint = null;
        reader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName"));

        reader.restore(active);

        Assert.assertTrue(listTopicsCalled.get());
    }

    /**
     * Subscribes the consumer to a topic before restoring and expects {@code restore}
     * to fail with a {@link StreamsException}.
     */
    @Test
    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
        final TopicPartition subscribedPartition = new TopicPartition("sometopic", 0);
        final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
        mockRestorer.setUserRestoreListener(stateRestoreListener);
        // Same four expected partition() lookups as before, expressed once.
        EasyMock.expect(mockRestorer.partition()).andReturn(subscribedPartition).times(4);
        EasyMock.replay(mockRestorer);
        changelogReader.register(mockRestorer);

        consumer.subscribe(Collections.singleton("sometopic"));

        try {
            changelogReader.restore(active);
            // The message now names the exception type this test actually catches.
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException expected) {
            // Expected: this is the outcome the test asserts, so nothing more to do.
        }
    }

    /**
     * With a null checkpoint on a 10-record partition, all 10 records are expected
     * to reach the restore callback.
     */
    @Test
    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName");
        changelogReader.register(restorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
    }

    /**
     * Arms the consumer with an {@link InvalidOffsetException} for the test partition.
     * The first {@code restore} call is expected to report no completed partitions; after
     * re-registering the restorer, the second call is expected to report one and to have
     * delivered all 10 records.
     */
    @Test
    public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
        setupConsumer(10L, topicPartition);
        final InvalidOffsetException invalidOffset = new InvalidOffsetException("Try Again!") {
            @Override
            public Set<TopicPartition> partitions() {
                return Collections.singleton(topicPartition);
            }
        };
        consumer.setException(invalidOffset);
        final Long noCheckpoint = null;
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName"));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        // First attempt: nothing is reported as restored.
        Assert.assertEquals(0L, changelogReader.restore(active).size());

        // Register a fresh restorer for the same partition and try again.
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName"));
        Assert.assertEquals(1L, changelogReader.restore(active).size());
        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
    }

    /**
     * With checkpoint 5 on a 10-record partition, 5 records are expected to reach
     * the restore callback.
     */
    @Test
    public void shouldRestoreMessagesFromCheckpoint() {
        setupConsumer(10L, topicPartition);
        final Long checkpoint = 5L;
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, checkpoint, Long.MAX_VALUE, true, "storeName");
        changelogReader.register(restorer);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
    }

    /**
     * After restoring a single-record partition, the consumer's assignment is expected
     * to be empty.
     */
    @Test
    public void shouldClearAssignmentAtEndOfRestore() {
        setupConsumer(1L, topicPartition);
        final Long noCheckpoint = null;
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName");
        changelogReader.register(restorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(consumer.assignment(), IsEqual.equalTo(Collections.emptySet()));
    }

    /**
     * With an offset limit of 3 on a 10-record partition, 3 records are expected to be
     * restored and the restorer's restored offset is expected to be 3.
     */
    @Test
    public void shouldRestoreToLimitWhenSupplied() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, 3L, true, "storeName");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(3));
        MatcherAssert.assertThat(limitedRestorer.restoredOffset(), IsEqual.equalTo(3L));
    }

    /**
     * Registers three restorers over partitions holding 10, 5 and 3 records and expects
     * each callback to receive exactly its own partition's record count.
     */
    @Test
    public void shouldRestoreMultipleStores() {
        final TopicPartition one = new TopicPartition("one", 0);
        final TopicPartition two = new TopicPartition("two", 0);
        final MockRestoreCallback callbackOne = new MockRestoreCallback();
        final MockRestoreCallback callbackTwo = new MockRestoreCallback();
        final CompositeRestoreListener listenerOne = new CompositeRestoreListener(callbackOne);
        final CompositeRestoreListener listenerTwo = new CompositeRestoreListener(callbackTwo);

        setupConsumer(10L, topicPartition);
        setupConsumer(5L, one);
        setupConsumer(3L, two);

        final Long noCheckpoint = null;
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName1"));
        changelogReader.register(
            new StateRestorer(one, listenerOne, noCheckpoint, Long.MAX_VALUE, true, "storeName2"));
        changelogReader.register(
            new StateRestorer(two, listenerTwo, noCheckpoint, Long.MAX_VALUE, true, "storeName3"));

        EasyMock.expect(active.restoringTaskFor(one)).andStubReturn(task);
        EasyMock.expect(active.restoringTaskFor(two)).andStubReturn(task);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
        MatcherAssert.assertThat(callbackOne.restored.size(), IsEqual.equalTo(5));
        MatcherAssert.assertThat(callbackTwo.restored.size(), IsEqual.equalTo(3));
    }

    /**
     * Registers three restorers (checkpoint 0) over partitions holding 10, 5 and 3 records,
     * calls {@code restore} twice, and then checks for each listener: the restored record
     * count, that start/batch/end notifications carried its store name, and the reported
     * start offset, last batch offset and total restored count.
     */
    @Test
    public void shouldRestoreAndNotifyMultipleStores() throws Exception {
        final TopicPartition one = new TopicPartition("one", 0);
        final TopicPartition two = new TopicPartition("two", 0);
        final MockStateRestoreListener callbackOne = new MockStateRestoreListener();
        final MockStateRestoreListener callbackTwo = new MockStateRestoreListener();
        final CompositeRestoreListener listenerOne = new CompositeRestoreListener(callbackOne);
        final CompositeRestoreListener listenerTwo = new CompositeRestoreListener(callbackTwo);

        setupConsumer(10L, topicPartition);
        setupConsumer(5L, one);
        setupConsumer(3L, two);

        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1"));
        changelogReader.register(
            new StateRestorer(one, listenerOne, 0L, Long.MAX_VALUE, true, "storeName2"));
        changelogReader.register(
            new StateRestorer(two, listenerTwo, 0L, Long.MAX_VALUE, true, "storeName3"));

        EasyMock.expect(active.restoringTaskFor(one)).andReturn(task);
        EasyMock.expect(active.restoringTaskFor(two)).andReturn(task);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        // restore is invoked twice before any assertion is made
        changelogReader.restore(active);
        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
        MatcherAssert.assertThat(callbackOne.restored.size(), IsEqual.equalTo(5));
        MatcherAssert.assertThat(callbackTwo.restored.size(), IsEqual.equalTo(3));

        assertAllCallbackStatesExecuted(callback, "storeName1");
        assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);

        assertAllCallbackStatesExecuted(callbackOne, "storeName2");
        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);

        assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L);
    }

    /**
     * With checkpoint 0 and offset limit 5 on a 10-record partition, 5 records are expected
     * to be restored and the listener is expected to report start offset 0, last batch
     * offset 4 and a total of 5.
     */
    @Test
    public void shouldOnlyReportTheLastRestoredOffset() {
        setupConsumer(10L, topicPartition);
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, 0L, 5L, true, "storeName1");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
        assertAllCallbackStatesExecuted(callback, "storeName1");
        assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
    }

    /**
     * Asserts that the listener recorded the given store name for each of the
     * restore-start, restore-batch and restore-end states.
     *
     * @param mockStateRestoreListener listener whose recorded states are checked
     * @param str                      store name expected for every state
     */
    private void assertAllCallbackStatesExecuted(MockStateRestoreListener mockStateRestoreListener, String str) {
        MatcherAssert.assertThat(
            mockStateRestoreListener.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START),
            IsEqual.equalTo(str));
        MatcherAssert.assertThat(
            mockStateRestoreListener.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH),
            IsEqual.equalTo(str));
        MatcherAssert.assertThat(
            mockStateRestoreListener.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END),
            IsEqual.equalTo(str));
    }

    /**
     * Asserts the three offset-related values recorded by the listener.
     *
     * @param mockStateRestoreListener listener whose recorded values are checked
     * @param j                        expected {@code restoreStartOffset}
     * @param j2                       expected {@code restoredBatchOffset}
     * @param j3                       expected {@code totalNumRestored}
     */
    private void assertCorrectOffsetsReportedByListener(MockStateRestoreListener mockStateRestoreListener, long j, long j2, long j3) {
        // Primitive values are boxed automatically on both sides of each comparison.
        MatcherAssert.assertThat(mockStateRestoreListener.restoreStartOffset, IsEqual.equalTo(j));
        MatcherAssert.assertThat(mockStateRestoreListener.restoredBatchOffset, IsEqual.equalTo(j2));
        MatcherAssert.assertThat(mockStateRestoreListener.totalNumRestored, IsEqual.equalTo(j3));
    }

    /**
     * On a partition set up with zero records, nothing is expected to be restored and
     * the restorer's restored offset is expected to be 0.
     */
    @Test
    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() {
        final Long noCheckpoint = null;
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName");
        setupConsumer(0L, topicPartition);
        changelogReader.register(restorer);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(0));
        MatcherAssert.assertThat(restorer.restoredOffset(), IsEqual.equalTo(0L));
    }

    /**
     * When the checkpoint (10) equals the number of records set up on the partition,
     * nothing is expected to be restored and the restored offset is expected to equal
     * the checkpoint.
     */
    @Test
    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
        final Long endOffset = 10L;
        setupConsumer(endOffset, topicPartition);
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, endOffset, Long.MAX_VALUE, true, "storeName");
        changelogReader.register(restorer);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(0));
        MatcherAssert.assertThat(restorer.restoredOffset(), IsEqual.equalTo(endOffset));
    }

    /**
     * For a restorer created with the persistent flag set, {@code restoredOffsets()} is
     * expected to map the partition to offset 10 after restoring 10 records.
     */
    @Test
    public void shouldReturnRestoredOffsetsForPersistentStores() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final boolean persistent = true;
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, persistent, "storeName"));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        final Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(topicPartition, 10L);
        MatcherAssert.assertThat(changelogReader.restoredOffsets(), IsEqual.equalTo(expectedOffsets));
    }

    /**
     * For a restorer created with the persistent flag cleared, {@code restoredOffsets()}
     * is expected to be empty after restoring.
     */
    @Test
    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final boolean persistent = false;
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, persistent, "storeName"));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(changelogReader.restoredOffsets(), IsEqual.equalTo(Collections.emptyMap()));
    }

    /**
     * Adds three records at offsets 0, 1 and 2 where the middle one has a null key, and
     * expects only the two keyed records to reach the restore callback.
     */
    @Test
    public void shouldIgnoreNullKeysWhenRestoring() {
        assignPartition(3L, topicPartition);
        final byte[] bytes = new byte[0];
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        // Diamond instead of raw ConsumerRecord keeps the records typed as <byte[], byte[]>.
        consumer.addRecord(new ConsumerRecord<>(topic, partition, 0L, bytes, bytes));
        // The cast gives the null key a type so the diamond can still infer byte[].
        consumer.addRecord(new ConsumerRecord<>(topic, partition, 1L, (byte[]) null, bytes));
        consumer.addRecord(new ConsumerRecord<>(topic, partition, 2L, bytes, bytes));
        consumer.assign(Collections.singletonList(topicPartition));

        final Long noCheckpoint = null;
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, false, "storeName"));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        // Varargs call replaces the raw KeyValue[] array literal.
        MatcherAssert.assertThat(
            callback.restored,
            CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
    }

    /**
     * On a partition set up with zero records, a single {@code restore} call is expected
     * to return exactly the registered partition.
     */
    @Test
    public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
        // NOTE(review): kept as a raw Set because the declared type of restore(...)'s
        // result is not visible here; a typed Set may not satisfy the matcher bound.
        final Set expectedCompleted = Collections.singleton(topicPartition);
        setupConsumer(0L, topicPartition);
        final Long noCheckpoint = null;
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "store"));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        MatcherAssert.assertThat(changelogReader.restore(active), IsEqual.equalTo(expectedCompleted));
    }

    /**
     * Starts restoring one partition (1 record available, end offset moved to 10) and
     * expects the first {@code restore} call to report nothing as completed. It then adds
     * the remaining 9 records, registers a second restorer for another partition holding
     * 3 records, and expects the next {@code restore} call to report both partitions, with
     * 10 and 3 records delivered to the respective callbacks.
     */
    @Test
    public void shouldRestorePartitionsRegisteredPostInitialization() {
        final MockRestoreCallback otherCallback = new MockRestoreCallback();
        final CompositeRestoreListener otherListener = new CompositeRestoreListener(otherCallback);
        final Long noCheckpoint = null;

        setupConsumer(1L, topicPartition);
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
        changelogReader.register(
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, false, "storeName"));

        final TopicPartition other = new TopicPartition("other", 0);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.expect(active.restoringTaskFor(other)).andStubReturn(task);
        EasyMock.replay(active, task);

        Assert.assertTrue(changelogReader.restore(active).isEmpty());

        // Nine more records for the first partition, starting at offset 1.
        addRecords(9L, topicPartition, 1);

        setupConsumer(3L, other);
        consumer.updateBeginningOffsets(Collections.singletonMap(other, 0L));
        consumer.updateEndOffsets(Collections.singletonMap(other, 3L));
        changelogReader.register(
            new StateRestorer(other, otherListener, noCheckpoint, Long.MAX_VALUE, false, "otherStore"));

        // NOTE(review): raw Set retained; the declared type of restore(...)'s result
        // is not visible here and a typed Set may not satisfy the matcher bound.
        final Set bothPartitions = Utils.mkSet(new TopicPartition[]{topicPartition, other});
        consumer.assign(bothPartitions);

        MatcherAssert.assertThat(changelogReader.restore(active), IsEqual.equalTo(bothPartitions));
        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
        MatcherAssert.assertThat(otherCallback.restored.size(), IsEqual.equalTo(3));
    }

    /**
     * Sets up 10 records, queues an additional end offset of 15 on the consumer, and
     * restores with an offset limit of 9. The test passes if {@code restore} returns
     * without throwing; it makes no further assertions.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
        setupConsumer(10L, topicPartition);
        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
        final Long noCheckpoint = null;
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, 9L, true, "storeName");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);
    }

    /**
     * Sets up 10 records, moves the end offset to 11, and restores without an offset
     * limit; all 10 records are expected to be restored without an exception.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
        setupConsumer(10L, topicPartition);
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
        final Long noCheckpoint = null;
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName");
        changelogReader.register(restorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
    }

    /**
     * Sets up 10 records and restores without an offset limit; all 10 records are
     * expected to be restored without an exception.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final StateRestorer restorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, Long.MAX_VALUE, true, "storeName");
        changelogReader.register(restorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
    }

    /**
     * Sets up 10 records and restores with an offset limit of 5; exactly 5 records are
     * expected to be restored without an exception.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, 5L, true, "storeName");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
    }

    /**
     * Sets up 10 records and restores with an offset limit of 10; all 10 records are
     * expected to be restored without an exception.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopic() {
        setupConsumer(10L, topicPartition);
        final Long noCheckpoint = null;
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, 10L, true, "storeName");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
    }

    /**
     * Adds two runs of 5 records (starting at offsets 0 and 6), moves the end offset to 12,
     * and restores with an offset limit of 6; exactly 5 records are expected to be restored
     * without an exception.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopicEOSEnabled() {
        assignPartition(10L, topicPartition);
        // Two runs of records; no record is added at offset 5.
        addRecords(5L, topicPartition, 0);
        addRecords(5L, topicPartition, 6);
        consumer.assign(Collections.emptyList());
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 12L));

        final Long noCheckpoint = null;
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, 6L, true, "storeName");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
    }

    /**
     * Sets up 10 records, moves the end offset to 11, and restores with an offset limit
     * of 11; all 10 records are expected to be restored without an exception.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled() {
        setupConsumer(10L, topicPartition);
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
        final Long noCheckpoint = null;
        final StateRestorer limitedRestorer =
            new StateRestorer(topicPartition, restoreListener, noCheckpoint, 11L, true, "storeName");
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
    }

    /**
     * Prepares the shared mock consumer for one partition: registers and assigns the
     * partition, adds {@code j} records starting at offset 0, then replaces the
     * assignment with an empty one.
     *
     * @param j              number of records to add (also used for the end offset)
     * @param topicPartition partition to prepare
     */
    private void setupConsumer(long j, TopicPartition topicPartition) {
        assignPartition(j, topicPartition);
        final int firstOffset = 0;
        addRecords(j, topicPartition, firstOffset);
        // Finish with no partitions assigned to the consumer.
        consumer.assign(Collections.emptyList());
    }

    /**
     * Adds {@code j} records with empty key and value to the shared mock consumer, at
     * consecutive offsets starting from {@code i}.
     *
     * @param j              number of records to add; nothing is added when not positive
     * @param topicPartition partition the records belong to
     * @param i              offset of the first record
     */
    private void addRecords(long j, TopicPartition topicPartition, int i) {
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        // A long counter matches the long bound and keeps the offset arithmetic in long,
        // so neither the counter nor the offset can wrap as an int.
        for (long added = 0L; added < j; added++) {
            final long offset = i + added;
            consumer.addRecord(new ConsumerRecord<>(topic, partition, offset, new byte[0], new byte[0]));
        }
    }

    /**
     * Registers the partition's metadata on the shared mock consumer, sets its beginning
     * offset to 0 and its end offset to {@code max(0, j)}, and assigns it to the consumer.
     *
     * @param j              desired end offset; negative values are clamped to 0
     * @param topicPartition partition to register and assign
     */
    private void assignPartition(long j, TopicPartition topicPartition) {
        final Node noLeader = null;
        final Node[] noReplicas = null;
        final String topic = topicPartition.topic();
        final PartitionInfo partitionInfo =
            new PartitionInfo(topic, topicPartition.partition(), noLeader, noReplicas, noReplicas);
        consumer.updatePartitions(topic, Collections.singletonList(partitionInfo));

        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        final long endOffset = Math.max(0L, j);
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, endOffset));

        consumer.assign(Collections.singletonList(topicPartition));
    }
}
