package io.aiven.kafka.connect.gcs;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.output.OutputWriter;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aiven/kafka/connect/gcs/GcsSinkTask.class */
public final class GcsSinkTask extends SinkTask {
    private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConnector.class);
    private static final String USER_AGENT_HEADER_KEY = "user-agent";
    private RecordGrouper recordGrouper;
    private GcsSinkConfig config;
    private Storage storage;

    public GcsSinkTask() {
    }

    public GcsSinkTask(Map<String, String> map, Storage storage) {
        Objects.requireNonNull(map, "props cannot be null");
        Objects.requireNonNull(storage, "storage cannot be null");
        this.config = new GcsSinkConfig(map);
        this.storage = storage;
        initRest();
    }

    public void start(Map<String, String> map) {
        Objects.requireNonNull(map, "props cannot be null");
        this.config = new GcsSinkConfig(map);
        this.storage = StorageOptions.newBuilder().setHost(this.config.getGcsEndpoint()).setCredentials(this.config.getCredentials()).setHeaderProvider(FixedHeaderProvider.create(new String[]{USER_AGENT_HEADER_KEY, this.config.getUserAgent()})).setRetrySettings(RetrySettings.newBuilder().setInitialRetryDelay(this.config.getGcsRetryBackoffInitialDelay()).setMaxRetryDelay(this.config.getGcsRetryBackoffMaxDelay()).setRetryDelayMultiplier(this.config.getGcsRetryBackoffDelayMultiplier()).setTotalTimeout(this.config.getGcsRetryBackoffTotalTimeout()).setMaxAttempts(this.config.getGcsRetryBackoffMaxAttempts()).build()).build().getService();
        initRest();
        if (Objects.nonNull(this.config.getKafkaRetryBackoffMs())) {
            this.context.timeout(this.config.getKafkaRetryBackoffMs().longValue());
        }
    }

    private void initRest() {
        try {
            this.recordGrouper = RecordGrouperFactory.newRecordGrouper(this.config);
        } catch (Exception e) {
            throw new ConnectException("Unsupported file name template " + this.config.getFilename(), e);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        Objects.requireNonNull(collection, "records cannot be null");
        LOG.debug("Processing {} records", Integer.valueOf(collection.size()));
        Iterator<SinkRecord> it = collection.iterator();
        while (it.hasNext()) {
            this.recordGrouper.put(it.next());
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            this.recordGrouper.records().forEach(this::flushFile);
        } finally {
            this.recordGrouper.clear();
        }
    }

    private void flushFile(String str, List<SinkRecord> list) {
        try {
            OutputStream newOutputStream = Channels.newOutputStream((WritableByteChannel) this.storage.writer(BlobInfo.newBuilder(this.config.getBucketName(), this.config.getPrefix() + str).build(), new Storage.BlobWriteOption[0]));
            try {
                OutputWriter build = OutputWriter.builder().withExternalProperties(this.config.originalsStrings()).withOutputFields(this.config.getOutputFields()).withCompressionType(this.config.getCompressionType()).withEnvelopeEnabled(this.config.envelopeEnabled()).build(newOutputStream, this.config.getFormatType());
                try {
                    build.writeRecords(list);
                    if (build != null) {
                        build.close();
                    }
                    if (newOutputStream != null) {
                        newOutputStream.close();
                    }
                } catch (Throwable th) {
                    if (build != null) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Exception e) {
            throw new ConnectException(e);
        }
    }

    public void stop() {
    }

    public String version() {
        return Version.VERSION;
    }
}
