package com.google.pubsub.kafka.source;

import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.kafka.source.CloudPubSubSourceConnector;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/* loaded from: input_file:com/google/pubsub/kafka/source/CloudPubSubSourceTask.class */
/**
 * A Kafka Connect {@link SourceTask} that pulls messages from a Cloud Pub/Sub subscription and
 * converts each of them into a {@link SourceRecord} for the configured Kafka topic.
 *
 * <p>The ack id of every message is stored as the record's source offset (keyed by the
 * subscription name) so that {@link #commitRecord(SourceRecord)} can acknowledge the message once
 * Kafka Connect reports the record as committed.
 */
public class CloudPubSubSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(CloudPubSubSourceTask.class);

    /** Number of subscribers handed to the round-robin subscriber in non-streaming mode. */
    private static final int NUM_CPS_SUBSCRIBERS = 10;

    /** Initial delay and period, in milliseconds, of the task scheduled for the ack-batching subscriber. */
    private static final long ACK_BATCHING_PERIOD_MS = 100L;

    private String kafkaTopic;
    private ProjectSubscriptionName cpsSubscription;
    private String kafkaMessageKeyAttribute;
    private String kafkaMessageTimestampAttribute;
    private boolean makeOrderingKeyAttribute;
    private int kafkaPartitions;
    private CloudPubSubSourceConnector.PartitionScheme kafkaPartitionScheme;
    private CloudPubSubSubscriber subscriber;
    private boolean useKafkaHeaders;

    /** Last partition handed out by the round-robin fallback; -1 until the first record. */
    private int currentRoundRobinPartition = -1;

    /**
     * Attribute names that are consumed by the connector itself (record key and timestamp) and
     * therefore do not count as "custom" attributes. May contain {@code null} when the
     * corresponding option is not configured.
     */
    private final Set<String> standardAttributes = new HashSet<>();

    /** Creates a task whose subscriber is built from the configuration in {@link #start(Map)}. */
    public CloudPubSubSourceTask() {
    }

    /**
     * Creates a task that uses the supplied subscriber instead of building one in
     * {@link #start(Map)}.
     *
     * @param cloudPubSubSubscriber the subscriber to pull from and acknowledge through
     */
    @VisibleForTesting
    public CloudPubSubSourceTask(CloudPubSubSubscriber cloudPubSubSubscriber) {
        this.subscriber = cloudPubSubSubscriber;
    }

    /**
     * Returns the version reported by {@link CloudPubSubSourceConnector}.
     *
     * @return the connector version string
     */
    public String version() {
        return new CloudPubSubSourceConnector().version();
    }

    /**
     * Parses the task configuration and, unless a subscriber was injected through the constructor,
     * creates either a streaming-pull subscriber or an ack-batching round-robin subscriber.
     *
     * @param map the raw task configuration, parsed with the connector's config definition
     */
    public void start(Map<String, String> map) {
        Map<String, Object> config = new CloudPubSubSourceConnector().config().parse(map);

        this.cpsSubscription =
            ProjectSubscriptionName.newBuilder()
                .setProject(config.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString())
                .setSubscription(
                    config.get(CloudPubSubSourceConnector.CPS_SUBSCRIPTION_CONFIG).toString())
                .build();
        String endpoint = (String) config.get(ConnectorUtils.CPS_ENDPOINT);
        this.kafkaTopic = config.get("kafka.topic").toString();
        int maxBatchSize = (Integer) config.get(CloudPubSubSourceConnector.CPS_MAX_BATCH_SIZE_CONFIG);
        this.kafkaPartitions = (Integer) config.get(CloudPubSubSourceConnector.KAFKA_PARTITIONS_CONFIG);
        this.kafkaMessageKeyAttribute =
            (String) config.get(CloudPubSubSourceConnector.KAFKA_MESSAGE_KEY_CONFIG);
        this.kafkaMessageTimestampAttribute =
            (String) config.get(CloudPubSubSourceConnector.KAFKA_MESSAGE_TIMESTAMP_CONFIG);
        this.kafkaPartitionScheme =
            CloudPubSubSourceConnector.PartitionScheme.getEnum(
                (String) config.get(CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG));
        this.useKafkaHeaders = (Boolean) config.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
        this.makeOrderingKeyAttribute =
            (Boolean) config.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE);

        boolean streamingPullEnabled =
            (Boolean) config.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED);
        long flowControlBytes =
            (Long) config.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_FLOW_CONTROL_BYTES);
        long flowControlMessages =
            (Long) config.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES);
        int parallelStreams =
            (Integer) config.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_PARALLEL_STREAMS);
        long maxAckExtensionMs =
            (Long) config.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS);
        long maxMsPerAckExtension =
            (Long) config.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION);
        ConnectorCredentialsProvider credentials = ConnectorCredentialsProvider.fromConfig(config);

        // A subscriber injected through the test constructor takes precedence over the config.
        if (this.subscriber == null) {
            if (streamingPullEnabled) {
                this.subscriber = new StreamingPullSubscriber(receiver -> {
                    FlowControlSettings flowControl =
                        FlowControlSettings.newBuilder()
                            .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                            .setMaxOutstandingElementCount(flowControlMessages)
                            .setMaxOutstandingRequestBytes(flowControlBytes)
                            .build();
                    Subscriber.Builder builder =
                        Subscriber.newBuilder(this.cpsSubscription, receiver)
                            .setCredentialsProvider(credentials)
                            .setFlowControlSettings(flowControl)
                            .setParallelPullCount(parallelStreams)
                            .setEndpoint(endpoint)
                            .setExecutorProvider(
                                FixedExecutorProvider.create(ConnectorUtils.getSystemExecutor()));
                    // Non-positive values leave the client library defaults untouched.
                    if (maxAckExtensionMs > 0) {
                        builder.setMaxAckExtensionPeriod(Duration.ofMillis(maxAckExtensionMs));
                    }
                    if (maxMsPerAckExtension > 0) {
                        builder.setMaxDurationPerAckExtension(Duration.ofMillis(maxMsPerAckExtension));
                    }
                    return builder.build();
                });
            } else {
                this.subscriber =
                    new AckBatchingSubscriber(
                        new CloudPubSubRoundRobinSubscriber(
                            NUM_CPS_SUBSCRIBERS, credentials, endpoint, this.cpsSubscription, maxBatchSize),
                        task ->
                            ConnectorUtils.getSystemExecutor()
                                .scheduleAtFixedRate(
                                    task,
                                    ACK_BATCHING_PERIOD_MS,
                                    ACK_BATCHING_PERIOD_MS,
                                    TimeUnit.MILLISECONDS));
            }
        }

        this.standardAttributes.add(this.kafkaMessageKeyAttribute);
        this.standardAttributes.add(this.kafkaMessageTimestampAttribute);
        log.info("Started a CloudPubSubSourceTask.");
    }

    /**
     * Pulls one batch of messages from the subscriber and converts it into source records.
     *
     * <p>Any failure other than an interrupt is logged and reported as an empty poll.
     *
     * @return the converted records, or an empty list when the pull or the conversion failed
     * @throws InterruptedException if the calling thread is interrupted while waiting for the pull
     */
    public List<SourceRecord> poll() throws InterruptedException {
        log.debug("Polling...");
        try {
            List<ReceivedMessage> response = this.subscriber.pull().get();
            List<SourceRecord> records = new ArrayList<>(response.size());
            log.trace("Received {} messages", response.size());
            for (ReceivedMessage received : response) {
                records.add(toSourceRecord(received));
            }
            return records;
        } catch (InterruptedException e) {
            // Do not swallow interrupts: the method contract lets the caller observe them.
            throw e;
        } catch (Exception e) {
            log.info("Error while retrieving records, treating as an empty poll. " + e);
            return new ArrayList<>();
        }
    }

    /**
     * Converts a single received message into a source record, choosing between a plain
     * bytes record, a record with headers, or a struct record depending on whether the message
     * carries attributes beyond the configured key/timestamp attributes.
     */
    private SourceRecord toSourceRecord(ReceivedMessage received) {
        PubsubMessage message = received.getMessage();
        Map<String, String> attributes = message.getAttributesMap();
        String orderingKey = message.getOrderingKey();

        String key =
            ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE.equals(this.kafkaMessageKeyAttribute)
                ? orderingKey
                : attributes.get(this.kafkaMessageKeyAttribute);

        // Fall back to the publish time when the timestamp attribute is absent or not numeric.
        Long timestamp = getLongValue(attributes.get(this.kafkaMessageTimestampAttribute));
        if (timestamp == null) {
            timestamp = Timestamps.toMillis(message.getPublishTime());
        }

        byte[] body = message.getData().toByteArray();
        Map<String, String> ack =
            Collections.singletonMap(this.cpsSubscription.toString(), received.getAckId());

        boolean hasCustomAttributes =
            !this.standardAttributes.containsAll(attributes.keySet())
                || shouldEmitOrderingKey(orderingKey);
        if (!hasCustomAttributes) {
            return new SourceRecord(
                null,
                ack,
                this.kafkaTopic,
                selectPartition(key, body, orderingKey),
                Schema.OPTIONAL_STRING_SCHEMA,
                key,
                Schema.BYTES_SCHEMA,
                body,
                timestamp);
        }
        return this.useKafkaHeaders
            ? createRecordWithHeaders(attributes, ack, key, orderingKey, body, timestamp)
            : createRecordWithStruct(attributes, ack, key, orderingKey, body, timestamp);
    }

    /** Returns true when the ordering key must be copied into the record as an attribute. */
    private boolean shouldEmitOrderingKey(String orderingKey) {
        return this.makeOrderingKeyAttribute && orderingKey != null && !orderingKey.isEmpty();
    }

    /**
     * Builds a bytes-valued record whose headers carry every message attribute except the one
     * used as the record key, plus the ordering key when configured.
     */
    private SourceRecord createRecordWithHeaders(
            Map<String, String> attributes,
            Map<String, String> ack,
            String key,
            String orderingKey,
            byte[] body,
            Long timestamp) {
        ConnectHeaders headers = new ConnectHeaders();
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            if (!attribute.getKey().equals(this.kafkaMessageKeyAttribute)) {
                headers.addString(attribute.getKey(), attribute.getValue());
            }
        }
        if (shouldEmitOrderingKey(orderingKey)) {
            headers.addString(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, orderingKey);
        }
        return new SourceRecord(
            null,
            ack,
            this.kafkaTopic,
            selectPartition(key, body, orderingKey),
            Schema.OPTIONAL_STRING_SCHEMA,
            key,
            Schema.BYTES_SCHEMA,
            body,
            timestamp,
            headers);
    }

    /**
     * Builds a struct-valued record: one bytes field for the message body and one string field per
     * message attribute (except the key attribute), plus the ordering key when configured.
     */
    private SourceRecord createRecordWithStruct(
            Map<String, String> attributes,
            Map<String, String> ack,
            String key,
            String orderingKey,
            byte[] body,
            Long timestamp) {
        SchemaBuilder schemaBuilder =
            SchemaBuilder.struct()
                .field(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, Schema.BYTES_SCHEMA);
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            if (!attribute.getKey().equals(this.kafkaMessageKeyAttribute)) {
                schemaBuilder.field(attribute.getKey(), Schema.STRING_SCHEMA);
            }
        }
        if (shouldEmitOrderingKey(orderingKey)) {
            schemaBuilder.field(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, Schema.STRING_SCHEMA);
        }
        Schema valueSchema = schemaBuilder.build();

        Struct value = new Struct(valueSchema).put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, body);
        for (Field field : valueSchema.fields()) {
            String name = field.name();
            if (name.equals(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE)) {
                value.put(name, orderingKey);
            } else if (!name.equals(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
                value.put(name, attributes.get(name));
            }
        }
        return new SourceRecord(
            null,
            ack,
            this.kafkaTopic,
            selectPartition(key, value, orderingKey),
            Schema.OPTIONAL_STRING_SCHEMA,
            key,
            valueSchema,
            value,
            timestamp);
    }

    /**
     * Chooses the Kafka partition for a record according to the configured partition scheme.
     *
     * @param key the record key, may be {@code null}
     * @param value the record value (a {@code byte[]} or a {@link Struct})
     * @param orderingKey the Pub/Sub ordering key, may be {@code null} or empty
     * @return the partition number, or {@code null} to let Kafka's partitioner decide
     */
    private Integer selectPartition(Object key, Object value, String orderingKey) {
        if (this.kafkaPartitionScheme.equals(CloudPubSubSourceConnector.PartitionScheme.HASH_KEY)) {
            return key == null ? 0 : partitionForHash(key.hashCode());
        }
        if (this.kafkaPartitionScheme.equals(CloudPubSubSourceConnector.PartitionScheme.HASH_VALUE)) {
            return partitionForHash(contentHash(value));
        }
        if (this.kafkaPartitionScheme.equals(
                CloudPubSubSourceConnector.PartitionScheme.KAFKA_PARTITIONER)) {
            return null;
        }
        if (this.kafkaPartitionScheme.equals(CloudPubSubSourceConnector.PartitionScheme.ORDERING_KEY)
                && orderingKey != null
                && !orderingKey.isEmpty()) {
            return partitionForHash(orderingKey.hashCode());
        }
        // Every remaining case (including ORDERING_KEY without a key) is spread round-robin.
        this.currentRoundRobinPartition = (this.currentRoundRobinPartition + 1) % this.kafkaPartitions;
        return this.currentRoundRobinPartition;
    }

    /**
     * Maps a hash code onto a partition index. The absolute value is taken in {@code long}
     * arithmetic because {@code Math.abs(Integer.MIN_VALUE)} is still negative as an {@code int},
     * which would otherwise produce a negative partition number.
     */
    private int partitionForHash(int hash) {
        return (int) (Math.abs((long) hash) % this.kafkaPartitions);
    }

    /**
     * Returns a content-based hash of a record value. Arrays inherit the identity hash from
     * {@code Object}, so byte arrays are hashed by content to make equal payloads map to the
     * same partition.
     */
    private static int contentHash(Object value) {
        if (value instanceof byte[]) {
            return Arrays.hashCode((byte[]) value);
        }
        return value.hashCode();
    }

    /**
     * Parses a decimal string into a {@code Long}.
     *
     * @param str the text to parse, may be {@code null}
     * @return the parsed value, or {@code null} when {@code str} is {@code null} or not a number
     */
    private Long getLongValue(String str) {
        if (str == null) {
            return null;
        }
        try {
            return Long.valueOf(str);
        } catch (NumberFormatException e) {
            log.error("Error while converting `{}` to number", str, e);
            return null;
        }
    }

    /** Closes the subscriber, if one was created or injected. */
    public void stop() {
        if (this.subscriber != null) {
            this.subscriber.close();
        }
    }

    /**
     * Acknowledges the Pub/Sub message behind a committed record, using the ack id stored in the
     * record's source offset. Acknowledgement failures of type {@link ApiException} are logged and
     * otherwise ignored.
     *
     * @param sourceRecord the record that Kafka Connect reports as committed
     */
    public void commitRecord(SourceRecord sourceRecord) {
        String ackId = sourceRecord.sourceOffset().get(this.cpsSubscription.toString()).toString();
        ApiFutures.catching(
            this.subscriber.ackMessages(ImmutableList.of(ackId)),
            ApiException.class,
            apiException -> {
                log.warn("Failed to acknowledge message: " + apiException);
                return null;
            },
            MoreExecutors.directExecutor());
        log.trace("Committed {}", ackId);
    }
}
