package com.google.pubsub.kafka.sink;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.class */
public class CloudPubSubSinkTaskTest {
    private static final String CPS_TOPIC = "the";
    private static final String CPS_PROJECT = "quick";
    private static final String CPS_MIN_BATCH_SIZE1 = "2";
    private static final String CPS_MIN_BATCH_SIZE2 = "9";
    private static final String KAFKA_TOPIC = "brown";
    private static final String FIELD_STRING1 = "over";
    private static final String FIELD_STRING2 = "lazy";
    private static final String KAFKA_MESSAGE_KEY1 = "dog";
    private static final String KAFKA_MESSAGE_KEY2 = "cat";
    private CloudPubSubSinkTask task;
    private Map<String, String> props;
    private Publisher publisher;
    private static final ByteString KAFKA_MESSAGE1 = ByteString.copyFromUtf8("fox");
    private static final ByteString KAFKA_MESSAGE2 = ByteString.copyFromUtf8("jumps");
    private static final Schema STRING_SCHEMA = SchemaBuilder.string().build();
    private static final Schema BYTE_STRING_SCHEMA = SchemaBuilder.bytes().name(ConnectorUtils.SCHEMA_NAME).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest$SpyableFuture.class */
    public class SpyableFuture<V> implements ApiFuture<V> {
        private V value;
        private Throwable exception;

        public SpyableFuture(V v) {
            this.value = null;
            this.exception = null;
            this.value = v;
        }

        public <V> SpyableFuture(Throwable th) {
            this.value = null;
            this.exception = null;
            this.exception = th;
        }

        public V get() throws ExecutionException {
            if (this.exception != null) {
                throw new ExecutionException(this.exception);
            }
            return this.value;
        }

        public V get(long j, TimeUnit timeUnit) throws ExecutionException {
            return get();
        }

        public boolean cancel(boolean z) {
            return false;
        }

        public boolean isCancelled() {
            return false;
        }

        public boolean isDone() {
            return true;
        }

        public void addListener(Runnable runnable, Executor executor) {
            executor.execute(runnable);
        }
    }

    @Before
    public void setup() {
        this.publisher = (Publisher) Mockito.mock(Publisher.class, Mockito.RETURNS_DEEP_STUBS);
        this.task = new CloudPubSubSinkTask(this.publisher);
        this.props = new HashMap();
        this.props.put("cps.topic", CPS_TOPIC);
        this.props.put("cps.project", CPS_PROJECT);
        this.props.put("maxBufferSize", CPS_MIN_BATCH_SIZE2);
    }

    @Test
    public void testPutPrimitives() {
        this.task.start(this.props);
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int8(), (byte) 5, -1L);
        SinkRecord sinkRecord2 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int16(), (short) 5, -1L);
        SinkRecord sinkRecord3 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int32(), 5, -1L);
        SinkRecord sinkRecord4 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.int64(), 5L, -1L);
        SinkRecord sinkRecord5 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.float32(), Float.valueOf(8.0f), -1L);
        SinkRecord sinkRecord6 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.float64(), Double.valueOf(8.0d), -1L);
        SinkRecord sinkRecord7 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.bool(), true, -1L);
        SinkRecord sinkRecord8 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.string(), "Test put.", -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        arrayList.add(sinkRecord2);
        arrayList.add(sinkRecord3);
        arrayList.add(sinkRecord4);
        arrayList.add(sinkRecord5);
        arrayList.add(sinkRecord6);
        arrayList.add(sinkRecord7);
        arrayList.add(sinkRecord8);
        this.task.put(arrayList);
    }

    @Test
    public void testStructSchema() {
        this.task.start(this.props);
        Schema build = SchemaBuilder.struct().field(FIELD_STRING1, SchemaBuilder.string()).field(FIELD_STRING2, SchemaBuilder.string()).build();
        Struct struct = new Struct(build);
        struct.put(FIELD_STRING1, "tide");
        struct.put(FIELD_STRING2, "eagle");
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, build, struct, -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
    }

    @Test
    public void testStructSchemaWithOptionalField() {
        this.task.start(this.props);
        Schema build = SchemaBuilder.struct().field(FIELD_STRING1, SchemaBuilder.string()).field(FIELD_STRING2, SchemaBuilder.string().optional()).build();
        Struct struct = new Struct(build);
        struct.put(FIELD_STRING1, "tide");
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, build, struct, -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
        struct.put(FIELD_STRING2, "eagle");
        SinkRecord sinkRecord2 = new SinkRecord((String) null, -1, (Schema) null, (Object) null, build, struct, -1L);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(sinkRecord2);
        this.task.put(arrayList2);
    }

    @Test(expected = DataException.class)
    public void testStructSchemaWithMissingField() {
        this.task.start(this.props);
        Schema build = SchemaBuilder.struct().field(FIELD_STRING1, SchemaBuilder.string()).field(FIELD_STRING2, SchemaBuilder.string()).build();
        Struct struct = new Struct(build);
        struct.put(FIELD_STRING1, "tide");
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, build, struct, -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
    }

    @Test(expected = DataException.class)
    public void testStructSchemaWithNestedSchema() {
        this.task.start(this.props);
        Schema build = SchemaBuilder.struct().build();
        Struct struct = new Struct(build);
        Schema build2 = SchemaBuilder.struct().field(FIELD_STRING1, SchemaBuilder.string()).field(FIELD_STRING2, build).build();
        Struct struct2 = new Struct(build2);
        struct2.put(FIELD_STRING1, "tide");
        struct2.put(FIELD_STRING2, struct);
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, build2, struct2, -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
    }

    @Test
    public void testMapSchema() {
        this.task.start(this.props);
        Schema build = SchemaBuilder.map(SchemaBuilder.string(), SchemaBuilder.string()).build();
        HashMap hashMap = new HashMap();
        hashMap.put(FIELD_STRING1, "tide");
        hashMap.put(FIELD_STRING2, "eagle");
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, build, hashMap, -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
        arrayList.add(new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.map(SchemaBuilder.string(), SchemaBuilder.bytes()).build(), hashMap, -1L));
        try {
            this.task.put(arrayList);
        } catch (DataException e) {
        }
    }

    @Test
    public void testArraySchema() {
        this.task.start(this.props);
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.array(SchemaBuilder.string()).build(), new String[]{"Roll", "tide"}, -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
        arrayList.add(new SinkRecord((String) null, -1, (Schema) null, (Object) null, SchemaBuilder.array(SchemaBuilder.struct()).build(), (Object) null, -1L));
        try {
            this.task.put(arrayList);
        } catch (DataException e) {
        }
    }

    @Test
    public void testNullSchema() {
        this.task.start(this.props);
        SinkRecord sinkRecord = new SinkRecord((String) null, -1, (Schema) null, (Object) null, (Schema) null, "I have no schema", -1L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sinkRecord);
        this.task.put(arrayList);
    }

    @Test
    public void testPutWherePublishesAreInvoked() {
        this.props.put("maxBufferSize", CPS_MIN_BATCH_SIZE1);
        this.task.start(this.props);
        this.task.put(getSampleRecords());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(2))).publish((PubsubMessage) forClass.capture());
        Assert.assertEquals(forClass.getAllValues(), getPubsubMessagesFromSampleRecords());
    }

    @Test
    public void testPutWithNullValues() {
        this.props.put("maxBufferSize", CPS_MIN_BATCH_SIZE1);
        this.task.start(this.props);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 0, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, STRING_SCHEMA, (Object) null, -1L));
        this.task.put(arrayList);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(1))).publish((PubsubMessage) forClass.capture());
        List allValues = forClass.getAllValues();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).build());
        Assert.assertEquals(allValues, arrayList2);
    }

    @Test
    public void testPutWithNullMessage() {
        System.out.println("NULL MSG");
        this.props.put("maxBufferSize", CPS_MIN_BATCH_SIZE1);
        this.task.start(this.props);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 0, STRING_SCHEMA, (Object) null, STRING_SCHEMA, (Object) null, -1L));
        this.task.put(arrayList);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(0))).publish((PubsubMessage) ArgumentCaptor.forClass(PubsubMessage.class).capture());
    }

    @Test
    public void testFlushWithNoPublishInPut() throws Exception {
        this.task.start(this.props);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(KAFKA_TOPIC, 0), null);
        List<SinkRecord> sampleRecords = getSampleRecords();
        ApiFuture<String> successfulPublishFuture = getSuccessfulPublishFuture();
        Mockito.when(Boolean.valueOf(successfulPublishFuture.isDone())).thenReturn(false);
        Mockito.when(this.publisher.publish((PubsubMessage) ArgumentMatchers.any(PubsubMessage.class))).thenReturn(successfulPublishFuture);
        this.task.put(sampleRecords);
        this.task.flush(hashMap);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(2))).publish((PubsubMessage) ArgumentMatchers.any(PubsubMessage.class));
        ((ApiFuture) Mockito.verify(successfulPublishFuture, Mockito.times(2))).addListener((Runnable) ArgumentMatchers.any(Runnable.class), (Executor) ArgumentMatchers.any(Executor.class));
    }

    @Test(expected = RuntimeException.class)
    public void testFlushExceptionCase() throws Exception {
        this.task.start(this.props);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(KAFKA_TOPIC, 0), null);
        List<SinkRecord> sampleRecords = getSampleRecords();
        ApiFuture<String> failedPublishFuture = getFailedPublishFuture();
        Mockito.when(this.publisher.publish((PubsubMessage) ArgumentMatchers.any(PubsubMessage.class))).thenReturn(failedPublishFuture);
        this.task.put(sampleRecords);
        this.task.flush(hashMap);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(1))).publish((PubsubMessage) ArgumentMatchers.any(PubsubMessage.class));
        ((ApiFuture) Mockito.verify(failedPublishFuture, Mockito.times(1))).addListener((Runnable) ArgumentMatchers.any(Runnable.class), (Executor) ArgumentMatchers.any(Executor.class));
    }

    @Test
    public void testKafkaMetadata() {
        this.props.put("metadata.publish", "true");
        this.props.put("maxBufferSize", CPS_MIN_BATCH_SIZE1);
        this.task.start(this.props);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE1, 1000L, 50000L, TimestampType.CREATE_TIME));
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, 1001L, 50001L, TimestampType.CREATE_TIME));
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, 1002L, (Long) null, TimestampType.CREATE_TIME));
        this.task.put(arrayList);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(3))).publish((PubsubMessage) forClass.capture());
        List allValues = forClass.getAllValues();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        hashMap.put("kafka.topic", KAFKA_TOPIC);
        hashMap.put("kafka.partition", "4");
        hashMap.put("kafka.offset", "1000");
        hashMap.put("kafka.timestamp", "50000");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setData(KAFKA_MESSAGE1).build());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("key", KAFKA_MESSAGE_KEY1);
        hashMap2.put("kafka.topic", KAFKA_TOPIC);
        hashMap2.put("kafka.partition", "4");
        hashMap2.put("kafka.offset", "1001");
        hashMap2.put("kafka.timestamp", "50001");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap2).setData(KAFKA_MESSAGE2).build());
        HashMap hashMap3 = new HashMap();
        hashMap3.put("key", KAFKA_MESSAGE_KEY1);
        hashMap3.put("kafka.topic", KAFKA_TOPIC);
        hashMap3.put("kafka.partition", "4");
        hashMap3.put("kafka.offset", "1002");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap3).setData(KAFKA_MESSAGE2).build());
        Assert.assertEquals(allValues, arrayList2);
    }

    @Test
    public void testKafkaHeaders() {
        this.props.put("headers.publish", "true");
        this.task.start(this.props);
        ArrayList arrayList = new ArrayList();
        SinkRecord sinkRecord = new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE1, 1000L, 50000L, TimestampType.CREATE_TIME);
        sinkRecord.headers().addString("myHeader", "myValue");
        arrayList.add(sinkRecord);
        SinkRecord sinkRecord2 = new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, 1001L, 50001L, TimestampType.CREATE_TIME);
        sinkRecord2.headers().addString("yourHeader", "yourValue");
        arrayList.add(sinkRecord2);
        this.task.put(arrayList);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(2))).publish((PubsubMessage) forClass.capture());
        List allValues = forClass.getAllValues();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        hashMap.put("myHeader", "myValue");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setData(KAFKA_MESSAGE1).build());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("key", KAFKA_MESSAGE_KEY1);
        hashMap2.put("yourHeader", "yourValue");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap2).setData(KAFKA_MESSAGE2).build());
        Assert.assertEquals(arrayList2, allValues);
    }

    @Test
    public void testUnsupportedKafkaHeaders() {
        this.props.put("headers.publish", "true");
        this.task.start(this.props);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 257; i++) {
            sb.append("-");
        }
        String sb2 = sb.toString();
        sb.setLength(0);
        for (int i2 = 0; i2 < 1025; i2++) {
            sb.append(".");
        }
        String sb3 = sb.toString();
        sb.setLength(0);
        ArrayList arrayList = new ArrayList();
        SinkRecord sinkRecord = new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE1, 1000L, 50000L, TimestampType.CREATE_TIME);
        sinkRecord.headers().addString("myHeader", "myValue");
        sinkRecord.headers().addString(sb2, "anotherValue");
        sinkRecord.headers().addString("anotherHeader", sb3);
        sinkRecord.headers().addString(sb2, sb3);
        arrayList.add(sinkRecord);
        SinkRecord sinkRecord2 = new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, 1001L, 50001L, TimestampType.CREATE_TIME);
        sinkRecord2.headers().addString("yourHeader", "yourValue");
        arrayList.add(sinkRecord2);
        this.task.put(arrayList);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(2))).publish((PubsubMessage) forClass.capture());
        List allValues = forClass.getAllValues();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        hashMap.put("myHeader", "myValue");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setData(KAFKA_MESSAGE1).build());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("key", KAFKA_MESSAGE_KEY1);
        hashMap2.put("yourHeader", "yourValue");
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap2).setData(KAFKA_MESSAGE2).build());
        Assert.assertEquals(257L, sb2.getBytes().length);
        Assert.assertEquals(1025L, sb3.getBytes().length);
        Assert.assertEquals(arrayList2, allValues);
    }

    @Test
    public void testFlushExceptionThenNoExceptionCase() throws Exception {
        this.task.start(this.props);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(KAFKA_TOPIC, 0), null);
        List<SinkRecord> sampleRecords = getSampleRecords();
        ApiFuture<String> failedPublishFuture = getFailedPublishFuture();
        Mockito.when(Boolean.valueOf(failedPublishFuture.isDone())).thenReturn(false);
        ApiFuture<String> successfulPublishFuture = getSuccessfulPublishFuture();
        Mockito.when(Boolean.valueOf(successfulPublishFuture.isDone())).thenReturn(false);
        Mockito.when(this.publisher.publish((PubsubMessage) ArgumentMatchers.any(PubsubMessage.class))).thenReturn(failedPublishFuture).thenReturn(failedPublishFuture).thenReturn(successfulPublishFuture);
        this.task.put(sampleRecords);
        try {
            this.task.flush(hashMap);
        } catch (RuntimeException e) {
        }
        this.task.put(getSampleRecords());
        this.task.flush(hashMap);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(4))).publish((PubsubMessage) ArgumentMatchers.any(PubsubMessage.class));
        ((ApiFuture) Mockito.verify(failedPublishFuture, Mockito.times(2))).addListener((Runnable) ArgumentMatchers.any(Runnable.class), (Executor) ArgumentMatchers.any(Executor.class));
        ((ApiFuture) Mockito.verify(successfulPublishFuture, Mockito.times(2))).addListener((Runnable) ArgumentMatchers.any(Runnable.class), (Executor) ArgumentMatchers.any(Executor.class));
    }

    @Test
    public void testPublisherShutdownOnStop() throws Exception {
        this.props.put("maxShutdownTimeoutMs", Integer.toString(20000));
        this.task.start(this.props);
        this.task.stop();
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(1))).shutdown();
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(1))).awaitTermination(20000, TimeUnit.MILLISECONDS);
    }

    private List<SinkRecord> getSampleRecords() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 0, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE1, -1L));
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 0, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, -1L));
        return arrayList;
    }

    @Test
    public void testOrderingKeysKey() {
        this.props.put("orderingKeySource", "key");
        this.task.start(this.props);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE1, 1000L, 50000L, TimestampType.CREATE_TIME));
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY2, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, 1001L, 50001L, TimestampType.CREATE_TIME));
        this.task.put(arrayList);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(2))).publish((PubsubMessage) forClass.capture());
        List allValues = forClass.getAllValues();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setOrderingKey(KAFKA_MESSAGE_KEY1).setData(KAFKA_MESSAGE1).build());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("key", KAFKA_MESSAGE_KEY2);
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap2).setOrderingKey(KAFKA_MESSAGE_KEY2).setData(KAFKA_MESSAGE2).build());
        Assert.assertEquals(arrayList2, allValues);
    }

    @Test
    public void testOrderingKeysPartition() {
        this.props.put("orderingKeySource", "partition");
        this.task.start(this.props);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 4, STRING_SCHEMA, KAFKA_MESSAGE_KEY1, BYTE_STRING_SCHEMA, KAFKA_MESSAGE1, 1000L, 50000L, TimestampType.CREATE_TIME));
        arrayList.add(new SinkRecord(KAFKA_TOPIC, 5, STRING_SCHEMA, KAFKA_MESSAGE_KEY2, BYTE_STRING_SCHEMA, KAFKA_MESSAGE2, 1001L, 50001L, TimestampType.CREATE_TIME));
        this.task.put(arrayList);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(PubsubMessage.class);
        ((Publisher) Mockito.verify(this.publisher, Mockito.times(2))).publish((PubsubMessage) forClass.capture());
        List allValues = forClass.getAllValues();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setOrderingKey("4").setData(KAFKA_MESSAGE1).build());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("key", KAFKA_MESSAGE_KEY2);
        arrayList2.add(PubsubMessage.newBuilder().putAllAttributes(hashMap2).setOrderingKey("5").setData(KAFKA_MESSAGE2).build());
        Assert.assertEquals(arrayList2, allValues);
    }

    private List<PubsubMessage> getPubsubMessagesFromSampleRecords() {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put("key", KAFKA_MESSAGE_KEY1);
        arrayList.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setData(KAFKA_MESSAGE1).build());
        arrayList.add(PubsubMessage.newBuilder().putAllAttributes(hashMap).setData(KAFKA_MESSAGE2).build());
        return arrayList;
    }

    private ApiFuture<String> getSuccessfulPublishFuture() {
        return (ApiFuture) Mockito.spy(new SpyableFuture("abcd"));
    }

    private ApiFuture<String> getFailedPublishFuture() {
        return (ApiFuture) Mockito.spy(new SpyableFuture((Throwable) new Exception()));
    }
}
