package com.google.pubsublite.kafka.sink;

import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/pubsublite/kafka/sink/PubSubLiteSinkTaskTest.class */
public class PubSubLiteSinkTaskTest {
    private PubSubLiteSinkTask task;

    @Spy
    private FakePublisher publisher;
    private static final ByteString KAFKA_MESSAGE1 = ByteString.copyFromUtf8("fox");
    private static final ByteString KAFKA_MESSAGE2 = ByteString.copyFromUtf8("jumps");
    private static final String KAFKA_TOPIC = "brown";
    private static final String KAFKA_MESSAGE_KEY1 = "dog";
    private static final SinkRecord SAMPLE_RECORD_1 = new SinkRecord(KAFKA_TOPIC, 0, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, KAFKA_MESSAGE1.toByteArray(), -1);
    private static final String KAFKA_MESSAGE_KEY2 = "cat";
    private static final SinkRecord SAMPLE_RECORD_2 = new SinkRecord(KAFKA_TOPIC, 0, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY2, Schema.BYTES_SCHEMA, KAFKA_MESSAGE2.toByteArray(), -1);
    private static final Message SAMPLE_MESSAGE_1 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY1)).setData(KAFKA_MESSAGE1).setAttributes(ImmutableListMultimap.builder().put("x-goog-pubsublite-source-kafka-topic", ByteString.copyFromUtf8(KAFKA_TOPIC)).put("x-goog-pubsublite-source-kafka-partition", ByteString.copyFromUtf8(Integer.toString(0))).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8(Integer.toString(-1))).build()).build();
    private static final Message SAMPLE_MESSAGE_2 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY2)).setData(KAFKA_MESSAGE2).setAttributes(ImmutableListMultimap.builder().put("x-goog-pubsublite-source-kafka-topic", ByteString.copyFromUtf8(KAFKA_TOPIC)).put("x-goog-pubsublite-source-kafka-partition", ByteString.copyFromUtf8(Integer.toString(0))).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8(Integer.toString(-1))).build()).build();

    /* loaded from: input_file:com/google/pubsublite/kafka/sink/PubSubLiteSinkTaskTest$FakePublisher.class */
    static abstract class FakePublisher extends FakeApiService implements Publisher<MessageMetadata> {
        FakePublisher() {
        }
    }

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        Assert.assertNotNull(this.publisher);
        this.task = new PubSubLiteSinkTask(map -> {
            return this.publisher;
        });
        this.task.start(ImmutableMap.of());
    }

    @After
    public void tearDown() {
        if (this.task != null) {
            this.task.stop();
        }
    }

    @Test
    public void testPutPrimitives() {
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int8(), (byte) 5, -1L);
        SinkRecord sinkRecord2 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int16(), (short) 5, -1L);
        SinkRecord sinkRecord3 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int32(), 5, -1L);
        SinkRecord sinkRecord4 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int64(), 5L, -1L);
        SinkRecord sinkRecord5 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.float32(), Float.valueOf(8.0f), -1L);
        SinkRecord sinkRecord6 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.float64(), Double.valueOf(8.0d), -1L);
        SinkRecord sinkRecord7 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.bool(), true, -1L);
        SinkRecord sinkRecord8 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.string(), "Test put.", -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        arrayList.add(sinkRecord2);
        arrayList.add(sinkRecord3);
        arrayList.add(sinkRecord4);
        arrayList.add(sinkRecord5);
        arrayList.add(sinkRecord6);
        arrayList.add(sinkRecord7);
        arrayList.add(sinkRecord8);
        this.task.put(arrayList);
        ((FakePublisher) Mockito.verify(this.publisher, Mockito.times(8))).publish((Message) ArgumentMatchers.any());
    }

    @Test
    public void testPutWherePublishesAreInvoked() {
        InOrder inOrder = Mockito.inOrder(new Object[]{this.publisher});
        this.task.put(ImmutableList.of(SAMPLE_RECORD_1, SAMPLE_RECORD_2));
        ((FakePublisher) inOrder.verify(this.publisher)).publish(SAMPLE_MESSAGE_1);
        ((FakePublisher) inOrder.verify(this.publisher)).publish(SAMPLE_MESSAGE_2);
    }

    @Test
    public void testPutWithNullValues() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 0, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, (Object) null, -1L));
        this.task.put(arrayList);
        ((FakePublisher) Mockito.verify(this.publisher)).publish(SAMPLE_MESSAGE_1.toBuilder().setData(ByteString.EMPTY).build());
    }

    @Test
    public void testPutWithNullMessage() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 0, Schema.STRING_SCHEMA, (Object) null, Schema.BYTES_SCHEMA, (Object) null, -1L));
        this.task.put(arrayList);
        ((FakePublisher) Mockito.verify(this.publisher)).publish(SAMPLE_MESSAGE_1.toBuilder().setKey(ByteString.EMPTY).setData(ByteString.EMPTY).build());
    }

    @Test
    public void testFlush() throws Exception {
        this.task.put(ImmutableList.of(SAMPLE_RECORD_1, SAMPLE_RECORD_2));
        this.task.flush(ImmutableMap.of());
        InOrder inOrder = Mockito.inOrder(new Object[]{this.publisher});
        ((FakePublisher) inOrder.verify(this.publisher)).publish(SAMPLE_MESSAGE_1);
        ((FakePublisher) inOrder.verify(this.publisher)).publish(SAMPLE_MESSAGE_2);
        ((FakePublisher) inOrder.verify(this.publisher)).flush();
    }

    @Test
    public void testFlushBeforeStart() {
        this.task = new PubSubLiteSinkTask();
        this.task.flush(ImmutableMap.of());
        this.task = null;
    }

    @Test(expected = RuntimeException.class)
    public void testFlushExceptionCase() throws Exception {
        ((FakePublisher) Mockito.doThrow(new Throwable[]{new IOException("bad flush")}).when(this.publisher)).flush();
        this.task.put(ImmutableList.of(SAMPLE_RECORD_1));
        ((FakePublisher) Mockito.verify(this.publisher)).publish(SAMPLE_MESSAGE_1);
        try {
            this.task.flush(ImmutableMap.of());
        } finally {
            this.task = null;
        }
    }

    @Test
    public void testKafkaMetadata() {
        this.task.put(ImmutableList.of(new SinkRecord(KAFKA_TOPIC, 4, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, KAFKA_MESSAGE1.toByteArray(), 1000L, 50000L, TimestampType.CREATE_TIME), new SinkRecord(KAFKA_TOPIC, 4, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, KAFKA_MESSAGE1.asReadOnlyByteBuffer(), 1001L, 50001L, TimestampType.LOG_APPEND_TIME), new SinkRecord(KAFKA_TOPIC, 4, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, KAFKA_MESSAGE1.toByteArray(), 1002L, (Long) null, TimestampType.CREATE_TIME)));
        ImmutableListMultimap build = ImmutableListMultimap.builder().put("x-goog-pubsublite-source-kafka-topic", ByteString.copyFromUtf8(KAFKA_TOPIC)).put("x-goog-pubsublite-source-kafka-partition", ByteString.copyFromUtf8(Integer.toString(4))).build();
        Message build2 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY1)).setData(KAFKA_MESSAGE1).setEventTime(Timestamps.fromMillis(50000L)).setAttributes(ImmutableListMultimap.builder().putAll(build).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8("1000")).put("x-goog-pubsublite-source-kafka-event-time-type", ByteString.copyFromUtf8("CreateTime")).build()).build();
        Message build3 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY1)).setData(KAFKA_MESSAGE1).setEventTime(Timestamps.fromMillis(50001L)).setAttributes(ImmutableListMultimap.builder().putAll(build).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8("1001")).put("x-goog-pubsublite-source-kafka-event-time-type", ByteString.copyFromUtf8("LogAppendTime")).build()).build();
        Message build4 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY1)).setData(KAFKA_MESSAGE1).setAttributes(ImmutableListMultimap.builder().putAll(build).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8("1002")).build()).build();
        InOrder inOrder = Mockito.inOrder(new Object[]{this.publisher});
        ((FakePublisher) inOrder.verify(this.publisher)).publish(build2);
        ((FakePublisher) inOrder.verify(this.publisher)).publish(build3);
        ((FakePublisher) inOrder.verify(this.publisher)).publish(build4);
    }

    @Test
    public void testKafkaHeaders() {
        SinkRecord sinkRecord = new SinkRecord(KAFKA_TOPIC, 4, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, KAFKA_MESSAGE1.toByteArray(), 1000L, 50000L, TimestampType.CREATE_TIME);
        sinkRecord.headers().addString("myHeader", "myValue");
        SinkRecord sinkRecord2 = new SinkRecord(KAFKA_TOPIC, 4, Schema.STRING_SCHEMA, KAFKA_MESSAGE_KEY1, Schema.BYTES_SCHEMA, KAFKA_MESSAGE1.asReadOnlyByteBuffer(), 1001L, 50001L, TimestampType.LOG_APPEND_TIME);
        sinkRecord2.headers().addString("yourHeader", "yourValue");
        this.task.put(ImmutableList.of(sinkRecord, sinkRecord2));
        ImmutableListMultimap build = ImmutableListMultimap.builder().put("x-goog-pubsublite-source-kafka-topic", ByteString.copyFromUtf8(KAFKA_TOPIC)).put("x-goog-pubsublite-source-kafka-partition", ByteString.copyFromUtf8(Integer.toString(4))).build();
        Message build2 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY1)).setData(KAFKA_MESSAGE1).setEventTime(Timestamps.fromMillis(50000L)).setAttributes(ImmutableListMultimap.builder().putAll(build).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8("1000")).put("x-goog-pubsublite-source-kafka-event-time-type", ByteString.copyFromUtf8("CreateTime")).put("myHeader", ByteString.copyFromUtf8("myValue")).build()).build();
        Message build3 = Message.builder().setKey(ByteString.copyFromUtf8(KAFKA_MESSAGE_KEY1)).setData(KAFKA_MESSAGE1).setEventTime(Timestamps.fromMillis(50001L)).setAttributes(ImmutableListMultimap.builder().putAll(build).put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8("1001")).put("x-goog-pubsublite-source-kafka-event-time-type", ByteString.copyFromUtf8("LogAppendTime")).put("yourHeader", ByteString.copyFromUtf8("yourValue")).build()).build();
        InOrder inOrder = Mockito.inOrder(new Object[]{this.publisher});
        ((FakePublisher) inOrder.verify(this.publisher)).publish(build2);
        ((FakePublisher) inOrder.verify(this.publisher)).publish(build3);
    }
}
