package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.class */
public class AsyncSinkWriterTest {
    /** Entries the mock writers have accepted, in the order they were accepted; cleared before each test. */
    private final List<Integer> res = new ArrayList<>();
    /** Init context handed to the writers under test; recreated before each test. */
    private TestSinkInitContext sinkInitContext;
    /** Second init context whose mailbox is also drained on every mock write; recreated before each test. */
    private TestSinkInitContextAnyThreadMailbox sinkInitContextAnyThreadMailbox;

    /* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest$AsyncSinkReleaseAndBlockWriterImpl.class */
    /**
     * Mock writer (batch size 3, 20 buffered requests, failure simulation off) whose
     * {@code submitRequestEntries} parks on a latch whenever it is handed exactly three entries.
     *
     * <p>Before parking it counts down {@code delayedStartLatch} so the test can tell that the
     * submission has started. Depending on {@code blockForLimitedTime} it then either waits at
     * most 500 ms and asserts that the latch was <em>not</em> released in that time, or waits
     * until the latch is released.
     */
    private class AsyncSinkReleaseAndBlockWriterImpl extends AsyncSinkWriterImpl {
        private final CountDownLatch blockedThreadLatch;
        private final CountDownLatch delayedStartLatch;
        private final boolean blockForLimitedTime;

        /**
         * @param context init context passed through to the parent mock writer
         * @param maxInFlightRequests maximum number of in-flight requests for the parent writer
         * @param blockedThreadLatch latch the submitting thread waits on
         * @param delayedStartLatch latch counted down once a three-entry submission has begun
         * @param blockForLimitedTime whether to wait with a 500 ms timeout instead of indefinitely
         */
        public AsyncSinkReleaseAndBlockWriterImpl(Sink.InitContext context, int maxInFlightRequests, CountDownLatch blockedThreadLatch, CountDownLatch delayedStartLatch, boolean blockForLimitedTime) {
            super(context, 3, maxInFlightRequests, 20, 100L, 100L, 100L, false, 0);
            this.blockedThreadLatch = blockedThreadLatch;
            this.delayedStartLatch = delayedStartLatch;
            this.blockForLimitedTime = blockForLimitedTime;
        }

        /**
         * Blocks for three-entry batches, then records every entry as written and reports no
         * failures back to the caller.
         */
        @Override // org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTest.AsyncSinkWriterImpl
        protected void submitRequestEntries(List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
            if (requestEntries.size() == 3) {
                signalStartAndBlock();
            }
            AsyncSinkWriterTest.this.res.addAll(requestEntries);
            requestResult.accept(new ArrayList<>());
        }

        /** Announces that the submission started and waits on the blocking latch. */
        private void signalStartAndBlock() {
            try {
                delayedStartLatch.countDown();
                if (!blockForLimitedTime) {
                    blockedThreadLatch.await();
                    return;
                }
                boolean releasedEarly = blockedThreadLatch.await(500L, TimeUnit.MILLISECONDS);
                Assertions.assertThat(releasedEarly)
                        .as("The countdown latch was released before the full amountof time was reached.")
                        .isFalse();
            } catch (InterruptedException e) {
                Assertions.fail("The unit test latch must not have been interrupted by another thread.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest$AsyncSinkWriterImpl.class */
    public class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {
        private final Set<Integer> failedFirstAttempts;
        private final boolean simulateFailures;
        private final int delay;

        private AsyncSinkWriterImpl(AsyncSinkWriterTest asyncSinkWriterTest, Sink.InitContext initContext, int i, int i2, int i3, long j, long j2, long j3, boolean z, int i4) {
            this(initContext, i, i2, i3, j, j2, j3, z, i4, (List<BufferedRequestState<Integer>>) Collections.emptyList());
        }

        private AsyncSinkWriterImpl(Sink.InitContext initContext, int i, int i2, int i3, long j, long j2, long j3, boolean z, int i4, List<BufferedRequestState<Integer>> list) {
            super((str, context) -> {
                return Integer.valueOf(Integer.parseInt(str));
            }, initContext, i, i2, i3, j, j2, j3, list);
            this.failedFirstAttempts = new HashSet();
            this.simulateFailures = z;
            this.delay = i4;
        }

        public void write(String str) throws IOException, InterruptedException {
            yieldMailbox(AsyncSinkWriterTest.this.sinkInitContext.getMailboxExecutor());
            yieldMailbox(AsyncSinkWriterTest.this.sinkInitContextAnyThreadMailbox.getMailboxExecutor());
            write(str, null);
        }

        public void yieldMailbox(MailboxExecutor mailboxExecutor) {
            boolean z = true;
            while (z) {
                z = mailboxExecutor.tryYield();
            }
        }

        public void writeAsNonMailboxThread(String str) throws IOException, InterruptedException {
            write(str, null);
        }

        protected void submitRequestEntries(List<Integer> list, Consumer<List<Integer>> consumer) {
            maybeDelay();
            if (list.stream().anyMatch(num -> {
                return num.intValue() > 100 && num.intValue() <= 200;
            })) {
                throw new RuntimeException("Deliberate runtime exception occurred in SinkWriterImplementation.");
            }
            if (!this.simulateFailures) {
                AsyncSinkWriterTest.this.res.addAll(list);
                consumer.accept(new ArrayList());
                return;
            }
            Stream<Integer> stream = this.failedFirstAttempts.stream();
            list.getClass();
            List list2 = (List) stream.filter((v1) -> {
                return r1.contains(v1);
            }).collect(Collectors.toList());
            Set<Integer> set = this.failedFirstAttempts;
            list2.getClass();
            set.removeIf((v1) -> {
                return r1.contains(v1);
            });
            List<Integer> list3 = (List) list.stream().filter(num2 -> {
                return !list2.contains(num2);
            }).filter(num3 -> {
                return num3.intValue() > 200;
            }).collect(Collectors.toList());
            this.failedFirstAttempts.addAll(list3);
            list.removeAll(list3);
            AsyncSinkWriterTest.this.res.addAll(list);
            consumer.accept(list3);
        }

        private void maybeDelay() {
            if (this.delay <= 0) {
                return;
            }
            try {
                Thread.sleep(this.delay);
            } catch (InterruptedException e) {
                Assertions.fail("Thread sleeping for delay in submitRequestEntries was interrupted.");
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public long getSizeInBytes(Integer num) {
            return (num.intValue() <= 200 || !this.simulateFailures) ? 4L : 100L;
        }

        public BufferedRequestState<Integer> wrapRequests(Integer... numArr) {
            return wrapRequests(Arrays.asList(numArr));
        }

        public BufferedRequestState<Integer> wrapRequests(List<Integer> list) {
            ArrayList arrayList = new ArrayList();
            for (Integer num : list) {
                arrayList.add(new RequestEntryWrapper(num, getSizeInBytes(num)));
            }
            return new BufferedRequestState<>(arrayList);
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -831563592:
                    if (implMethodName.equals("lambda$new$ee82ea2f$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/base/sink/writer/ElementConverter") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/api/connector/sink2/SinkWriter$Context;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest$AsyncSinkWriterImpl") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/api/connector/sink2/SinkWriter$Context;)Ljava/lang/Integer;")) {
                        return (str, context) -> {
                            return Integer.valueOf(Integer.parseInt(str));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest$AsyncSinkWriterImplBuilder.class */
    /**
     * Fluent builder for {@link AsyncSinkWriterImpl}.
     *
     * <p>Defaults: batch size 10, 1 in-flight request, 100 buffered requests, 110 bytes per batch,
     * 1000 ms in buffer, 110 bytes per record, no simulated failures and no delay. The context has
     * no default and must be supplied through {@link #context(Sink.InitContext)}.
     */
    public class AsyncSinkWriterImplBuilder {
        private boolean simulateFailures = false;
        private int delay = 0;
        private Sink.InitContext context;
        private int maxBatchSize = 10;
        private int maxInFlightRequests = 1;
        private int maxBufferedRequests = 100;
        private long maxBatchSizeInBytes = 110L;
        private long maxTimeInBufferMS = 1000L;
        // Initialised from the default batch byte limit; a later maxBatchSizeInBytes(..) call
        // does not update it.
        private long maxRecordSizeInBytes = maxBatchSizeInBytes;

        private AsyncSinkWriterImplBuilder() {
        }

        /** Sets the init context handed to the writer. */
        public AsyncSinkWriterImplBuilder context(Sink.InitContext value) {
            context = value;
            return this;
        }

        /** Sets the maximum number of entries per batch. */
        public AsyncSinkWriterImplBuilder maxBatchSize(int value) {
            maxBatchSize = value;
            return this;
        }

        /** Sets the maximum number of in-flight requests. */
        public AsyncSinkWriterImplBuilder maxInFlightRequests(int value) {
            maxInFlightRequests = value;
            return this;
        }

        /** Sets the maximum number of buffered requests. */
        public AsyncSinkWriterImplBuilder maxBufferedRequests(int value) {
            maxBufferedRequests = value;
            return this;
        }

        /** Sets the maximum batch size in bytes. */
        public AsyncSinkWriterImplBuilder maxBatchSizeInBytes(long value) {
            maxBatchSizeInBytes = value;
            return this;
        }

        /** Sets the maximum time in milliseconds an entry may stay buffered. */
        public AsyncSinkWriterImplBuilder maxTimeInBufferMS(long value) {
            maxTimeInBufferMS = value;
            return this;
        }

        /** Sets the maximum size in bytes of a single record. */
        public AsyncSinkWriterImplBuilder maxRecordSizeInBytes(long value) {
            maxRecordSizeInBytes = value;
            return this;
        }

        /** Sets the delay in milliseconds applied to every mock submission. */
        public AsyncSinkWriterImplBuilder delay(int value) {
            delay = value;
            return this;
        }

        /** Enables or disables simulated first-attempt failures in the mock writer. */
        public AsyncSinkWriterImplBuilder simulateFailures(boolean value) {
            simulateFailures = value;
            return this;
        }

        /** Builds a writer from the current settings without any restored state. */
        public AsyncSinkWriterImpl build() {
            return new AsyncSinkWriterImpl(
                    context,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBatchSizeInBytes,
                    maxTimeInBufferMS,
                    maxRecordSizeInBytes,
                    simulateFailures,
                    delay);
        }

        /**
         * Builds a writer from the current settings, restoring the given buffered state.
         *
         * @param list previously snapshotted buffer states to restore from
         */
        public AsyncSinkWriterImpl buildWithState(List<BufferedRequestState<Integer>> list) {
            return new AsyncSinkWriterImpl(
                    context,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBatchSizeInBytes,
                    maxTimeInBufferMS,
                    maxRecordSizeInBytes,
                    simulateFailures,
                    delay,
                    list);
        }
    }

    /** Gives every test fresh init contexts and an empty result list. */
    @BeforeEach
    public void before() {
        sinkInitContext = new TestSinkInitContext();
        sinkInitContextAnyThreadMailbox = new TestSinkInitContextAnyThreadMailbox();
        res.clear();
    }

    /** Writes the values 0 through 79 to a mock writer built with the builder's defaults. */
    private void performNormalWriteOfEightyRecordsToMock() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        int record = 0;
        while (record < 80) {
            writer.write(String.valueOf(record));
            record++;
        }
    }

    /**
     * Writing 80 records (a multiple of the builder's default batch size of 10) must leave
     * exactly 80 entries in the destination list.
     */
    @Test
    public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten() throws IOException, InterruptedException {
        performNormalWriteOfEightyRecordsToMock();

        Assertions.assertThat(res).hasSize(80);
    }

    /**
     * After 80 writes the metrics must show 80 records out, 320 bytes out (the mock sizes each
     * of these entries at 4 bytes) and a send-time gauge value in the range [0, 1000).
     */
    @Test
    public void testMetricsGroupHasLoggedNumberOfRecordsAndNumberOfBytesCorrectly() throws IOException, InterruptedException {
        performNormalWriteOfEightyRecordsToMock();

        Assertions.assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(80L);
        Assertions.assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(320L);

        Long sendTime = (Long) sinkInitContext.getCurrentSendTimeGauge().get().getValue();
        Assertions.assertThat(sendTime)
                .isGreaterThanOrEqualTo(0L)
                .isLessThan(1000L);
    }

    /**
     * With a 100 ms artificial delay per submission and a batch size of 2, the send-time gauge
     * read after four writes must lie in the range [99, 110).
     */
    @Test
    public void checkLoggedSendTimesAreWithinBounds() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(2)
                .delay(100)
                .build();
        for (int record = 0; record < 4; record++) {
            writer.write(String.valueOf(record));
        }

        Long sendTime = (Long) sinkInitContext.getCurrentSendTimeGauge().get().getValue();
        Assertions.assertThat(sendTime)
                .isGreaterThanOrEqualTo(99L)
                .isLessThan(110L);
    }

    /**
     * Writes 23 records with the default batch size of 10 and expects 20 of them at the
     * destination while the remaining three (20, 21, 22) show up in the writer's state.
     */
    @Test
    public void testThatUnwrittenRecordsInBufferArePersistedWhenSnapshotIsTaken() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        for (int record = 0; record < 23; record++) {
            writer.write(String.valueOf(record));
        }

        Assertions.assertThat(res).hasSize(20);
        BufferedRequestState<Integer> expectedBuffer = writer.wrapRequests(20, 21, 22);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(expectedBuffer, getWriterState(writer));
    }

    /**
     * Three 4-byte records add up to exactly the 12-byte batch limit configured here; the test
     * expects all three to have reached the destination after the third write.
     */
    @Test
    public void sinkToAllowBatchSizesEqualToByteWiseLimit() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(12L)
                .maxRecordSizeInBytes(4L)
                .build();
        for (int record = 1; record <= 3; record++) {
            writer.write(String.valueOf(record));
        }

        Assertions.assertThat(res).hasSize(3);
    }

    /** After 23 writes, {@code flush(true)} must leave all 23 records at the destination. */
    @Test
    public void testPreparingCommitAtSnapshotTimeEnsuresBufferedRecordsArePersistedToDestination() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        for (int record = 0; record < 23; record++) {
            writer.write(String.valueOf(record));
        }

        writer.flush(true);

        Assertions.assertThat(res).hasSize(23);
    }

    /**
     * Writes a single record and flushes; the test passes when {@code flush(true)} returns and
     * that one record is at the destination.
     */
    @Test
    public void testThatMailboxYieldDoesNotBlockWhileATimerIsRegisteredAndHasYetToElapse() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();

        writer.write(String.valueOf(0));
        writer.flush(true);

        Assertions.assertThat(res).hasSize(1);
    }

    /**
     * With a batch size of 3: two writes must stay buffered (state holds 25 and 55, nothing
     * written), and the third write must empty the state and put three records at the
     * destination.
     */
    @Test
    public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterAutomaticFlush() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .build();

        writer.write("25");
        writer.write("55");
        BufferedRequestState<Integer> expectedBeforeFlush = writer.wrapRequests(25, 55);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(expectedBeforeFlush, getWriterState(writer));
        Assertions.assertThat(res).isEmpty();

        writer.write("75");
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(writer));
        Assertions.assertThat(res).hasSize(3);
    }

    /**
     * Shared scenario: writes 25, 55, 75, 95 and 955 with a batch size of 3, checks that 95 and
     * 955 are the buffered remainder, then flushes and checks that the buffer state is empty.
     *
     * <p>Failure simulation is left at the builder default (off) here.
     */
    public void writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .build();
        for (String record : new String[] {"25", "55", "75", "95", "955"}) {
            writer.write(record);
        }
        BufferedRequestState<Integer> expectedRemainder = writer.wrapRequests(95, 955);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(expectedRemainder, getWriterState(writer));

        writer.flush(true);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(writer));
    }

    /** Runs the five-record scenario and expects all five records at the destination. */
    @Test
    public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush() throws IOException, InterruptedException {
        writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();

        Assertions.assertThat(res).hasSize(5);
    }

    /**
     * Runs the five-record scenario and expects the counters to report 5 records and 20 bytes
     * (4 bytes per record) sent.
     */
    @Test
    public void metricsAreLoggedEachTimeSubmitRequestEntriesIsCalled() throws IOException, InterruptedException {
        writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();

        long recordsOut = sinkInitContext.getNumRecordsOutCounter().getCount();
        long bytesOut = sinkInitContext.getNumBytesOutCounter().getCount();
        Assertions.assertThat(recordsOut).isEqualTo(5L);
        Assertions.assertThat(bytesOut).isEqualTo(20L);
    }

    /**
     * Writes five harmless records and then 135, which the mock rejects with a deliberate
     * {@link RuntimeException}; the exception must surface from {@code write} and only the
     * first batch of three records may have reached the destination.
     */
    @Test
    public void testRuntimeErrorsInSubmitRequestEntriesEndUpAsIOExceptionsWithNumOfFailedRequests() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .simulateFailures(true)
                .build();
        for (String record : new String[] {"25", "55", "75", "95", "35"}) {
            writer.write(record);
        }

        Assertions.assertThatThrownBy(() -> writer.write("135"))
                .isInstanceOf(RuntimeException.class)
                .hasMessage("Deliberate runtime exception occurred in SinkWriterImplementation.");
        Assertions.assertThat(res).hasSize(3);
    }

    /**
     * Walks through ten writes with simulated failures (entries above 200 fail on their first
     * attempt) and, after every write, pins both the destination content and the buffered
     * state. A final {@code flush(true)} must deliver every record and leave the state empty.
     */
    @Test
    public void testRetryableErrorsDoNotViolateAtLeastOnceSemanticsDueToRequeueOfFailures() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .maxBatchSizeInBytes(10000000L)
                .simulateFailures(true)
                .build();
        List<Integer> nothing = Collections.emptyList();

        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "25", nothing, Arrays.asList(25));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "55", nothing, Arrays.asList(25, 55));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "965", Arrays.asList(25, 55), nothing);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "75", Arrays.asList(25, 55, 965, 75), nothing);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "95", Arrays.asList(25, 55, 965, 75), Arrays.asList(95));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "955", Arrays.asList(25, 55, 965, 75), Arrays.asList(95, 955));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "550", Arrays.asList(25, 55, 965, 75, 95), nothing);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "45", Arrays.asList(25, 55, 965, 75, 95, 955, 550), Arrays.asList(45));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "35", Arrays.asList(25, 55, 965, 75, 95, 955, 550), Arrays.asList(45, 35));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(writer, "535", Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35), nothing);

        writer.flush(true);

        List<Integer> expectedDestination = Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535);
        Assertions.assertThat(res).isEqualTo(expectedDestination);
        Assertions.assertThat(getWriterState(writer).getStateSize()).isEqualTo(0L);
    }

    /**
     * Runs the retry-ordering scenario on a writer with batch size 3, simulated failures and
     * the builder's default buffer capacity.
     */
    @Test
    public void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfPrepareCommitIsTriggered() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .simulateFailures(true)
                .build();
        testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(writer);
    }

    /**
     * Runs the retry-ordering scenario on a writer with batch size 3, one in-flight request, a
     * buffer capacity of 8 and simulated failures.
     */
    @Test
    public void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfBufferFillsToFull() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .maxInFlightRequests(1)
                .maxBufferedRequests(8)
                .simulateFailures(true)
                .build();
        testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(writer);
    }

    /**
     * Retry-ordering scenario: writes twelve records to the given writer, flushes, and expects
     * the destination to contain them in exactly the order they were written.
     *
     * @param asyncSinkWriterImpl writer under test, expected to be configured by the caller
     */
    private void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(AsyncSinkWriterImpl asyncSinkWriterImpl) throws IOException, InterruptedException {
        String[] records = {"25", "55", "965", "75", "95", "955", "550", "645", "545", "535", "515", "505"};
        for (String record : records) {
            asyncSinkWriterImpl.write(record);
        }

        asyncSinkWriterImpl.flush(true);

        List<Integer> expectedDestination = Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505);
        Assertions.assertThat(res).isEqualTo(expectedDestination);
    }

    /**
     * Building a writer whose buffer capacity (10) equals the default batch size (10) must be
     * rejected with an {@link IllegalArgumentException}.
     */
    @Test
    public void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() {
        AsyncSinkWriterImplBuilder builder = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBufferedRequests(10);

        Assertions.assertThatThrownBy(() -> builder.build())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("The maximum number of requests that may be buffered should be strictly greater than the maximum number of requests per batch.");
    }

    /**
     * Building a writer whose per-record byte limit (10001) exceeds its per-batch byte limit
     * (10000) must be rejected with an {@link IllegalArgumentException}.
     */
    @Test
    public void maxRecordSizeSetMustBeSmallerThanOrEqualToMaxBatchSize() {
        AsyncSinkWriterImplBuilder builder = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBufferedRequests(11)
                .maxBatchSizeInBytes(10000L)
                .maxRecordSizeInBytes(10001L);

        Assertions.assertThatThrownBy(() -> builder.build())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");
    }

    /**
     * Writing a record that the mock sizes at 4 bytes to a writer limited to 3 bytes per record
     * must fail with an {@link IllegalArgumentException} naming both sizes.
     */
    @Test
    public void recordsWrittenToTheSinkMustBeSmallerOrEqualToMaxRecordSizeInBytes() {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .maxBufferedRequests(11)
                .maxBatchSizeInBytes(10000L)
                .maxRecordSizeInBytes(3L)
                .build();

        Assertions.assertThatThrownBy(() -> writer.write("3"))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("The request entry sent to the buffer was of size [4], when the maxRecordSizeInBytes was set to [3].");
    }

    /**
     * Writes one record and then checks both the destination list and the writer's buffer.
     *
     * @param asyncSinkWriterImpl writer under test
     * @param str record to write
     * @param list expected destination content after the write
     * @param list2 expected buffered entries after the write
     */
    private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(AsyncSinkWriterImpl asyncSinkWriterImpl, String str, List<Integer> list, List<Integer> list2) throws IOException, InterruptedException {
        asyncSinkWriterImpl.write(str);

        Assertions.assertThat(res).isEqualTo(list);
        BufferedRequestState<Integer> expectedBuffer = asyncSinkWriterImpl.wrapRequests(list2);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(expectedBuffer, getWriterState(asyncSinkWriterImpl));
    }

    /**
     * With a 30-byte batch limit and 4-byte records, writes the values 0 through 99 and expects,
     * after writing value {@code i}, the destination to hold the largest multiple of 7 that
     * does not exceed {@code i}.
     */
    @Test
    public void testFlushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(30L)
                .maxRecordSizeInBytes(30L)
                .build();
        for (int record = 0; record < 100; record++) {
            writer.write(String.valueOf(record));
            int expectedWritten = record - (record % 7);
            Assertions.assertThat(res.size()).isEqualTo(expectedWritten);
        }
    }

    /** Three buffered records must stay unwritten when {@code flush(false)} is called. */
    @Test
    public void prepareCommitDoesNotFlushElementsIfFlushIsSetToFalse() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        for (int record = 0; record < 3; record++) {
            writer.write(String.valueOf(record));
        }

        writer.flush(false);

        Assertions.assertThat(res).isEmpty();
    }

    /**
     * With a batch size of 7 and a 32-byte batch limit, expects seven records at the
     * destination after the first seven writes and fourteen after the next seven.
     */
    @Test
    public void testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(7)
                .maxBatchSizeInBytes(32L)
                .maxRecordSizeInBytes(32L)
                .build();

        int record = 0;
        while (record < 7) {
            writer.write(String.valueOf(record++));
        }
        Assertions.assertThat(res).hasSize(7);

        while (record < 14) {
            writer.write(String.valueOf(record++));
        }
        Assertions.assertThat(res).hasSize(14);
    }

    /**
     * With simulated failures, entry 225 is sized at 100 bytes by the mock and fails on its
     * first attempt. The test pins the destination content after the first four writes (two
     * records) and after four more writes ({@code [1, 2, 225, 3, 4]}).
     */
    @Test
    public void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectSize() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();

        for (int record : new int[] {225, 1, 2, 3}) {
            writer.write(String.valueOf(record));
        }
        Assertions.assertThat(res).hasSize(2);

        for (int record : new int[] {4, 5, 6, 325}) {
            writer.write(String.valueOf(record));
        }
        Assertions.assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
    }

    /**
     * Like the "correct size" variant but with two initially failing entries (228, 225) and a
     * 210-byte batch limit; the failed entries must reappear at the destination in their
     * original relative order: {@code [1, 2, 228, 225, 3, 4]}.
     */
    @Test
    public void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectOrder() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(210L)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();

        for (int record : new int[] {228, 225, 1, 2, 3}) {
            writer.write(String.valueOf(record));
        }
        Assertions.assertThat(res).hasSize(2);

        for (int record : new int[] {4, 5, 6, 328, 325}) {
            writer.write(String.valueOf(record));
        }
        Assertions.assertThat(res).isEqualTo(Arrays.asList(1, 2, 228, 225, 3, 4));
    }

    /**
     * Buffers eight records (fewer than the batch size of 10) with a 100 ms buffer timeout and
     * expects nothing written at processing time 99 but all eight written at time 100.
     */
    @Test
    public void testThatABatchWithSizeSmallerThanMaxBatchSizeIsFlushedOnTimeoutExpiry() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        for (int record = 0; record < 8; record++) {
            writer.write(String.valueOf(record));
        }

        clock.setCurrentTime(99L);
        Assertions.assertThat(res).isEmpty();

        clock.setCurrentTime(100L);
        Assertions.assertThat(res).hasSize(8);
    }

    /**
     * Writes 98 records, advancing the test clock by one millisecond before each write, with a
     * batch size of 10 and a 100 ms buffer timeout. Expects 90 records written at time 99 and
     * all 98 at time 100.
     */
    @Test
    public void testThatTimeBasedBatchPicksUpAllRelevantItemsUpUntilExpiryOfTimer() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        for (int tick = 0; tick < 98; tick++) {
            clock.setCurrentTime(tick);
            writer.write(String.valueOf(tick));
        }

        clock.setCurrentTime(99L);
        Assertions.assertThat(res).hasSize(90);

        clock.setCurrentTime(100L);
        Assertions.assertThat(res).hasSize(98);
    }

    /**
     * Pins destination and buffer content in three stages: after four writes (one of which,
     * 225, fails its first attempt), after {@code flush(false)} (destination unchanged, 225
     * back in the buffer ahead of 2), and after {@code flush(true)} (everything delivered).
     */
    @Test
    public void prepareCommitFlushesInflightElementsAndDoesNotFlushIfFlushIsSetToFalse() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(8)
                .maxBufferedRequests(10)
                .simulateFailures(true)
                .build();
        for (int record : new int[] {225, 0, 1, 2}) {
            writer.write(String.valueOf(record));
        }
        Assertions.assertThat(res).isEqualTo(Arrays.asList(0, 1));
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(writer.wrapRequests(2), getWriterState(writer));

        writer.flush(false);
        Assertions.assertThat(res).isEqualTo(Arrays.asList(0, 1));
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(writer.wrapRequests(225, 2), getWriterState(writer));

        writer.flush(true);
        Assertions.assertThat(res).isEqualTo(Arrays.asList(0, 1, 225, 2));
    }

    /**
     * Snapshots a writer that still buffers a once-failed entry, restores a second writer (no
     * failure simulation) from that snapshot, writes two more records and expects the
     * destination to read {@code [1, 2, 225, 3, 4]}.
     */
    @Test
    public void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferAfterSnapshot() throws IOException, InterruptedException {
        AsyncSinkWriterImpl firstWriter = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();
        for (int record : new int[] {225, 1, 2, 3}) {
            firstWriter.write(String.valueOf(record));
        }
        Assertions.assertThat(res).hasSize(2);

        firstWriter.flush(false);
        Assertions.assertThat(res).hasSize(2);

        AsyncSinkWriterImpl restoredWriter = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .buildWithState(firstWriter.snapshotState(1L));
        restoredWriter.write(String.valueOf(4));
        restoredWriter.write(String.valueOf(5));

        Assertions.assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
    }

    /**
     * Snapshots a writer buffering entry 225 (sized at 100 bytes by the mock when failures are
     * simulated) and expects restoring a writer limited to 15 bytes per record from that
     * snapshot to fail with an {@link IllegalStateException}.
     */
    @Test
    public void testThatRecordOfSizeBiggerThanMaximumFailsSinkInitialization() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();
        writer.write(String.valueOf(225));
        writer.flush(false);
        List<BufferedRequestState<Integer>> snapshot = writer.snapshotState(1L);

        Assertions.assertThatExceptionOfType(IllegalStateException.class)
                .isThrownBy(() -> new AsyncSinkWriterImplBuilder()
                        .context(sinkInitContext)
                        .maxRecordSizeInBytes(15L)
                        .buildWithState(snapshot))
                .withMessageContaining("State contains record of size 100 which exceeds sink maximum record size 15.");
    }

    /**
     * Checks that a writer built from three separate buffered states (holding 3, 2 and 1
     * entries, each of size 1) produces a single state on snapshot that contains all six
     * entries and reports a state size of 6.
     *
     * @throws IOException declared for the snapshot/restore calls
     */
    @Test
    public void testRestoreFromMultipleStates() throws IOException {
        // Explicit element types keep the stream below typed; with raw types the mapping
        // lambda receives Object and cannot call getRequestEntry().
        List<RequestEntryWrapper<Integer>> firstEntries =
                Arrays.asList(
                        new RequestEntryWrapper<>(1, 1L),
                        new RequestEntryWrapper<>(2, 1L),
                        new RequestEntryWrapper<>(3, 1L));
        List<RequestEntryWrapper<Integer>> secondEntries =
                Arrays.asList(new RequestEntryWrapper<>(4, 1L), new RequestEntryWrapper<>(5, 1L));
        List<RequestEntryWrapper<Integer>> thirdEntries =
                Collections.singletonList(new RequestEntryWrapper<>(6, 1L));
        List<BufferedRequestState<Integer>> statesToRestore =
                Arrays.asList(
                        new BufferedRequestState<>(firstEntries),
                        new BufferedRequestState<>(secondEntries),
                        new BufferedRequestState<>(thirdEntries));

        AsyncSinkWriterImpl restoredWriter =
                new AsyncSinkWriterImplBuilder()
                        .context(this.sinkInitContext)
                        .buildWithState(statesToRestore);
        List<BufferedRequestState<Integer>> snapshotState = restoredWriter.snapshotState(1L);

        // All restored states are expected to come back as one state.
        Assertions.assertThat(snapshotState).hasSize(1);
        BufferedRequestState<Integer> bufferedRequestState = snapshotState.get(0);
        Assertions.assertThat(bufferedRequestState.getBufferedRequestEntries()).hasSize(6);
        Assertions.assertThat(bufferedRequestState.getStateSize()).isEqualTo(6L);
        List<Integer> restoredEntries =
                bufferedRequestState.getBufferedRequestEntries().stream()
                        .map(RequestEntryWrapper::getRequestEntry)
                        .collect(Collectors.toList());
        Assertions.assertThat(restoredEntries).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
    }

    /**
     * Drives the test processing-time service through several instants (0, 10, 20, 100, 199,
     * 200) while writing single entries, and checks the number of delivered results after each
     * step for a writer configured with a 100 ms maximum time in buffer.
     */
    @Test
    public void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
        AsyncSinkWriterImpl writer =
                new AsyncSinkWriterImplBuilder()
                        .context(this.sinkInitContext)
                        .maxBatchSize(10)
                        .maxInFlightRequests(20)
                        .maxBatchSizeInBytes(10000L)
                        .maxTimeInBufferMS(100L)
                        .maxRecordSizeInBytes(10000L)
                        .simulateFailures(true)
                        .build();
        TestProcessingTimeService clock = this.sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        writer.write("1");
        Assertions.assertThat(this.res).isEmpty();

        // A final flush at t=10 is expected to deliver the first entry.
        clock.setCurrentTime(10L);
        writer.flush(true);
        Assertions.assertThat(this.res).hasSize(1);

        // The second entry is written at t=20 and expected to be delivered once t reaches 100.
        clock.setCurrentTime(20L);
        writer.write("2");
        Assertions.assertThat(this.res).hasSize(1);
        clock.setCurrentTime(100L);
        Assertions.assertThat(this.res).hasSize(2);

        // The third entry is expected to stay buffered at t=199 and be delivered at t=200.
        writer.write("3");
        clock.setCurrentTime(199L);
        Assertions.assertThat(this.res).hasSize(2);
        clock.setCurrentTime(200L);
        Assertions.assertThat(this.res).hasSize(3);
    }

    /**
     * Writes "1", "2" and "225" at t=0 and "3", "4" after t=100, then checks the number of
     * delivered results at t=100 (2), t=199 (still 2) and t=200 (5) for a failure-simulating
     * writer with a 100 ms maximum time in buffer.
     *
     * <p>NOTE(review): 225 is presumably the entry that fails on first submission and is retried
     * with the next batch; confirm against AsyncSinkWriterImpl.
     */
    @Test
    public void testThatIntermittentlyFailingEntriesShouldBeFlushedWithMainBatchInTimeBasedFlush() throws Exception {
        AsyncSinkWriterImpl writer =
                new AsyncSinkWriterImplBuilder()
                        .context(this.sinkInitContext)
                        .maxBatchSizeInBytes(10000L)
                        .maxTimeInBufferMS(100L)
                        .maxRecordSizeInBytes(10000L)
                        .simulateFailures(true)
                        .build();
        TestProcessingTimeService clock = this.sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        for (String entry : new String[] {"1", "2", "225"}) {
            writer.write(entry);
        }
        clock.setCurrentTime(100L);
        Assertions.assertThat(this.res).hasSize(2);

        for (String entry : new String[] {"3", "4"}) {
            writer.write(entry);
        }
        clock.setCurrentTime(199L);
        Assertions.assertThat(this.res).hasSize(2);
        clock.setCurrentTime(200L);
        Assertions.assertThat(this.res).hasSize(5);
    }

    /**
     * Delivers the only entry through a final flush at t=50 and then advances the test clock to
     * t=200. The last step carries no assertion; the test passes as long as advancing the clock
     * does not throw.
     */
    @Test
    public void testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws Exception {
        AsyncSinkWriterImpl writer =
                new AsyncSinkWriterImplBuilder()
                        .context(this.sinkInitContext)
                        .maxBatchSize(10)
                        .maxInFlightRequests(20)
                        .maxBatchSizeInBytes(10000L)
                        .maxTimeInBufferMS(100L)
                        .maxRecordSizeInBytes(10000L)
                        .simulateFailures(true)
                        .build();
        TestProcessingTimeService clock = this.sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        writer.write("1");
        clock.setCurrentTime(50L);
        writer.flush(true);
        Assertions.assertThat(this.res).hasSize(1);

        // Nothing was written after the flush; moving time forward must simply not fail.
        clock.setCurrentTime(200L);
    }

    /**
     * Writes one entry at t=0 and expects it delivered at t=100, then writes a second entry and
     * expects it delivered at t=200, for a writer with a 100 ms maximum time in buffer.
     */
    @Test
    public void testThatOnExpiryOfAnOldTimeoutANewOneMayBeRegisteredImmediately() throws Exception {
        AsyncSinkWriterImpl writer =
                new AsyncSinkWriterImplBuilder()
                        .context(this.sinkInitContext)
                        .maxBatchSize(10)
                        .maxInFlightRequests(20)
                        .maxBatchSizeInBytes(10000L)
                        .maxTimeInBufferMS(100L)
                        .maxRecordSizeInBytes(10000L)
                        .simulateFailures(true)
                        .build();
        TestProcessingTimeService clock = this.sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        writer.write("1");
        clock.setCurrentTime(100L);
        Assertions.assertThat(this.res).hasSize(1);

        writer.write("2");
        clock.setCurrentTime(200L);
        Assertions.assertThat(this.res).hasSize(2);
    }

    /**
     * Runs the interleaved-write scenario with a blocking writer whose block is time-limited
     * (last constructor argument {@code true}) and expects the results in the order 1, 2, 3, 4.
     *
     * <p>NOTE(review): the integer argument 1 is presumably the in-flight request limit passed
     * on to the parent writer; confirm against AsyncSinkWriterImpl's constructor.
     */
    @Test
    public void testThatInterleavingThreadsMayBlockEachOtherButDoNotCauseRaceConditions() throws Exception {
        CountDownLatch blockedThreadLatch = new CountDownLatch(1);
        CountDownLatch delayedStartLatch = new CountDownLatch(1);
        AsyncSinkReleaseAndBlockWriterImpl blockingWriter =
                new AsyncSinkReleaseAndBlockWriterImpl(
                        this.sinkInitContextAnyThreadMailbox,
                        1,
                        blockedThreadLatch,
                        delayedStartLatch,
                        true);
        writeTwoElementsAndInterleaveTheNextTwoElements(blockingWriter, blockedThreadLatch, delayedStartLatch);
        Assertions.assertThat(this.res).containsExactly(1, 2, 3, 4);
    }

    /**
     * Runs the interleaved-write scenario with a blocking writer whose block is not time-limited
     * (last constructor argument {@code false}) and expects entry 4 to be delivered ahead of
     * entries 1, 2 and 3.
     *
     * <p>NOTE(review): the integer argument 2 is presumably the in-flight request limit passed
     * on to the parent writer; confirm against AsyncSinkWriterImpl's constructor.
     */
    @Test
    public void testThatIfOneInterleavedThreadIsBlockedTheOtherThreadWillContinueAndCorrectlyWrite() throws Exception {
        CountDownLatch blockedThreadLatch = new CountDownLatch(1);
        CountDownLatch delayedStartLatch = new CountDownLatch(1);
        AsyncSinkReleaseAndBlockWriterImpl blockingWriter =
                new AsyncSinkReleaseAndBlockWriterImpl(
                        this.sinkInitContextAnyThreadMailbox,
                        2,
                        blockedThreadLatch,
                        delayedStartLatch,
                        false);
        writeTwoElementsAndInterleaveTheNextTwoElements(blockingWriter, blockedThreadLatch, delayedStartLatch);
        Assertions.assertThat(this.res).containsExactly(4, 1, 2, 3);
    }

    /**
     * Writes "1" and "2" on the calling thread, writes "3" from a pool thread via
     * {@code writeAsNonMailboxThread}, waits for {@code countDownLatch2}, writes "4" on the
     * calling thread, advances the test clock to 100 and finally releases {@code countDownLatch}.
     * The pool must terminate within 500 ms afterwards.
     *
     * <p>The pool is always torn down in a {@code finally} block, so a failing write or a failed
     * termination assertion cannot leave a worker thread parked on a latch.
     *
     * <p>NOTE(review): the clock is taken from {@code sinkInitContext}, while the callers in this
     * class construct their writers with {@code sinkInitContextAnyThreadMailbox}; confirm that
     * this is intended.
     *
     * @param asyncSinkWriterImpl writer under test
     * @param countDownLatch latch the callers also hand to the writer as its blocked-thread
     *     latch; counted down here after "4" has been written
     * @param countDownLatch2 latch the callers also hand to the writer as its delayed-start
     *     latch; awaited here before "4" is written
     * @throws Exception if a write fails, the calling thread is interrupted, or the pool does not
     *     terminate in time
     */
    private void writeTwoElementsAndInterleaveTheNextTwoElements(AsyncSinkWriterImpl asyncSinkWriterImpl, CountDownLatch countDownLatch, CountDownLatch countDownLatch2) throws Exception {
        TestProcessingTimeService testProcessingTimeService = this.sinkInitContext.getTestProcessingTimeService();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        try {
            testProcessingTimeService.setCurrentTime(0L);
            asyncSinkWriterImpl.write("1");
            asyncSinkWriterImpl.write("2");
            newFixedThreadPool.submit(() -> {
                try {
                    asyncSinkWriterImpl.writeAsNonMailboxThread("3");
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
            countDownLatch2.await();
            asyncSinkWriterImpl.write("4");
            testProcessingTimeService.setCurrentTime(100L);
            countDownLatch.countDown();
            newFixedThreadPool.shutdown();
            Assertions.assertThat(newFixedThreadPool.awaitTermination(500L, TimeUnit.MILLISECONDS))
                    .as("Executor Service stuck at termination, not terminated after 500ms!")
                    .isTrue();
        } finally {
            // No-op after a clean termination; otherwise interrupts the worker still waiting.
            newFixedThreadPool.shutdownNow();
        }
    }

    /**
     * Writes "1", "2" and "3" from an auxiliary thread through a blocking writer, waits for the
     * writer's delayed-start latch, prepares a second thread that would call {@code flush(true)},
     * and finally releases the blocked-thread latch and expects the results 1, 2, 3.
     *
     * <p>NOTE(review): the flushing thread ({@code thread2}) is created but {@code start()} is
     * never called on it in this method, so {@code flush(true)} is never executed and the
     * {@code isInterrupted()} check and {@code interrupt()} call act on a thread that has not
     * run. The blocking behaviour named in the method title is therefore not exercised as
     * written. Also note that {@code Assertions.fail} inside either lambda throws on that
     * auxiliary thread, not on the test thread.
     */
    @Test
    public void ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod() throws Exception {
        // countDownLatch is passed as the writer's blocked-thread latch, countDownLatch2 as its
        // delayed-start latch (see the AsyncSinkReleaseAndBlockWriterImpl constructor).
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AsyncSinkReleaseAndBlockWriterImpl asyncSinkReleaseAndBlockWriterImpl = new AsyncSinkReleaseAndBlockWriterImpl(this.sinkInitContextAnyThreadMailbox, 1, countDownLatch, countDownLatch2, false);
        // Auxiliary writer thread: submits three entries outside the mailbox thread.
        Thread thread = new Thread(() -> {
            try {
                asyncSinkReleaseAndBlockWriterImpl.writeAsNonMailboxThread("1");
                asyncSinkReleaseAndBlockWriterImpl.writeAsNonMailboxThread("2");
                asyncSinkReleaseAndBlockWriterImpl.writeAsNonMailboxThread("3");
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
                Assertions.fail("Auxiliary thread encountered an exception when writing to the sink", e);
            }
        });
        thread.start();
        // Proceed only once the delayed-start latch has been counted down.
        countDownLatch2.await();
        // Flushing thread: reaching the line after flush(true) is treated as a failure; an
        // InterruptedException is the expected way out and is deliberately ignored.
        Thread thread2 = new Thread(() -> {
            try {
                asyncSinkReleaseAndBlockWriterImpl.flush(true);
                Assertions.fail("Sink did not block successfully and reached here when it shouldn't have.");
            } catch (InterruptedException e) {
            }
        });
        // NOTE(review): thread2 has not been started at this point (see method comment).
        Thread.sleep(300L);
        Assertions.assertThat(thread2.isInterrupted()).isFalse();
        thread2.interrupt();
        // Release the blocked-thread latch, then wait for the writer thread to finish.
        countDownLatch.countDown();
        thread.join();
        Assertions.assertThat(this.res).isEqualTo(Arrays.asList(1, 2, 3));
    }

    /**
     * Snapshots the given writer with checkpoint id 1, asserts that exactly one buffered state
     * is returned, and hands that state back.
     *
     * @param asyncSinkWriter writer whose current state is inspected
     * @return the single buffered request state contained in the snapshot
     */
    private BufferedRequestState<Integer> getWriterState(AsyncSinkWriter<String, Integer> asyncSinkWriter) {
        // Typed list instead of a raw List, which removes the unchecked cast on return.
        List<BufferedRequestState<Integer>> snapshotState = asyncSinkWriter.snapshotState(1L);
        Assertions.assertThat(snapshotState.size()).isEqualTo(1);
        return snapshotState.get(0);
    }
}
