package com.google.pubsub.kafka.sink;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.kafka.sink.CloudPubSubSinkConnector;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/* loaded from: input_file:com/google/pubsub/kafka/sink/CloudPubSubSinkTask.class */
public class CloudPubSubSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) CloudPubSubSinkTask.class);
    private Map<String, Map<Integer, OutstandingFuturesForPartition>> allOutstandingFutures = new HashMap();
    private String cpsProject;
    private String cpsTopic;
    private String cpsEndpoint;
    private String messageBodyName;
    private long maxBufferSize;
    private long maxBufferBytes;
    private long maxOutstandingRequestBytes;
    private long maxOutstandingMessages;
    private int maxDelayThresholdMs;
    private int maxRequestTimeoutMs;
    private int maxTotalTimeoutMs;
    private int maxShutdownTimeoutMs;
    private boolean includeMetadata;
    private boolean includeHeaders;
    private CloudPubSubSinkConnector.OrderingKeySource orderingKeySource;
    private ConnectorCredentialsProvider gcpCredentialsProvider;
    private Publisher publisher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.pubsub.kafka.sink.CloudPubSubSinkTask$1, reason: invalid class name */
    /* loaded from: input_file:com/google/pubsub/kafka/sink/CloudPubSubSinkTask$1.class */
    /**
     * Lookup table that maps {@link Schema.Type} ordinals to small, fixed integer labels
     * (INT8 = 1 ... ARRAY = 12) so that a {@code switch} can dispatch on a schema type.
     *
     * <p>NOTE(review): this looks like the decompiled form of the helper class the Java compiler
     * emits for a switch over an enum; it is not hand-written logic.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by Schema.Type.ordinal(); entries that are never assigned below keep the
        // default value 0, which matches none of the labels 1..12.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$connect$data$Schema$Type = new int[Schema.Type.values().length];

        // Each constant is registered in its own try block so that a constant missing from the
        // Schema.Type class present at run time (NoSuchFieldError) only leaves its own slot
        // unset instead of aborting the whole initializer.
        static {
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.INT8.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot for this constant stays 0.
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.INT16.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.INT32.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.INT64.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.FLOAT32.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.FLOAT64.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.BOOLEAN.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.STRING.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.BYTES.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.STRUCT.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.MAP.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$data$Schema$Type[Schema.Type.ARRAY.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/pubsub/kafka/sink/CloudPubSubSinkTask$OutstandingFuturesForPartition.class */
    /**
     * Holder for the publish futures of one Kafka topic partition that have been handed out but
     * not yet waited on.
     */
    public class OutstandingFuturesForPartition {
        /** Pending publish results, in the order they were added. */
        public List<ApiFuture<String>> futures = new ArrayList<>();

        private OutstandingFuturesForPartition() {
        }

        /**
         * Accessor constructor used by the enclosing class; both arguments are ignored and the
         * call simply delegates to the private no-arg constructor.
         */
        /* synthetic */ OutstandingFuturesForPartition(CloudPubSubSinkTask cloudPubSubSinkTask, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:com/google/pubsub/kafka/sink/CloudPubSubSinkTask$UnpublishedMessagesForPartition.class */
    /** Holder for a list of Pub/Sub messages together with a running {@code size} counter. */
    private class UnpublishedMessagesForPartition {
        /** Messages collected so far; starts empty. */
        public List<PubsubMessage> messages;
        /** Counter associated with {@link #messages}; starts at zero. */
        public int size;

        private UnpublishedMessagesForPartition() {
            this.messages = new ArrayList<>();
            this.size = 0;
        }
    }

    /**
     * Creates a task without a publisher. {@link #start(Map)} builds one from the task
     * configuration when it finds the publisher unset.
     */
    public CloudPubSubSinkTask() {
    }

    /**
     * Creates a task that uses the supplied publisher; {@link #start(Map)} only builds its own
     * publisher when none was provided. Intended for tests.
     *
     * @param publisher the publisher this task should send messages through
     */
    @VisibleForTesting
    public CloudPubSubSinkTask(Publisher publisher) {
        this.publisher = publisher;
    }

    /**
     * Reports the connector version by asking a fresh {@link CloudPubSubSinkConnector}.
     *
     * @return the version string of the sink connector
     */
    public String version() {
        CloudPubSubSinkConnector connector = new CloudPubSubSinkConnector();
        return connector.version();
    }

    /**
     * Parses the task configuration, copies every setting into the matching field and, unless a
     * publisher was injected through the constructor, builds the Pub/Sub publisher.
     *
     * @param map raw task properties as supplied by the Connect framework
     */
    public void start(Map<String, String> map) {
        CloudPubSubSinkConnector connector = new CloudPubSubSinkConnector();
        Map<String, Object> settings = connector.config().parse(map);

        // Destination.
        this.cpsProject = settings.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
        this.cpsTopic = settings.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
        this.cpsEndpoint = settings.get(ConnectorUtils.CPS_ENDPOINT).toString();

        // Batching and flow-control limits.
        this.maxBufferSize = (Integer) settings.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
        this.maxBufferBytes = (Long) settings.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
        this.maxOutstandingRequestBytes = (Long) settings.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_REQUEST_BYTES);
        this.maxOutstandingMessages = (Long) settings.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_MESSAGES);

        // Timing.
        this.maxDelayThresholdMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_DELAY_THRESHOLD_MS);
        this.maxRequestTimeoutMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_REQUEST_TIMEOUT_MS);
        this.maxTotalTimeoutMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_TOTAL_TIMEOUT_MS);
        this.maxShutdownTimeoutMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_SHUTDOWN_TIMEOUT_MS);

        // Message shaping.
        this.messageBodyName = (String) settings.get(CloudPubSubSinkConnector.CPS_MESSAGE_BODY_NAME);
        this.includeMetadata = (Boolean) settings.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_METADATA);
        this.includeHeaders = (Boolean) settings.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS);
        String orderingKeySourceName = (String) settings.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE);
        this.orderingKeySource = CloudPubSubSinkConnector.OrderingKeySource.getEnum(orderingKeySourceName);

        this.gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(settings);

        // A publisher supplied via the constructor takes precedence over building one here.
        if (this.publisher == null) {
            createPublisher();
        }
        log.info("Start CloudPubSubSinkTask");
    }

    /**
     * Converts each Kafka record into a Pub/Sub message and hands it to the publisher.
     *
     * <p>The record value becomes the message data (struct/map fields other than the configured
     * body field become attributes). The record key, optional Kafka metadata and optional record
     * headers are added as attributes. A record that yields neither data nor attributes is not
     * published; an already-completed future is registered for it instead so that flushing its
     * partition still succeeds.
     *
     * @param collection the records delivered by the Connect framework
     */
    public void put(Collection<SinkRecord> collection) {
        // Parameterized logging avoids building the strings when the level is disabled.
        log.debug("Received {} messages to send to CPS.", collection.size());
        for (SinkRecord sinkRecord : collection) {
            log.trace("Received record: {}", sinkRecord);
            Map<String, String> attributes = new HashMap<>();
            ByteString data = handleValue(sinkRecord.valueSchema(), sinkRecord.value(), attributes);
            String key = null;
            String partition = sinkRecord.kafkaPartition().toString();
            if (sinkRecord.key() != null) {
                key = sinkRecord.key().toString();
                attributes.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, key);
            }
            if (this.includeMetadata) {
                attributes.put("kafka.topic", sinkRecord.topic());
                attributes.put(ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, partition);
                attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(sinkRecord.kafkaOffset()));
                if (sinkRecord.timestamp() != null) {
                    attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, sinkRecord.timestamp().toString());
                }
            }
            if (this.includeHeaders) {
                for (Header header : getRecordHeaders(sinkRecord)) {
                    // String.valueOf tolerates a null header value (calling toString() on it
                    // would throw) and is the same rendering getRecordHeaders measures when it
                    // checks the value size.
                    attributes.put(header.key(), String.valueOf(header.value()));
                }
            }
            if (attributes.isEmpty() && data == null) {
                log.warn("Message received with no value and no attributes. Not publishing message");
                SettableApiFuture<String> alreadyDone = SettableApiFuture.create();
                alreadyDone.set("No message");
                addPendingMessageFuture(sinkRecord.topic(), sinkRecord.kafkaPartition(), alreadyDone);
                continue;
            }
            PubsubMessage.Builder message = PubsubMessage.newBuilder();
            message.putAllAttributes(attributes);
            if (data != null) {
                message.setData(data);
            }
            // With KEY ordering a missing or empty key leaves the message unordered.
            if (this.orderingKeySource == CloudPubSubSinkConnector.OrderingKeySource.KEY && key != null && !key.isEmpty()) {
                message.setOrderingKey(key);
            } else if (this.orderingKeySource == CloudPubSubSinkConnector.OrderingKeySource.PARTITION) {
                message.setOrderingKey(partition);
            }
            publishMessage(sinkRecord.topic(), sinkRecord.kafkaPartition(), message.build());
        }
    }

    /**
     * Selects the record headers that may be forwarded as Pub/Sub message attributes.
     *
     * <p>A header is kept when its key is at most 256 bytes and the string form of its value is
     * at most 1024 bytes, both measured in UTF-8. At most 100 headers are returned; the previous
     * {@code count > 100} check stopped one header too late and could return 101.
     *
     * @param sinkRecord the record whose headers are inspected
     * @return the accepted headers in their original order; empty when the record has none
     */
    private Iterable<? extends Header> getRecordHeaders(SinkRecord sinkRecord) {
        final int maxHeaders = 100;
        final int maxKeyBytes = 256;
        final int maxValueBytes = 1024;

        ConnectHeaders accepted = new ConnectHeaders();
        if (sinkRecord.headers() == null) {
            return accepted;
        }
        int acceptedCount = 0;
        for (Header header : sinkRecord.headers()) {
            if (acceptedCount >= maxHeaders) {
                break;
            }
            // Sizes are byte limits, so measure with an explicit charset rather than the
            // platform default.
            int keyBytes = header.key().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            int valueBytes = String.valueOf(header.value()).getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            if (keyBytes <= maxKeyBytes && valueBytes <= maxValueBytes) {
                accepted.add(header);
                acceptedCount++;
            }
        }
        return accepted;
    }

    /**
     * Converts a Kafka Connect value into the data payload of a Pub/Sub message.
     *
     * <ul>
     *   <li>No schema: the UTF-8 bytes of {@code obj.toString()}.</li>
     *   <li>INT8 and BOOLEAN: one byte. INT16/INT32/INT64/FLOAT32/FLOAT64: the fixed-width
     *       encoding written by {@link ByteBuffer} (big-endian by default).</li>
     *   <li>STRING: UTF-8 bytes. BYTES: accepts {@code ByteString}, {@code byte[]} or
     *       {@code ByteBuffer}.</li>
     *   <li>STRUCT and MAP: the entry named by the configured message body name becomes the
     *       payload; every other entry is written to {@code map} as a string attribute.</li>
     *   <li>ARRAY: the concatenation of the encoded elements.</li>
     * </ul>
     *
     * @param schema schema of {@code obj}, or {@code null} for schemaless data
     * @param obj the value to convert; may be {@code null}
     * @param map receives attributes extracted from STRUCT/MAP values; nested calls pass
     *     {@code null} because nested STRUCT/MAP values are rejected before recursing
     * @return the payload bytes, {@code null} when {@code obj} is {@code null}, or an empty
     *     {@code ByteString} when a STRUCT/MAP has no body entry or the type is not handled
     * @throws DataException for an unsupported value class, a nested STRUCT/MAP, or a missing
     *     required struct field
     */
    private ByteString handleValue(Schema schema, Object obj, Map<String, String> map) {
        if (obj == null) {
            return null;
        }
        if (schema == null) {
            return ByteString.copyFromUtf8(obj.toString());
        }
        switch (schema.type()) {
            case INT8:
                return ByteString.copyFrom(new byte[] {(Byte) obj});
            // For the fixed-width numbers the backing array is copied directly. Copying the
            // buffer itself right after a put() would copy nothing, because
            // ByteString.copyFrom(ByteBuffer) only takes the bytes between position and limit.
            case INT16:
                return ByteString.copyFrom(ByteBuffer.allocate(2).putShort((Short) obj).array());
            case INT32:
                return ByteString.copyFrom(ByteBuffer.allocate(4).putInt((Integer) obj).array());
            case INT64:
                return ByteString.copyFrom(ByteBuffer.allocate(8).putLong((Long) obj).array());
            case FLOAT32:
                return ByteString.copyFrom(ByteBuffer.allocate(4).putFloat((Float) obj).array());
            case FLOAT64:
                return ByteString.copyFrom(ByteBuffer.allocate(8).putDouble((Double) obj).array());
            case BOOLEAN:
                return ByteString.copyFrom(new byte[] {(byte) (((Boolean) obj) ? 1 : 0)});
            case STRING:
                return ByteString.copyFromUtf8((String) obj);
            case BYTES:
                if (obj instanceof ByteString) {
                    return (ByteString) obj;
                }
                if (obj instanceof byte[]) {
                    return ByteString.copyFrom((byte[]) obj);
                }
                if (obj instanceof ByteBuffer) {
                    return ByteString.copyFrom((ByteBuffer) obj);
                }
                throw new DataException("Unexpected value class with BYTES schema type.");
            case STRUCT: {
                Struct struct = (Struct) obj;
                ByteString body = null;
                for (Field field : schema.fields()) {
                    Schema.Type fieldType = field.schema().type();
                    if (fieldType == Schema.Type.MAP || fieldType == Schema.Type.STRUCT) {
                        throw new DataException("Struct type does not support nested Map or Struct types, present in field " + field.name());
                    }
                    Object fieldValue = struct.get(field);
                    if (fieldValue == null) {
                        if (!field.schema().isOptional()) {
                            throw new DataException("Struct message missing required field " + field.name());
                        }
                        // Optional field without a value: nothing to publish for it.
                        continue;
                    }
                    if (field.name().equals(this.messageBodyName)) {
                        body = handleValue(field.schema(), fieldValue, null);
                    } else {
                        map.put(field.name(), fieldValue.toString());
                    }
                }
                return body != null ? body : ByteString.EMPTY;
            }
            case MAP: {
                ByteString body = null;
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) obj).entrySet()) {
                    Object entryKey = entry.getKey();
                    Object entryValue = entry.getValue();
                    if (entryKey == null || entryValue == null) {
                        // Skipped rather than failing with a NullPointerException, mirroring
                        // how unset optional struct fields are handled.
                        continue;
                    }
                    if (entryKey.equals(this.messageBodyName)) {
                        body = ByteString.copyFromUtf8(entryValue.toString());
                    } else {
                        map.put(entryKey.toString(), entryValue.toString());
                    }
                }
                return body != null ? body : ByteString.EMPTY;
            }
            case ARRAY: {
                Schema elementSchema = schema.valueSchema();
                Schema.Type elementType = elementSchema.type();
                if (elementType == Schema.Type.MAP || elementType == Schema.Type.STRUCT) {
                    throw new DataException("Array type does not support Map or Struct types.");
                }
                // Connect represents ARRAY values as java.util.List; plain Object[] is still
                // accepted for callers that relied on the earlier behaviour.
                Collection<?> elements;
                if (obj instanceof Collection) {
                    elements = (Collection<?>) obj;
                } else if (obj instanceof Object[]) {
                    elements = java.util.Arrays.asList((Object[]) obj);
                } else {
                    throw new DataException("Unexpected value class with ARRAY schema type.");
                }
                ByteString joined = ByteString.EMPTY;
                for (Object element : elements) {
                    ByteString encoded = handleValue(elementSchema, element, null);
                    // A null element encodes to null; skip it instead of failing in concat().
                    if (encoded != null) {
                        joined = joined.concat(encoded);
                    }
                }
                return joined;
            }
            default:
                return ByteString.EMPTY;
        }
    }

    /**
     * Blocks until every outstanding publish for the given partitions has completed.
     *
     * <p>The futures of a partition are discarded once they have been waited on, whether or not
     * the wait succeeded. After all requested partitions completed successfully the whole
     * bookkeeping map is cleared.
     *
     * @param map the partitions (and offsets) the framework is about to commit
     * @throws RuntimeException wrapping the failure if any publish failed or the wait was
     *     interrupted
     */
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        log.debug("Flushing...");
        // Only the partitions matter here; the offsets are not used.
        for (TopicPartition topicPartition : map.keySet()) {
            log.trace("Received flush for partition {}", topicPartition);
            Map<Integer, OutstandingFuturesForPartition> topicFutures = this.allOutstandingFutures.get(topicPartition.topic());
            if (topicFutures == null) {
                continue;
            }
            OutstandingFuturesForPartition outstanding = topicFutures.get(topicPartition.partition());
            if (outstanding == null) {
                continue;
            }
            try {
                ApiFutures.allAsList(outstanding.futures).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling worker thread can still see it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                outstanding.futures.clear();
            }
        }
        // NOTE(review): this also drops futures of partitions that were not part of this flush
        // request without waiting on them - confirm the framework always passes every partition
        // that received records.
        this.allOutstandingFutures.clear();
    }

    /**
     * Publishes one message and records the resulting future under its Kafka topic/partition.
     *
     * @param str Kafka topic the record came from
     * @param num Kafka partition the record came from
     * @param pubsubMessage the message to publish
     */
    private void publishMessage(String str, Integer num, PubsubMessage pubsubMessage) {
        ApiFuture<String> pendingPublish = this.publisher.publish(pubsubMessage);
        addPendingMessageFuture(str, num, pendingPublish);
    }

    /**
     * Registers a publish future under the given Kafka topic and partition, creating the
     * per-topic and per-partition containers the first time they are needed.
     *
     * @param str Kafka topic
     * @param num Kafka partition
     * @param apiFuture the future to remember
     */
    private void addPendingMessageFuture(String str, Integer num, ApiFuture<String> apiFuture) {
        // Neither map ever stores a null value, so containsKey is equivalent to a null check on get.
        if (!this.allOutstandingFutures.containsKey(str)) {
            this.allOutstandingFutures.put(str, new HashMap<Integer, OutstandingFuturesForPartition>());
        }
        Map<Integer, OutstandingFuturesForPartition> topicFutures = this.allOutstandingFutures.get(str);
        if (!topicFutures.containsKey(num)) {
            topicFutures.put(num, new OutstandingFuturesForPartition(this, null));
        }
        topicFutures.get(num).futures.add(apiFuture);
    }

    /**
     * Builds the Pub/Sub publisher from the configured project, topic, endpoint, credentials,
     * batching, flow-control and retry settings and stores it in {@code publisher}.
     *
     * @throws RuntimeException wrapping any exception raised while building the publisher
     */
    private void createPublisher() {
        ProjectTopicName topicName = ProjectTopicName.of(this.cpsProject, this.cpsTopic);

        BatchingSettings.Builder batching =
            BatchingSettings.newBuilder()
                .setDelayThreshold(Duration.ofMillis(this.maxDelayThresholdMs))
                .setElementCountThreshold(this.maxBufferSize)
                .setRequestByteThreshold(this.maxBufferBytes);
        if (useFlowControl()) {
            // Block the caller instead of failing when the outstanding limits are reached.
            FlowControlSettings flowControl =
                FlowControlSettings.newBuilder()
                    .setMaxOutstandingRequestBytes(this.maxOutstandingRequestBytes)
                    .setMaxOutstandingElementCount(this.maxOutstandingMessages)
                    .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                    .build();
            batching.setFlowControlSettings(flowControl);
        }

        Publisher.Builder publisherBuilder =
            Publisher.newBuilder(topicName)
                .setCredentialsProvider(this.gcpCredentialsProvider)
                .setBatchingSettings(batching.build())
                .setRetrySettings(buildRetrySettings())
                .setExecutorProvider(FixedExecutorProvider.create(ConnectorUtils.getSystemExecutor()))
                .setEndpoint(this.cpsEndpoint);
        if (this.orderingKeySource != CloudPubSubSinkConnector.OrderingKeySource.NONE) {
            publisherBuilder.setEnableMessageOrdering(true);
        }

        try {
            this.publisher = publisherBuilder.build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Assembles the retry policy from the configured total and per-request timeouts. */
    private RetrySettings buildRetrySettings() {
        return RetrySettings.newBuilder()
            .setTotalTimeout(Duration.ofMillis(this.maxTotalTimeoutMs))
            .setMaxRpcTimeout(Duration.ofMillis(this.maxRequestTimeoutMs))
            .setInitialRetryDelay(Duration.ofMillis(5L))
            .setRetryDelayMultiplier(2.0d)
            .setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
            .setInitialRpcTimeout(Duration.ofSeconds(10L))
            .setRpcTimeoutMultiplier(2.0d)
            .build();
    }

    /**
     * Tells whether publisher flow control should be configured.
     *
     * <p>Flow control is needed as soon as either outstanding limit differs from
     * {@code Long.MAX_VALUE}. The previous check compared {@code maxOutstandingRequestBytes}
     * twice, so a limit set only on the number of outstanding messages was ignored.
     *
     * @return {@code true} when a byte limit or a message-count limit is configured
     */
    private boolean useFlowControl() {
        return this.maxOutstandingRequestBytes != Long.MAX_VALUE
            || this.maxOutstandingMessages != Long.MAX_VALUE;
    }

    /**
     * Shuts the publisher down and waits up to the configured shutdown timeout for it to
     * terminate. Failures are logged and never propagated, so stopping the task cannot fail.
     */
    public void stop() {
        log.info("Stopping CloudPubSubSinkTask");
        if (this.publisher == null) {
            return;
        }
        log.info("Shutting down PubSub publisher");
        try {
            this.publisher.shutdown();
            if (!this.publisher.awaitTermination(this.maxShutdownTimeoutMs, TimeUnit.MILLISECONDS)) {
                log.warn("PubSub publisher did not terminate cleanly in {} ms", this.maxShutdownTimeoutMs);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling worker thread can still see it.
            Thread.currentThread().interrupt();
            log.error("An exception occurred while shutting down PubSub publisher", e);
        } catch (Exception e) {
            log.error("An exception occurred while shutting down PubSub publisher", e);
        }
    }
}
