package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.kafka.connect.Versions;
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ServerApiConfig;
import com.mongodb.kafka.connect.util.SslConfigs;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/sink/MongoSinkTask.class */
public class MongoSinkTask extends SinkTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
    private static final String CONNECTOR_TYPE = "sink";
    private StartedMongoSinkTask startedTask;

    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return Versions.VERSION;
    }

    @Override // org.apache.kafka.connect.sink.SinkTask, org.apache.kafka.connect.connector.Task
    public void start(Map<String, String> map) {
        LOGGER.info("Starting MongoDB sink task");
        MongoClient mongoClient = null;
        try {
            MongoSinkConfig mongoSinkConfig = new MongoSinkConfig(map);
            mongoClient = createMongoClient(mongoSinkConfig);
            this.startedTask = new StartedMongoSinkTask(mongoSinkConfig, mongoClient, createErrorReporter());
            LOGGER.debug("Started MongoDB sink task");
        } catch (RuntimeException e) {
            MongoClient mongoClient2 = mongoClient;
            if (mongoClient2 != null) {
                try {
                    mongoClient2.close();
                } catch (RuntimeException e2) {
                    e.addSuppressed(e2);
                    throw new ConnectException("Failed to start MongoDB sink task", e);
                }
            }
            throw new ConnectException("Failed to start MongoDB sink task", e);
        }
    }

    @Override // org.apache.kafka.connect.sink.SinkTask
    public void put(Collection<SinkRecord> collection) {
        this.startedTask.put(collection);
    }

    @Override // org.apache.kafka.connect.sink.SinkTask
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.debug("Flush called - noop");
    }

    @Override // org.apache.kafka.connect.sink.SinkTask, org.apache.kafka.connect.connector.Task
    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.startedTask != null) {
            this.startedTask.close();
        }
    }

    private ErrorReporter createErrorReporter() {
        ErrorReporter nopErrorReporter = nopErrorReporter();
        if (this.context != null) {
            try {
                ErrantRecordReporter errantRecordReporter = this.context.errantRecordReporter();
                if (errantRecordReporter != null) {
                    Objects.requireNonNull(errantRecordReporter);
                    nopErrorReporter = (v1, v2) -> {
                        r0.report(v1, v2);
                    };
                } else {
                    LOGGER.info("Errant record reporter not configured.");
                }
            } catch (NoClassDefFoundError | NoSuchMethodError e) {
                LOGGER.info("Kafka versions prior to 2.6 do not support the errant record reporter.");
            }
        }
        return nopErrorReporter;
    }

    static ErrorReporter nopErrorReporter() {
        return (sinkRecord, exc) -> {
        };
    }

    private static MongoClient createMongoClient(MongoSinkConfig mongoSinkConfig) {
        MongoClientSettings.Builder applyToSslSettings = MongoClientSettings.builder().applyConnectionString(mongoSinkConfig.getConnectionString()).applyToSslSettings(builder -> {
            SslConfigs.setupSsl(builder, mongoSinkConfig);
        });
        ServerApiConfig.setServerApi(applyToSslSettings, mongoSinkConfig);
        return MongoClients.create(applyToSslSettings.build(), ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE, mongoSinkConfig.getString("provider")));
    }
}
