package com.google.pubsub.kafka.source;

import com.google.api.core.SettableApiFuture;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.pubsub.kafka.source.CloudPubSubSourceConnector;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/pubsub/kafka/source/CloudPubSubSourceTaskTest.class */
/**
 * Unit tests for {@link CloudPubSubSourceTask}.
 *
 * <p>Every test runs the task against a Mockito deep-stubbed {@link CloudPubSubSubscriber}:
 * {@code subscriber.pull().get()} is stubbed to return a fixed list of received messages and
 * the records returned by {@code task.poll()} are compared against hand-built expectations.
 */
public class CloudPubSubSourceTaskTest {
    private static final String CPS_MAX_BATCH_SIZE = "1000";
    private static final String CPS_SUBSCRIPTION = "quick";
    private static final String KAFKA_TOPIC = "brown";
    private static final String KAFKA_MESSAGE_KEY_ATTRIBUTE = "fox";
    private static final String KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE = "jumped";
    private static final String KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE = "over";
    private static final String KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE = "1234567890";
    private static final String KAFKA_PARTITIONS = "3";
    private static final String ACK_ID1 = "ackID1";
    private static final String ACK_ID2 = "ackID2";
    private static final String ACK_ID3 = "ackID3";
    private static final String ACK_ID4 = "ackID4";
    /** Ordering key used by every test that publishes an ordered message. */
    private static final String ORDERING_KEY = "my-key";

    private static final Logger log = LoggerFactory.getLogger(CloudPubSubSourceTaskTest.class);
    private static final String CPS_PROJECT = "the";
    private static final ByteString CPS_MESSAGE = ByteString.copyFromUtf8(CPS_PROJECT);
    private static final byte[] KAFKA_VALUE = CPS_MESSAGE.toByteArray();

    private CloudPubSubSourceTask task;
    private Map<String, String> props;
    private CloudPubSubSubscriber subscriber;

    /**
     * Asserts that two records agree on key, key schema, value schema, topic and value.
     *
     * <p>When the expected value schema is {@link Schema#BYTES_SCHEMA} the values are compared as
     * byte arrays. Otherwise both values are treated as {@link Struct}s and compared field by
     * field: the {@code "message"} field as bytes, every other field as a string.
     *
     * <p>The Kafka partition, timestamp and headers of the records are NOT compared, so
     * expectations for those properties built by callers are not verified by this method.
     *
     * @param expected the record describing the expected state; its value schema drives the
     *     comparison
     * @param actual the record produced by the task under test
     */
    public void assertRecordsEqual(SourceRecord expected, SourceRecord actual) {
        Assert.assertEquals(expected.key(), actual.key());
        Assert.assertEquals(expected.keySchema(), actual.keySchema());
        Assert.assertEquals(expected.valueSchema(), actual.valueSchema());
        Assert.assertEquals(expected.topic(), actual.topic());
        if (expected.valueSchema() == Schema.BYTES_SCHEMA) {
            Assert.assertArrayEquals((byte[]) expected.value(), (byte[]) actual.value());
            return;
        }
        Struct expectedStruct = (Struct) expected.value();
        Struct actualStruct = (Struct) actual.value();
        for (Field field : expected.valueSchema().fields()) {
            String fieldName = field.name();
            if (fieldName.equals("message")) {
                Assert.assertArrayEquals(expectedStruct.getBytes(fieldName), actualStruct.getBytes(fieldName));
            } else {
                Assert.assertEquals(expectedStruct.getString(fieldName), actualStruct.getString(fieldName));
            }
        }
    }

    /** Creates a fresh deep-stubbed subscriber, the task under test, and the default config. */
    @Before
    public void setup() {
        this.subscriber = Mockito.mock(CloudPubSubSubscriber.class, Mockito.RETURNS_DEEP_STUBS);
        this.task = new CloudPubSubSourceTask(this.subscriber);
        this.props = new HashMap<>();
        this.props.put("cps.project", CPS_PROJECT);
        this.props.put("cps.maxBatchSize", CPS_MAX_BATCH_SIZE);
        this.props.put("cps.subscription", CPS_SUBSCRIPTION);
        this.props.put("kafka.topic", KAFKA_TOPIC);
        this.props.put("kafka.key.attribute", KAFKA_MESSAGE_KEY_ATTRIBUTE);
        this.props.put("kafka.timestamp.attribute", KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE);
        this.props.put("kafka.partition.count", KAFKA_PARTITIONS);
        this.props.put("kafka.partition.scheme", CloudPubSubSourceConnector.PartitionScheme.ROUND_ROBIN.toString());
    }

    /** An empty pull yields no records and never acks. */
    @Test
    public void testPollCaseWithNoMessages() throws Exception {
        this.task.start(this.props);
        stubPull();
        pollExpectingNoAcks(0);
    }

    /** A committed record is acked exactly once across the following polls. */
    @Test
    public void testPollInRegularCase() throws Exception {
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), null));
        List<SourceRecord> records = this.task.poll();
        Assert.assertEquals(1, records.size());
        this.task.commitRecord(records.get(0));

        SettableApiFuture<Empty> ackFuture = SettableApiFuture.create();
        ackFuture.set(Empty.getDefaultInstance());
        Mockito.when(this.subscriber.ackMessages(ArgumentMatchers.any())).thenReturn(ackFuture);
        stubPull();
        Assert.assertEquals(0, this.task.poll().size());
        Assert.assertEquals(0, this.task.poll().size());
        Mockito.verify(this.subscriber, Mockito.times(1)).ackMessages(ArgumentMatchers.any());
    }

    /** A message without the key attribute produces a record with a null key. */
    @Test
    public void testPollWithNoMessageKeyAttribute() throws Exception {
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        assertRecordsEqual(bytesRecord(0, null), records.get(0));
    }

    /** The configured key attribute becomes the record key. */
    @Test
    public void testPollWithMessageKeyAttribute() throws Exception {
        this.task.start(this.props);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes, null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        assertRecordsEqual(bytesRecord(0, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE), records.get(0));
    }

    /** Key and timestamp attributes are consumed and do not turn the value into a struct. */
    @Test
    public void testPollWithMessageTimestampAttribute() throws Exception {
        this.task.start(this.props);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
        attributes.put(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE, KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes, null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        SourceRecord expected = new SourceRecord(null, null, KAFKA_TOPIC, 0, Schema.OPTIONAL_STRING_SCHEMA, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE, Schema.BYTES_SCHEMA, KAFKA_VALUE, Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE));
        assertRecordsEqual(expected, records.get(0));
    }

    /** With record headers enabled, extra attributes leave the value as plain bytes. */
    @Test
    public void testPollWithMultipleAttributesAndRecordHeaders() throws Exception {
        this.props.put("kafka.record.headers", "true");
        this.task.start(this.props);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
        attributes.put("attribute1", "attribute_value1");
        attributes.put("attribute2", "attribute_value2");
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes, null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        ConnectHeaders expectedHeaders = new ConnectHeaders();
        expectedHeaders.addString("attribute1", "attribute_value1");
        expectedHeaders.addString("attribute2", "attribute_value2");
        SourceRecord expected = new SourceRecord(null, null, KAFKA_TOPIC, 0, Schema.OPTIONAL_STRING_SCHEMA, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE, Schema.BYTES_SCHEMA, KAFKA_VALUE, Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE), expectedHeaders);
        assertRecordsEqual(expected, records.get(0));
    }

    /** Without record headers, the ordering key becomes a field of a struct value. */
    @Test
    public void testPollWithOrderingKeyAsAttribute() throws Exception {
        this.props.put("cps.makeOrderingKeyAttribute", "true");
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), ORDERING_KEY));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        Schema valueSchema = SchemaBuilder.struct()
                .field("message", Schema.BYTES_SCHEMA)
                .field("orderingKey", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema)
                .put("message", KAFKA_VALUE)
                .put("orderingKey", ORDERING_KEY);
        SourceRecord expected = new SourceRecord(null, null, KAFKA_TOPIC, 0, Schema.OPTIONAL_STRING_SCHEMA, null, valueSchema, value);
        assertRecordsEqual(expected, records.get(0));
    }

    /** With record headers enabled, the ordering key leaves the value as plain bytes. */
    @Test
    public void testPollWithOrderingKeyAsRecordHeader() throws Exception {
        this.props.put("kafka.record.headers", "true");
        this.props.put("cps.makeOrderingKeyAttribute", "true");
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), ORDERING_KEY));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        ConnectHeaders expectedHeaders = new ConnectHeaders();
        // The expected header value must be the ordering key the message was created with.
        expectedHeaders.addString("orderingKey", ORDERING_KEY);
        SourceRecord expected = new SourceRecord(null, null, KAFKA_TOPIC, 0, Schema.OPTIONAL_STRING_SCHEMA, null, Schema.BYTES_SCHEMA, KAFKA_VALUE, Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE), expectedHeaders);
        assertRecordsEqual(expected, records.get(0));
    }

    /** Without record headers, extra attributes become string fields of a struct value. */
    @Test
    public void testPollWithMultipleAttributes() throws Exception {
        this.task.start(this.props);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
        attributes.put("attribute1", "attribute_value1");
        attributes.put("attribute2", "attribute_value2");
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes, null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        Schema valueSchema = SchemaBuilder.struct()
                .field("message", Schema.BYTES_SCHEMA)
                .field("attribute1", Schema.STRING_SCHEMA)
                .field("attribute2", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema)
                .put("message", KAFKA_VALUE)
                .put("attribute1", "attribute_value1")
                .put("attribute2", "attribute_value2");
        SourceRecord expected = new SourceRecord(null, null, KAFKA_TOPIC, 0, Schema.OPTIONAL_STRING_SCHEMA, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE, valueSchema, value);
        assertRecordsEqual(expected, records.get(0));
    }

    /** HASH_KEY scheme: a keyed and an unkeyed message are both delivered. */
    @Test
    public void testPollWithPartitionSchemeHashKey() throws Exception {
        this.props.put("kafka.partition.scheme", CloudPubSubSourceConnector.PartitionScheme.HASH_KEY.toString());
        this.task.start(this.props);
        Map<String, String> keyedAttributes = new HashMap<>();
        keyedAttributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
        stubPull(
                createReceivedMessage(ACK_ID2, CPS_MESSAGE, keyedAttributes, null),
                createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), null));
        List<SourceRecord> records = pollExpectingNoAcks(2);
        // NOTE(review): hashCode() % n can be negative in Java; this expected partition is
        // not compared by assertRecordsEqual, so it is not verified here.
        SourceRecord expectedKeyed = bytesRecord(KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS), KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
        SourceRecord expectedUnkeyed = bytesRecord(0, null);
        assertRecordsEqual(expectedKeyed, records.get(0));
        Assert.assertArrayEquals((byte[]) expectedUnkeyed.value(), (byte[]) records.get(1).value());
    }

    /** HASH_VALUE scheme: the message is delivered with a bytes value and null key. */
    @Test
    public void testPollWithPartitionSchemeHashValue() throws Exception {
        this.props.put("kafka.partition.scheme", CloudPubSubSourceConnector.PartitionScheme.HASH_VALUE.toString());
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        // NOTE(review): byte[].hashCode() is identity-based, so this expected partition is
        // unlikely to match what the task computes; it is not compared by assertRecordsEqual.
        SourceRecord expected = bytesRecord(KAFKA_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS), null);
        assertRecordsEqual(expected, records.get(0));
    }

    /** KAFKA_PARTITIONER scheme: the record carries no explicit partition. */
    @Test
    public void testPollWithPartitionSchemeKafkaPartitioner() throws Exception {
        this.props.put("kafka.partition.scheme", CloudPubSubSourceConnector.PartitionScheme.KAFKA_PARTITIONER.toString());
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), null));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        assertRecordsEqual(bytesRecord(null, null), records.get(0));
        Assert.assertNull(records.get(0).kafkaPartition());
    }

    /** ORDERING_KEY scheme: an ordered message is delivered with a bytes value and null key. */
    @Test
    public void testPollWithPartitionSchemaOrderingKey() throws Exception {
        this.props.put("kafka.partition.scheme", CloudPubSubSourceConnector.PartitionScheme.ORDERING_KEY.toString());
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), ORDERING_KEY));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        SourceRecord expected = bytesRecord(ORDERING_KEY.hashCode() % Integer.parseInt(KAFKA_PARTITIONS), null);
        assertRecordsEqual(expected, records.get(0));
    }

    /** ROUND_ROBIN scheme (the default in {@link #setup()}): four messages yield four records. */
    @Test
    public void testPollWithPartitionSchemeRoundRobin() throws Exception {
        this.task.start(this.props);
        stubPull(
                createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), null),
                createReceivedMessage(ACK_ID2, CPS_MESSAGE, new HashMap<>(), null),
                createReceivedMessage(ACK_ID3, CPS_MESSAGE, new HashMap<>(), null),
                createReceivedMessage(ACK_ID4, CPS_MESSAGE, new HashMap<>(), null));
        List<SourceRecord> records = pollExpectingNoAcks(4);
        // Expected partitions cycle 0, 1, 2, 0 for a partition count of 3.
        int[] expectedPartitions = {0, 1, 2, 0};
        for (int i = 0; i < expectedPartitions.length; i++) {
            assertRecordsEqual(bytesRecord(expectedPartitions[i], null), records.get(i));
        }
    }

    /** Configuring {@code "orderingKey"} as the key attribute uses the ordering key as record key. */
    @Test
    public void testSetOrderingKeyAsKey() throws Exception {
        this.props.put("kafka.key.attribute", "orderingKey");
        this.task.start(this.props);
        stubPull(createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<>(), ORDERING_KEY));
        List<SourceRecord> records = pollExpectingNoAcks(1);
        assertRecordsEqual(bytesRecord(0, ORDERING_KEY), records.get(0));
    }

    /** An {@link InterruptedException} from the pull future results in an empty poll. */
    @Test
    public void testPollExceptionCase() throws Exception {
        this.task.start(this.props);
        Mockito.when(this.subscriber.pull().get()).thenThrow(new InterruptedException());
        Assert.assertEquals(0, this.task.poll().size());
    }

    /** Stubs {@code subscriber.pull().get()} to return exactly the given messages, in order. */
    private void stubPull(ReceivedMessage... messages) throws Exception {
        Mockito.when(this.subscriber.pull().get()).thenReturn(ImmutableList.copyOf(messages));
    }

    /**
     * Polls the task once, verifies that no ack was sent to the subscriber, and asserts the
     * number of returned records.
     *
     * @param expectedCount number of records the poll must return
     * @return the polled records
     */
    private List<SourceRecord> pollExpectingNoAcks(int expectedCount) throws Exception {
        List<SourceRecord> records = this.task.poll();
        Mockito.verify(this.subscriber, Mockito.never()).ackMessages(ArgumentMatchers.any());
        Assert.assertEquals(expectedCount, records.size());
        return records;
    }

    /**
     * Builds an expected record on {@link #KAFKA_TOPIC} with an optional-string key schema and
     * {@link #KAFKA_VALUE} as a bytes value.
     *
     * @param partition expected Kafka partition, or {@code null} for none
     * @param key expected record key, or {@code null} for none
     */
    private static SourceRecord bytesRecord(Integer partition, String key) {
        return new SourceRecord(null, null, KAFKA_TOPIC, partition, Schema.OPTIONAL_STRING_SCHEMA, key, Schema.BYTES_SCHEMA, KAFKA_VALUE);
    }

    /**
     * Builds a {@link ReceivedMessage} wrapping a {@link PubsubMessage}.
     *
     * @param ackId ack id assigned to the received message
     * @param data message payload
     * @param attributes message attributes; may be empty
     * @param orderingKey ordering key to set, or {@code null} to leave it unset
     */
    private ReceivedMessage createReceivedMessage(String ackId, ByteString data, Map<String, String> attributes, String orderingKey) {
        PubsubMessage.Builder message = PubsubMessage.newBuilder().setData(data).putAllAttributes(attributes);
        if (orderingKey != null) {
            message.setOrderingKey(orderingKey);
        }
        return ReceivedMessage.newBuilder().setAckId(ackId).setMessage(message.build()).build();
    }
}
