package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchClient.class */
/**
 * Wrapper around a {@link RestHighLevelClient} and a {@link BulkProcessor} used to create
 * indices/mappings and to index sink records in bulk.
 *
 * <p>Records handed to {@link #index(SinkRecord, DocWriteRequest)} are buffered by the bulk
 * processor. The first unrecoverable failure observed by the bulk listener is stored in
 * {@code error} and rethrown by the next call to {@code index}. When an
 * {@link ErrantRecordReporter} is supplied, records rejected by Elasticsearch are forwarded to it;
 * otherwise the request-to-record bookkeeping maps are {@code null} and skipped.
 */
public class ElasticsearchClient {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class);

    private static final String RESOURCE_ALREADY_EXISTS_EXCEPTION = "resource_already_exists_exception";
    private static final String VERSION_CONFLICT_EXCEPTION = "version_conflict_engine_exception";

    /** Sleep interval, in milliseconds, between checks for free space in the record buffer. */
    private static final long WAIT_TIME = TimeUnit.MILLISECONDS.toMillis(10);

    /** Failure-message fragments that are treated as malformed-document errors. */
    private static final Set<String> MALFORMED_DOC_ERRORS = new HashSet<>(Arrays.asList("mapper_parsing_exception", "illegal_argument_exception", "action_request_validation_exception"));

    /** Number of records added to the bulk processor whose bulk request has not completed yet. */
    protected final AtomicInteger numRecords;
    /** First fatal error recorded by the bulk listener; {@code null} while healthy. */
    private final AtomicReference<ConnectException> error;
    protected final BulkProcessor bulkProcessor;
    /** Pending request -> originating record; {@code null} when no reporter is configured. */
    private final ConcurrentMap<DocWriteRequest<?>, SinkRecord> requestToRecord;
    /** Bulk execution id -> records of that bulk, in request order; {@code null} without a reporter. */
    private final ConcurrentMap<Long, List<SinkRecord>> inFlightRequests;
    private final ElasticsearchSinkConnectorConfig config;
    private final ErrantRecordReporter reporter;
    private final RestHighLevelClient client;
    private final ScheduledExecutorService executorService;
    private final Time clock;

    /**
     * Exception passed to the {@link ErrantRecordReporter} for rejected records.
     *
     * <p>{@link #fillInStackTrace()} is overridden to return {@code this} without capturing a
     * stack trace.
     */
    /* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchClient$ReportingException.class */
    public static class ReportingException extends RuntimeException {
        public ReportingException(String str) {
            super(str);
        }

        @Override // java.lang.Throwable
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    /**
     * Builds the REST client, the bulk processor and a single-threaded scheduler that
     * periodically closes expired and idle HTTP connections.
     *
     * @param elasticsearchSinkConnectorConfig connector configuration
     * @param errantRecordReporter reporter for rejected records; may be {@code null}, in which
     *     case no per-record bookkeeping is kept
     */
    public ElasticsearchClient(ElasticsearchSinkConnectorConfig elasticsearchSinkConnectorConfig, ErrantRecordReporter errantRecordReporter) {
        ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(elasticsearchSinkConnectorConfig);
        NHttpClientConnectionManager connectionManager = configCallbackHandler.connectionManager();
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        this.executorService.scheduleAtFixedRate(() -> {
            connectionManager.closeExpiredConnections();
            connectionManager.closeIdleConnections(elasticsearchSinkConnectorConfig.maxIdleTimeMs(), TimeUnit.MILLISECONDS);
        }, elasticsearchSinkConnectorConfig.maxIdleTimeMs(), elasticsearchSinkConnectorConfig.maxIdleTimeMs() / 2, TimeUnit.MILLISECONDS);

        this.numRecords = new AtomicInteger(0);
        this.error = new AtomicReference<>();
        this.requestToRecord = errantRecordReporter != null ? new ConcurrentHashMap<>() : null;
        this.inFlightRequests = errantRecordReporter != null ? new ConcurrentHashMap<>() : null;
        this.config = elasticsearchSinkConnectorConfig;
        this.reporter = errantRecordReporter;
        this.clock = Time.SYSTEM;

        HttpHost[] hosts = elasticsearchSinkConnectorConfig.connectionUrls().stream()
                .map(HttpHost::create)
                .toArray(HttpHost[]::new);
        this.client = new RestHighLevelClient(
                RestClient.builder(hosts)
                        .setHttpClientConfigCallback(configCallbackHandler)
                        .setRequestConfigCallback(configCallbackHandler));

        this.bulkProcessor = BulkProcessor.builder((bulkRequest, actionListener) -> {
            this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
        }, buildListener())
                .setBulkActions(elasticsearchSinkConnectorConfig.batchSize())
                .setConcurrentRequests(elasticsearchSinkConnectorConfig.maxInFlightRequests() - 1)
                .setFlushInterval(TimeValue.timeValueMillis(elasticsearchSinkConnectorConfig.lingerMs()))
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(elasticsearchSinkConnectorConfig.retryBackoffMs()), elasticsearchSinkConnectorConfig.maxRetries()))
                .build();
    }

    /** Returns the underlying high-level REST client. */
    public RestHighLevelClient client() {
        return this.client;
    }

    /**
     * Waits up to the configured flush timeout for the bulk processor to finish, then shuts down
     * the connection-cleanup scheduler and the REST client (always, even on failure).
     *
     * @throws ConnectException if outstanding requests did not complete in time, or if the
     *     calling thread was interrupted while waiting (the interrupt flag is restored)
     */
    public void close() {
        try {
            if (!this.bulkProcessor.awaitClose(this.config.flushTimeoutMs(), TimeUnit.MILLISECONDS)) {
                throw new ConnectException("Failed to process outstanding requests in time while closing the ElasticsearchClient.");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while processing all in-flight requests on ElasticsearchClient close.", e);
        } finally {
            closeConnections();
        }
    }

    /**
     * Creates the index if it does not exist yet.
     *
     * @param str name of the index
     * @return {@code true} if this call created the index; {@code false} if it already existed
     *     (including the case where the create request reports that the resource already exists)
     * @throws ConnectException if the existence check or the create call ultimately fails
     */
    public boolean createIndex(String str) {
        if (indexExists(str)) {
            return false;
        }
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(str);
        return this.<Boolean>callWithRetries("create index " + str, () -> {
            try {
                this.client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
                return true;
            } catch (ElasticsearchStatusException | IOException e) {
                // getMessage() may be null; never let that mask the real failure with an NPE.
                String message = e.getMessage();
                if (message != null && message.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
                    return false;
                }
                throw e;
            }
        });
    }

    /**
     * Puts a mapping, built from the given schema, on the index.
     *
     * @param str name of the index
     * @param schema schema the mapping is built from
     * @throws ConnectException if the request ultimately fails
     */
    public void createMapping(String str, Schema schema) {
        PutMappingRequest source = new PutMappingRequest(new String[]{str}).source(Mapping.buildMapping(schema));
        callWithRetries(
                String.format("create mapping for index %s with schema %s", str, schema),
                () -> this.client.indices().putMapping(source, RequestOptions.DEFAULT));
    }

    /** Flushes the bulk processor. */
    public void flush() {
        this.bulkProcessor.flush();
    }

    /**
     * Checks whether the index has a non-empty mapping.
     *
     * @param str name of the index
     * @return {@code true} if a mapping exists and its source map is non-null and non-empty
     */
    public boolean hasMapping(String str) {
        MappingMetaData mapping = mapping(str);
        return mapping != null && mapping.sourceAsMap() != null && !mapping.sourceAsMap().isEmpty();
    }

    /**
     * Adds a request to the bulk processor, blocking while the number of buffered records is at
     * or above the configured maximum.
     *
     * @param sinkRecord record the request was built from (tracked only when a reporter is set)
     * @param docWriteRequest request to add to the bulk processor
     * @throws ConnectException if a previous bulk operation failed (the client is closed first),
     *     or if buffer space does not free up within the flush timeout
     */
    public void index(SinkRecord sinkRecord, DocWriteRequest<?> docWriteRequest) {
        if (isFailed()) {
            try {
                close();
            } catch (ConnectException e) {
                // Best effort: the recorded indexing error below is the one worth surfacing.
                log.warn("Failed to close the ElasticsearchClient after a previous error.", e);
            }
            throw this.error.get();
        }

        long deadlineMs = this.clock.milliseconds() + this.config.flushTimeoutMs();
        while (this.numRecords.get() >= this.config.maxBufferedRecords()) {
            this.clock.sleep(WAIT_TIME);
            if (this.clock.milliseconds() > deadlineMs) {
                throw new ConnectException(String.format("Could not make space in the internal buffer fast enough. Consider increasing %s or %s.", ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG, ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG));
            }
        }

        addToRequestToRecordMap(docWriteRequest, sinkRecord);
        this.numRecords.incrementAndGet();
        this.bulkProcessor.add(docWriteRequest);
    }

    /**
     * Checks whether the index exists.
     *
     * @param str name of the index
     * @return {@code true} if the index exists
     * @throws ConnectException if the request ultimately fails
     */
    public boolean indexExists(String str) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(new String[]{str});
        return this.<Boolean>callWithRetries(
                "check if index " + str + " exists",
                () -> this.client.indices().exists(getIndexRequest, RequestOptions.DEFAULT));
    }

    /** Remembers which record produced the request; a no-op when no reporter is configured. */
    private void addToRequestToRecordMap(DocWriteRequest<?> docWriteRequest, SinkRecord sinkRecord) {
        if (this.requestToRecord != null) {
            this.requestToRecord.put(docWriteRequest, sinkRecord);
        }
    }

    /** Creates the listener that tracks in-flight bulks and processes their outcome. */
    private BulkProcessor.Listener buildListener() {
        return new BulkProcessor.Listener() { // from class: io.confluent.connect.elasticsearch.ElasticsearchClient.1
            /**
             * Moves the records of this bulk from the pending map into the in-flight map, keyed
             * by execution id and kept in request order so they can be looked up by item id.
             */
            @Override
            public void beforeBulk(long j, BulkRequest bulkRequest) {
                if (requestToRecord == null || inFlightRequests == null) {
                    return;
                }
                List<SinkRecord> records = new ArrayList<>(bulkRequest.requests().size());
                for (DocWriteRequest<?> request : bulkRequest.requests()) {
                    // remove() returns the mapped record (or null), so one call does lookup + cleanup.
                    records.add(requestToRecord.remove(request));
                }
                inFlightRequests.put(Long.valueOf(j), records);
            }

            /** Handles every item response, then releases the bulk's bookkeeping and buffer slots. */
            @Override
            public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                for (BulkItemResponse itemResponse : bulkResponse) {
                    handleResponse(itemResponse, j);
                }
                removeFromInFlightRequests(j);
                numRecords.addAndGet(-bulkResponse.getItems().length);
            }

            /**
             * Retries the whole bulk synchronously; if the retries fail, records the error and
             * releases the bulk's bookkeeping and buffer slots.
             */
            @Override
            public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
                log.warn("Bulk request {} failed. Retrying request.", Long.valueOf(j), th);
                try {
                    BulkResponse retried = callWithRetries(
                            "retrying bulk request",
                            () -> client.bulk(bulkRequest, RequestOptions.DEFAULT));
                    afterBulk(j, bulkRequest, retried);
                } catch (ConnectException e) {
                    removeFromInFlightRequests(j);
                    error.compareAndSet(null, new ConnectException("Bulk request failed.", e.getCause()));
                    numRecords.addAndGet(-bulkRequest.requests().size());
                }
            }
        };
    }

    /**
     * Runs the callable through {@code RetryUtil.callWithRetries} using the configured retry
     * count and backoff.
     *
     * @param str description of the operation, used in the failure message
     * @param callable operation to run
     * @return the callable's result
     * @throws ConnectException wrapping any exception thrown by the retry helper
     */
    /* JADX INFO: Access modifiers changed from: private */
    public <T> T callWithRetries(String str, Callable<T> callable) {
        try {
            return RetryUtil.callWithRetries(str, callable, this.config.maxRetries(), this.config.retryBackoffMs());
        } catch (Exception e) {
            throw new ConnectException("Failed to " + str + ".", e);
        }
    }

    /** Shuts down the connection-cleanup scheduler and closes the REST client. */
    private void closeConnections() {
        this.executorService.shutdown();
        try {
            this.client.close();
        } catch (IOException e) {
            log.warn("Failed to close Elasticsearch client.", e);
        }
    }

    /**
     * Processes a single bulk item response. Successful items are ignored; malformed-document
     * failures are handled per configuration and reported; version conflicts are logged and
     * reported; any other failure is recorded as the client's fatal error.
     *
     * @param bulkItemResponse item response to inspect
     * @param j execution id of the bulk the item belongs to
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void handleResponse(BulkItemResponse bulkItemResponse, long j) {
        if (!bulkItemResponse.isFailed()) {
            return;
        }

        String failureMessage = bulkItemResponse.getFailureMessage();
        for (String malformedDocError : MALFORMED_DOC_ERRORS) {
            if (failureMessage.contains(malformedDocError)) {
                handleMalformedDocResponse(bulkItemResponse);
                reportBadRecord(bulkItemResponse, j);
                return;
            }
        }

        if (failureMessage.contains(VERSION_CONFLICT_EXCEPTION)) {
            log.warn("Ignoring version conflict for operation {} on document '{}' version {} in index '{}'.", bulkItemResponse.getOpType(), bulkItemResponse.getId(), Long.valueOf(bulkItemResponse.getVersion()), bulkItemResponse.getIndex());
            reportBadRecord(bulkItemResponse, j);
            return;
        }

        this.error.compareAndSet(null, new ConnectException("Indexing record failed.", bulkItemResponse.getFailure().getCause()));
    }

    /**
     * Applies the configured malformed-document behavior: log at debug (IGNORE), log at warn
     * (WARN), or log at error and record a fatal error (FAIL and any other value).
     */
    private void handleMalformedDocResponse(BulkItemResponse bulkItemResponse) {
        String ignoredMessage = String.format("Encountered an illegal document error '%s'. Ignoring and will not index record.", bulkItemResponse.getFailureMessage());
        switch (this.config.behaviorOnMalformedDoc()) {
            case IGNORE:
                log.debug(ignoredMessage);
                return;
            case WARN:
                log.warn(ignoredMessage);
                return;
            case FAIL:
            default:
                log.error("Encountered an illegal document error '{}'. To ignore future records like this, change the configuration '{}' to '{}'.", bulkItemResponse.getFailureMessage(), ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc.IGNORE);
                this.error.compareAndSet(null, new ConnectException("Indexing record failed.", bulkItemResponse.getFailure().getCause()));
                return;
        }
    }

    /** Returns {@code true} once a fatal error has been recorded. */
    private boolean isFailed() {
        return this.error.get() != null;
    }

    /** Fetches the mapping metadata of the index, or {@code null} if the response has none for it. */
    private MappingMetaData mapping(String str) {
        GetMappingsRequest indices = new GetMappingsRequest().indices(new String[]{str});
        GetMappingsResponse response = callWithRetries(
                "get mapping for index " + str,
                () -> this.client.indices().getMapping(indices, RequestOptions.DEFAULT));
        return response.mappings().get(str);
    }

    /** Drops the bookkeeping for a finished bulk; a no-op when no reporter is configured. */
    /* JADX INFO: Access modifiers changed from: private */
    public void removeFromInFlightRequests(long j) {
        if (this.inFlightRequests != null) {
            this.inFlightRequests.remove(Long.valueOf(j));
        }
    }

    /**
     * Forwards the record behind a failed item to the reporter, if a reporter is configured and
     * the record can still be found for the given execution id and item id.
     */
    private synchronized void reportBadRecord(BulkItemResponse bulkItemResponse, long j) {
        if (this.reporter == null) {
            return;
        }
        List<SinkRecord> records = this.inFlightRequests.get(Long.valueOf(j));
        int itemId = bulkItemResponse.getItemId();
        if (records == null || itemId < 0 || itemId >= records.size()) {
            return;
        }
        SinkRecord sinkRecord = records.get(itemId);
        if (sinkRecord != null) {
            this.reporter.report(sinkRecord, new ReportingException("Indexing failed: " + bulkItemResponse.getFailureMessage()));
        }
    }
}
