package io.confluent.parallelconsumer;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.awaitility.Awaitility;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessorPauseResumeTest.class */
/**
 * Exercises {@code pauseIfRunning()} / {@code resumeIfPaused()} on the parallel consumer under test,
 * once per {@link ParallelConsumerOptions.CommitMode}.
 *
 * <p>Records are fed through {@code consumerSpy}; progress is observed through the counters of a
 * {@link TestUserFunction} registered via {@code parallelConsumer.poll(...)}.
 */
public class ParallelEoSStreamProcessorPauseResumeTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessorPauseResumeTest.class);

    /** Source of the numeric suffix used for the consumer id set in {@link #setupParallelConsumerInstance}. */
    private static final AtomicInteger MY_ID_GENERATOR = new AtomicInteger();

    /** Source of the per-batch "set key" embedded in record keys by {@link #addRecordsWithSetKey}. */
    private static final AtomicInteger RECORD_SET_KEY_GENERATOR = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessorPauseResumeTest$TestUserFunction.class */
    /**
     * User function that counts records and can be blocked from the outside.
     *
     * <p>Every call to {@link #accept} bumps the in-flight counter, then acquires {@code mutex}
     * before counting the record as processed. A test that holds the mutex (see
     * {@link #lockProcessing()}) therefore keeps all calls parked "in flight" until it calls
     * {@link #unlockProcessing()}.
     */
    public static class TestUserFunction implements Consumer<PollContext<String, String>> {
        /** Number of records that passed the mutex since creation or the last {@link #reset()}. */
        private final AtomicInteger numProcessedRecords = new AtomicInteger();

        /** Number of {@link #accept} calls that have started but not yet returned. */
        private final AtomicInteger numInFlightRecords = new AtomicInteger();

        /** Gate shared by the test thread and the threads invoking {@link #accept}. */
        private final ReentrantLock mutex = new ReentrantLock();

        private TestUserFunction() {
        }

        /** Acquires the gate on the calling thread, blocking until it is available. */
        public void lockProcessing() {
            mutex.lock();
        }

        /** Releases the gate; must be called by the thread that currently holds it. */
        public void unlockProcessing() {
            log.debug("Unlocking processing");
            mutex.unlock();
        }

        /**
         * Counts the record as in flight, waits for the gate, counts it as processed and releases the gate.
         *
         * @param pollContext the records handed over by the consumer; only logged here
         */
        @Override // java.util.function.Consumer
        public void accept(PollContext<String, String> pollContext) {
            log.debug("Received: {}", pollContext);
            numInFlightRecords.incrementAndGet();
            try {
                // Acquire the lock BEFORE entering the try whose finally releases it: if the
                // acquisition itself failed, unlocking a lock we do not hold would throw
                // IllegalMonitorStateException and hide the original failure.
                lockProcessing();
                try {
                    int processed = numProcessedRecords.incrementAndGet();
                    log.debug("Processed complete, incremented to {}", processed);
                } finally {
                    unlockProcessing();
                }
            } finally {
                numInFlightRecords.decrementAndGet();
            }
        }

        /** Sets the processed counter back to zero; the in-flight counter is left untouched. */
        public void reset() {
            numProcessedRecords.set(0);
        }
    }

    ParallelEoSStreamProcessorPauseResumeTest() {
    }

    /**
     * Builds consumer options with the given commit mode and concurrency, {@code UNORDERED} processing
     * and {@code consumerSpy} as the consumer.
     */
    private ParallelConsumerOptions<String, String> getBaseOptions(ParallelConsumerOptions.CommitMode commitMode, int maxConcurrency) {
        return ParallelConsumerOptions.builder()
                .commitMode(commitMode)
                .consumer(this.consumerSpy)
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .maxConcurrency(maxConcurrency)
                .build();
    }

    /**
     * Adds {@code numRecords} records to {@code consumerSpy}, all tagged with a freshly generated set key.
     *
     * @param numRecords how many records to add
     */
    private void addRecordsWithSetKey(int numRecords) {
        int setKey = RECORD_SET_KEY_GENERATOR.incrementAndGet();
        log.debug("Producing {} records with set key {}.", numRecords, setKey);
        for (int index = 0; index < numRecords; index++) {
            // NOTE(review): set key and index are concatenated without a separator, so keys of
            // different sets can coincide (set 1/index 11 vs. set 11/index 1). Kept as-is; it looks
            // harmless with UNORDERED processing - confirm before relying on key uniqueness.
            String key = "key-" + setKey + index;
            String value = "v0-test-" + index;
            this.consumerSpy.addRecord(this.ktu.makeRecord(key, value));
        }
        log.debug("Finished producing {} records with set key {}.", numRecords, setKey);
    }

    /** Creates the consumer under test from {@link #getBaseOptions} and assigns it a unique id. */
    private void setupParallelConsumerInstance(ParallelConsumerOptions.CommitMode commitMode, int maxConcurrency) {
        setupParallelConsumerInstance(getBaseOptions(commitMode, maxConcurrency));
        String id = "p/r-test-" + MY_ID_GENERATOR.incrementAndGet();
        this.parallelConsumer.setMyId(Optional.of(id));
    }

    /**
     * Sets up the consumer and registers a new {@link TestUserFunction} with it.
     *
     * @return the registered user function, for inspecting counters and gating processing
     */
    private TestUserFunction createTestSetup(ParallelConsumerOptions.CommitMode commitMode, int maxConcurrency) {
        setupParallelConsumerInstance(commitMode, maxConcurrency);
        TestUserFunction userFunction = new TestUserFunction();
        this.parallelConsumer.poll(userFunction);
        return userFunction;
    }

    /**
     * A first batch is processed normally; a second batch added while paused must not be processed
     * until the consumer is resumed, after which it must be processed completely.
     */
    @ParameterizedTest
    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    void pausingAndResumingProcessingShouldWork(ParallelConsumerOptions.CommitMode commitMode) {
        final int degreeOfParallelism = 3;
        final int numTestRecordsPerBatch = 1000;
        TestUserFunction userFunction = createTestSetup(commitMode, degreeOfParallelism);

        // First batch: everything is processed while the consumer is running.
        addRecordsWithSetKey(numTestRecordsPerBatch);
        Awaitility.waitAtMost(defaultTimeout)
                .alias("1000 records should be processed")
                .untilAsserted(() -> Truth.assertThat(userFunction.numProcessedRecords.get()).isEqualTo(numTestRecordsPerBatch));
        awaitForCommit(numTestRecordsPerBatch);
        userFunction.reset();

        // Pause, let the control loop observe it, then add the second batch.
        this.parallelConsumer.pauseIfRunning();
        awaitForOneLoopCycle();
        addRecordsWithSetKey(numTestRecordsPerBatch);
        awaitForSomeLoopCycles(2);

        // Nothing from the second batch may have been processed while paused.
        Truth.assertThat(userFunction.numProcessedRecords.get()).isEqualTo(0);
        awaitForCommit(numTestRecordsPerBatch);

        // Resume: the second batch is processed in full.
        this.parallelConsumer.resumeIfPaused();
        Awaitility.waitAtMost(defaultTimeout)
                .alias("1000 records should be processed")
                .untilAsserted(() -> Truth.assertThat(userFunction.numProcessedRecords.get()).isEqualTo(numTestRecordsPerBatch));
        awaitForCommit(2 * numTestRecordsPerBatch);
    }

    /**
     * Work that is already in flight when the consumer is paused must still complete, leaving no
     * records in flight; after resuming, the whole batch must be processed.
     */
    @ParameterizedTest
    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    void testThatInFlightWorkIsFinishedSuccessfullyAndOffsetsAreCommitted(ParallelConsumerOptions.CommitMode commitMode) {
        final int degreeOfParallelism = 3;
        final int numTestRecordsPerBatch = 1000;
        TestUserFunction userFunction = createTestSetup(commitMode, degreeOfParallelism);

        // Hold the gate so that user-function calls pile up "in flight" before we pause.
        userFunction.lockProcessing();
        try {
            addRecordsWithSetKey(numTestRecordsPerBatch);
            Awaitility.waitAtMost(defaultTimeout)
                    .alias("3 records should be in flight processed")
                    .untilAsserted(() -> Truth.assertThat(userFunction.numInFlightRecords.get()).isEqualTo(degreeOfParallelism));
            assertCommits().isEmpty();

            this.parallelConsumer.pauseIfRunning();
            awaitForOneLoopCycle();
        } finally {
            // Always release the gate, even when a wait or assertion above fails: otherwise the
            // threads parked in accept() would stay blocked in an uninterruptible lock() forever.
            userFunction.unlockProcessing();
        }

        // NOTE(review): the alias says "at least 3" but the assertion is strictly greater than 3;
        // presumably already-buffered records are expected to complete as well - confirm.
        Awaitility.waitAtMost(defaultTimeout)
                .alias("at least 3 records should be processed")
                .untilAsserted(() -> Truth.assertThat(userFunction.numProcessedRecords.get()).isGreaterThan(degreeOfParallelism));
        awaitForCommit(userFunction.numProcessedRecords.get());
        Truth.assertThat(userFunction.numInFlightRecords.get()).isEqualTo(0);
        Truth.assertThat(this.parallelConsumer.getWm().getNumberRecordsOutForProcessing()).isEqualTo(0);

        // Resume: the remainder of the batch is processed.
        this.parallelConsumer.resumeIfPaused();
        Awaitility.waitAtMost(defaultTimeout)
                .alias("1000 records should be processed")
                .untilAsserted(() -> Truth.assertThat(userFunction.numProcessedRecords.get()).isEqualTo(numTestRecordsPerBatch));
        awaitForCommit(numTestRecordsPerBatch);
    }
}
