package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.util.Callback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/connect/storage/OffsetStorageWriterTest.class */
public class OffsetStorageWriterTest {
    private static final String NAMESPACE = "namespace";
    private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key");
    private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12);
    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
    private static final Exception EXCEPTION = new RuntimeException("error");
    private final OffsetBackingStore store = (OffsetBackingStore) Mockito.mock(OffsetBackingStore.class);
    private final Converter keyConverter = (Converter) Mockito.mock(Converter.class);
    private final Converter valueConverter = (Converter) Mockito.mock(Converter.class);
    private OffsetStorageWriter writer;
    private ExecutorService service;

    @Before
    public void setup() {
        this.writer = new OffsetStorageWriter(this.store, NAMESPACE, this.keyConverter, this.valueConverter);
        this.service = Executors.newFixedThreadPool(1);
    }

    @After
    public void teardown() {
        this.service.shutdownNow();
    }

    @Test
    public void testWriteFlush() throws Exception {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
        this.writer.offset(OFFSET_KEY, OFFSET_VALUE);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        this.writer.doFlush(callback).get(1000L, TimeUnit.MILLISECONDS);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) ArgumentMatchers.isNull(), ArgumentMatchers.isNull());
    }

    @Test
    public void testWriteNullValueFlush() throws Exception {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, false, null);
        this.writer.offset(OFFSET_KEY, (Map) null);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        this.writer.doFlush(callback).get(1000L, TimeUnit.MILLISECONDS);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) ArgumentMatchers.isNull(), ArgumentMatchers.isNull());
    }

    @Test
    public void testWriteNullKeyFlush() throws Exception {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
        this.writer.offset((Map) null, OFFSET_VALUE);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        this.writer.doFlush(callback).get(1000L, TimeUnit.MILLISECONDS);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) ArgumentMatchers.isNull(), ArgumentMatchers.isNull());
    }

    @Test
    public void testNoOffsetsToFlush() throws InterruptedException, TimeoutException {
        Assert.assertFalse(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        Mockito.verifyNoInteractions(new Object[]{this.store});
    }

    @Test
    public void testFlushFailureReplacesOffsets() throws Exception {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
        this.writer.offset(OFFSET_KEY, OFFSET_VALUE);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        this.writer.doFlush(callback).get(1000L, TimeUnit.MILLISECONDS);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) ArgumentMatchers.eq(EXCEPTION), ArgumentMatchers.isNull());
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        this.writer.doFlush(callback).get(1000L, TimeUnit.MILLISECONDS);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) ArgumentMatchers.isNull(), ArgumentMatchers.isNull());
        Assert.assertFalse(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, countDownLatch);
        this.writer.offset(OFFSET_KEY, OFFSET_VALUE);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        Assert.assertThrows(TimeoutException.class, () -> {
            this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS);
        });
        this.writer.doFlush(callback);
        Assert.assertThrows(TimeoutException.class, () -> {
            this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS);
        });
        countDownLatch.countDown();
        Assert.assertFalse(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testCancelBeforeAwaitFlush() throws InterruptedException, TimeoutException {
        this.writer.offset(OFFSET_KEY, OFFSET_VALUE);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        this.writer.cancelFlush();
    }

    @Test
    public void testCancelAfterAwaitFlush() throws Exception {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, countDownLatch);
        this.writer.offset(OFFSET_KEY, OFFSET_VALUE);
        Assert.assertTrue(this.writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        Future doFlush = this.writer.doFlush(callback);
        this.writer.cancelFlush();
        countDownLatch.countDown();
        doFlush.get(1000L, TimeUnit.MILLISECONDS);
    }

    private void expectStore(Map<String, Object> map, byte[] bArr, Map<String, Object> map2, byte[] bArr2, boolean z, CountDownLatch countDownLatch) {
        Mockito.when(this.keyConverter.fromConnectData(NAMESPACE, (Schema) null, Arrays.asList(NAMESPACE, map))).thenReturn(bArr);
        Mockito.when(this.valueConverter.fromConnectData(NAMESPACE, (Schema) null, map2)).thenReturn(bArr2);
        Mockito.when(this.store.set((Map) ArgumentMatchers.eq(Collections.singletonMap(bArr == null ? null : ByteBuffer.wrap(bArr), bArr2 == null ? null : ByteBuffer.wrap(bArr2))), (Callback) ArgumentCaptor.forClass(Callback.class).capture())).thenAnswer(invocationOnMock -> {
            Callback callback = (Callback) invocationOnMock.getArgument(1);
            return this.service.submit(() -> {
                if (countDownLatch != null) {
                    Assert.assertTrue(countDownLatch.await(10000L, TimeUnit.MILLISECONDS));
                }
                if (z) {
                    callback.onCompletion(EXCEPTION, (Object) null);
                    return null;
                }
                callback.onCompletion((Throwable) null, (Object) null);
                return null;
            });
        });
    }
}
