package com.google.pubsublite.kafka.sink;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

/* loaded from: input_file:com/google/pubsublite/kafka/sink/PubSubLiteSinkTask.class */
public class PubSubLiteSinkTask extends SinkTask {
    private final PublisherFactory factory;

    @Nullable
    private Publisher<MessageMetadata> publisher;

    @VisibleForTesting
    PubSubLiteSinkTask(PublisherFactory publisherFactory) {
        this.factory = publisherFactory;
    }

    public PubSubLiteSinkTask() {
        this(new PublisherFactoryImpl());
    }

    public String version() {
        return AppInfoParser.getVersion();
    }

    public void start(Map<String, String> map) {
        if (this.publisher != null) {
            throw new IllegalStateException("Called start when publisher already exists.");
        }
        this.publisher = this.factory.newPublisher(map);
        this.publisher.startAsync().awaitRunning();
    }

    public void put(Collection<SinkRecord> collection) {
        if (this.publisher.state() != ApiService.State.RUNNING) {
            if (this.publisher.state() != ApiService.State.FAILED) {
                throw new IllegalStateException("Publisher not currently running.");
            }
            throw new IllegalStateException("Publisher has failed.", this.publisher.failureCause());
        }
        for (SinkRecord sinkRecord : collection) {
            PubSubMessage.Builder newBuilder = PubSubMessage.newBuilder();
            if (sinkRecord.key() != null) {
                newBuilder.setKey(Schemas.encodeToBytes(sinkRecord.keySchema(), sinkRecord.key()));
            }
            if (sinkRecord.value() != null) {
                newBuilder.setData(Schemas.encodeToBytes(sinkRecord.valueSchema(), sinkRecord.value()));
            }
            ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder();
            getRecordHeaders(sinkRecord).forEach(header -> {
                builder.put((ImmutableListMultimap.Builder) header.key(), (String) Schemas.encodeToBytes(header.schema(), header.value()));
            });
            if (sinkRecord.topic() != null) {
                builder.put((ImmutableListMultimap.Builder) Constants.KAFKA_TOPIC_HEADER, (String) ByteString.copyFromUtf8(sinkRecord.topic()));
            }
            if (sinkRecord.kafkaPartition() != null) {
                builder.put((ImmutableListMultimap.Builder) Constants.KAFKA_PARTITION_HEADER, (String) ByteString.copyFromUtf8(sinkRecord.kafkaPartition().toString()));
                builder.put((ImmutableListMultimap.Builder) Constants.KAFKA_OFFSET_HEADER, (String) ByteString.copyFromUtf8(Long.toString(sinkRecord.kafkaOffset())));
            }
            if (sinkRecord.timestamp() != null) {
                builder.put((ImmutableListMultimap.Builder) Constants.KAFKA_EVENT_TIME_TYPE_HEADER, (String) ByteString.copyFromUtf8(sinkRecord.timestampType().name));
                newBuilder.setEventTime(Timestamps.fromMillis(sinkRecord.timestamp().longValue()));
            }
            builder.build().asMap().forEach((str, collection2) -> {
                newBuilder.putAttributes(str, AttributeValues.newBuilder().addAllValues(collection2).build());
            });
            this.publisher.publish(newBuilder.build());
        }
    }

    private Iterable<? extends Header> getRecordHeaders(SinkRecord sinkRecord) {
        ConnectHeaders connectHeaders = new ConnectHeaders();
        if (sinkRecord.headers() != null) {
            Iterator it = sinkRecord.headers().iterator();
            while (it.hasNext()) {
                connectHeaders.add((Header) it.next());
            }
        }
        return connectHeaders;
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            if (this.publisher != null) {
                this.publisher.flush();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        try {
            if (this.publisher == null) {
                throw new IllegalStateException("Called stop when publisher doesn't exist.");
            }
            try {
                this.publisher.flush();
                this.publisher.stopAsync().awaitTerminated();
                this.publisher = null;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.publisher = null;
            throw th;
        }
    }
}
