package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.ElasticsearchWriter;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchSinkTask.class */
public class ElasticsearchSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
    private ElasticsearchWriter writer;
    private ElasticsearchClient client;
    private Boolean createIndicesAtStartTime;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        start(map, null);
    }

    public void start(Map<String, String> map, ElasticsearchClient elasticsearchClient) {
        try {
            log.info("Starting ElasticsearchSinkTask");
            ElasticsearchSinkConnectorConfig elasticsearchSinkConnectorConfig = new ElasticsearchSinkConnectorConfig(map);
            long computeRetryWaitTimeInMillis = RetryUtil.computeRetryWaitTimeInMillis(elasticsearchSinkConnectorConfig.maxRetries(), elasticsearchSinkConnectorConfig.retryBackoffMs());
            if (computeRetryWaitTimeInMillis > RetryUtil.MAX_RETRY_TIME_MS) {
                log.warn("This connector uses exponential backoff with jitter for retries, and using '{}={}' and '{}={}' results in an impractical but possible maximum backoff time greater than {} hours.", new Object[]{ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, Integer.valueOf(elasticsearchSinkConnectorConfig.maxRetries()), ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, Long.valueOf(elasticsearchSinkConnectorConfig.retryBackoffMs()), Long.valueOf(TimeUnit.MILLISECONDS.toHours(computeRetryWaitTimeInMillis))});
            }
            if (elasticsearchClient != null) {
                this.client = elasticsearchClient;
            } else {
                this.client = new JestElasticsearchClient(map);
            }
            ElasticsearchWriter.Builder behaviorOnMalformedDoc = new ElasticsearchWriter.Builder(this.client).setType(elasticsearchSinkConnectorConfig.type()).setIgnoreKey(elasticsearchSinkConnectorConfig.ignoreKey(), elasticsearchSinkConnectorConfig.ignoreKeyTopics()).setIgnoreSchema(elasticsearchSinkConnectorConfig.ignoreSchema(), elasticsearchSinkConnectorConfig.ignoreSchemaTopics()).setCompactMapEntries(elasticsearchSinkConnectorConfig.useCompactMapEntries()).setTopicToIndexMap(elasticsearchSinkConnectorConfig.topicToIndexMap()).setFlushTimoutMs(elasticsearchSinkConnectorConfig.flushTimeoutMs()).setMaxBufferedRecords(elasticsearchSinkConnectorConfig.maxBufferedRecords()).setMaxInFlightRequests(elasticsearchSinkConnectorConfig.maxInFlightRequests()).setBatchSize(elasticsearchSinkConnectorConfig.batchSize()).setLingerMs(elasticsearchSinkConnectorConfig.lingerMs()).setRetryBackoffMs(elasticsearchSinkConnectorConfig.retryBackoffMs()).setMaxRetry(elasticsearchSinkConnectorConfig.maxRetries()).setDropInvalidMessage(elasticsearchSinkConnectorConfig.dropInvalidMessage()).setBehaviorOnNullValues(elasticsearchSinkConnectorConfig.behaviorOnNullValues()).setBehaviorOnMalformedDoc(elasticsearchSinkConnectorConfig.behaviorOnMalformedDoc());
            try {
                if (this.context.errantRecordReporter() == null) {
                    log.info("Errant record reporter not configured.");
                }
                behaviorOnMalformedDoc.setErrantRecordReporter(this.context.errantRecordReporter());
            } catch (NoClassDefFoundError | NoSuchMethodError e) {
                log.warn("AK versions prior to 2.6 do not support the errant record reporter");
            }
            this.createIndicesAtStartTime = Boolean.valueOf(elasticsearchSinkConnectorConfig.createIndicesAtStart());
            this.writer = behaviorOnMalformedDoc.build();
            this.writer.start();
            log.info("Started ElasticsearchSinkTask, will {} records with null values ('{}')", elasticsearchSinkConnectorConfig.behaviorOnNullValues().name(), ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG);
        } catch (ConfigException e2) {
            throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e2);
        }
    }

    public void open(Collection<TopicPartition> collection) {
        log.debug("Opening the task for topic partitions: {}", collection);
        if (this.createIndicesAtStartTime.booleanValue()) {
            HashSet hashSet = new HashSet();
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                hashSet.add(it.next().topic());
            }
            this.writer.createIndicesForTopics(hashSet);
        }
    }

    public void put(Collection<SinkRecord> collection) throws ConnectException {
        log.debug("Putting {} records to Elasticsearch", Integer.valueOf(collection.size()));
        this.writer.write(collection);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        log.debug("Flushing data to Elasticsearch with the following offsets: {}", map);
        this.writer.flush();
    }

    public void close(Collection<TopicPartition> collection) {
        log.debug("Closing the task for topic partitions: {}", collection);
    }

    public void stop() throws ConnectException {
        log.info("Stopping ElasticsearchSinkTask");
        if (this.writer != null) {
            this.writer.stop();
        }
        if (this.client != null) {
            this.client.close();
        }
    }
}
