package org.apache.samza.system.elasticsearch;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.class */
/**
 * A {@link SystemProducer} that writes outgoing messages to Elasticsearch.
 *
 * <p>Each registered source gets its own {@link BulkProcessor}. Messages are converted to index
 * requests by the configured {@link IndexRequestFactory} and handed to the source's processor;
 * the outcome of every bulk request is observed through a {@link BulkProcessor.Listener}.
 *
 * <p>Failure handling is sticky: once any bulk request fails (other than with version
 * conflicts), the failure flag is set and never cleared, so every later {@link #flush(String)}
 * throws a {@link SamzaException}.
 */
public class ElasticsearchSystemProducer implements SystemProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSystemProducer.class);
    private final String system;
    private final IndexRequestFactory indexRequestFactory;
    private final BulkProcessorFactory bulkProcessorFactory;
    private final ElasticsearchSystemProducerMetrics metrics;
    private final Client client;
    /** Set by the bulk listeners when a bulk request failed; read by {@link #flush(String)}. */
    private final AtomicBoolean sendFailed = new AtomicBoolean(false);
    /** First throwable reported by a bulk listener, used as the cause of the flush exception. */
    private final AtomicReference<Throwable> thrown = new AtomicReference<>();
    /** Bulk processor per registered source name. */
    private final Map<String, BulkProcessor> sourceBulkProcessor = new HashMap<>();

    /**
     * Creates a producer.
     *
     * @param str name of the system this producer writes to (used in log and error messages)
     * @param bulkProcessorFactory factory that builds one bulk processor per registered source
     * @param client Elasticsearch client; closed by {@link #stop()}
     * @param indexRequestFactory converts outgoing envelopes into index requests
     * @param elasticsearchSystemProducerMetrics counters updated after each bulk response
     */
    public ElasticsearchSystemProducer(String str, BulkProcessorFactory bulkProcessorFactory, Client client, IndexRequestFactory indexRequestFactory, ElasticsearchSystemProducerMetrics elasticsearchSystemProducerMetrics) {
        this.system = str;
        this.bulkProcessorFactory = bulkProcessorFactory;
        this.client = client;
        this.indexRequestFactory = indexRequestFactory;
        this.metrics = elasticsearchSystemProducerMetrics;
    }

    /** No-op: bulk processors are created in {@link #register(String)}. */
    public void start() {
    }

    /**
     * Flushes and closes every registered bulk processor, then closes the client.
     *
     * <p>Cleanup always runs to completion for the flush step: a failing flush no longer prevents
     * that source's processor, the remaining processors, or the client from being closed. The
     * first flush failure is rethrown once cleanup has finished; later ones are logged.
     */
    public void stop() {
        RuntimeException firstFailure = null;
        try {
            for (Map.Entry<String, BulkProcessor> entry : this.sourceBulkProcessor.entrySet()) {
                try {
                    flush(entry.getKey());
                } catch (RuntimeException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        // Only one exception can be propagated; keep the rest visible in the log.
                        LOGGER.error("Additional failure while flushing during stop.", e);
                    }
                } finally {
                    entry.getValue().close();
                }
            }
        } finally {
            this.client.close();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates the bulk processor for a source and installs the listener that tracks failures and
     * updates metrics.
     *
     * @param str name of the source to register
     */
    public void register(final String str) {
        this.sourceBulkProcessor.put(str, this.bulkProcessorFactory.getBulkProcessor(this.client, new BulkProcessor.Listener() {
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            /**
             * Handles a bulk request that reached Elasticsearch. Version conflicts are tolerated
             * (logged at info); any other item failure marks the producer as failed. Success
             * metrics are only updated when no such failure occurred.
             */
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                boolean hasFatalFailure = false;
                if (response.hasFailures()) {
                    for (BulkItemResponse item : response.getItems()) {
                        if (!item.isFailed()) {
                            continue;
                        }
                        if (item.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
                            ElasticsearchSystemProducer.LOGGER.info("Failed to index document in Elasticsearch: {}", item.getFailureMessage());
                        } else {
                            hasFatalFailure = true;
                            ElasticsearchSystemProducer.LOGGER.error("Failed to index document in Elasticsearch: {}", item.getFailureMessage());
                        }
                    }
                }
                if (hasFatalFailure) {
                    ElasticsearchSystemProducer.this.sendFailed.set(true);
                } else {
                    updateSuccessMetrics(response);
                }
            }

            /** Handles a bulk request that failed as a whole; remembers the first cause. */
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Pass the throwable itself so the stack trace is not lost.
                ElasticsearchSystemProducer.LOGGER.error(failure.getMessage(), failure);
                ElasticsearchSystemProducer.this.thrown.compareAndSet(null, failure);
                ElasticsearchSystemProducer.this.sendFailed.set(true);
            }

            /** Counts inserts, updates and conflicts of a bulk response and logs the total written. */
            private void updateSuccessMetrics(BulkResponse response) {
                ElasticsearchSystemProducer.this.metrics.bulkSendSuccess.inc();
                int written = 0;
                for (BulkItemResponse item : response.getItems()) {
                    if (!item.isFailed()) {
                        // Hold the response untyped: assigning it straight to IndexResponse would
                        // throw ClassCastException for other response types before the check below.
                        Object itemResponse = item.getResponse();
                        if (itemResponse instanceof IndexResponse) {
                            written++;
                            if (((IndexResponse) itemResponse).isCreated()) {
                                ElasticsearchSystemProducer.this.metrics.inserts.inc();
                            } else {
                                ElasticsearchSystemProducer.this.metrics.updates.inc();
                            }
                        } else {
                            ElasticsearchSystemProducer.LOGGER.error("Unexpected Elasticsearch action response type: {}", itemResponse.getClass().getSimpleName());
                        }
                    } else if (item.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
                        ElasticsearchSystemProducer.this.metrics.conflicts.inc();
                    }
                }
                ElasticsearchSystemProducer.LOGGER.info("Wrote {} messages from {} to {}.", written, str, ElasticsearchSystemProducer.this.system);
            }
        }));
    }

    /**
     * Converts the envelope to an index request and queues it on the source's bulk processor.
     *
     * @param str name of a previously registered source
     * @param outgoingMessageEnvelope message to index
     * @throws SamzaException if the source has not been registered
     */
    public void send(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        registeredProcessor(str).add(this.indexRequestFactory.getIndexRequest(outgoingMessageEnvelope));
    }

    /**
     * Flushes the source's bulk processor and reports any failure recorded by the listeners.
     *
     * @param str name of a previously registered source
     * @throws SamzaException if the source has not been registered, or if any bulk request has
     *     failed so far; the first recorded throwable, if any, is attached as the cause
     */
    public void flush(String str) {
        registeredProcessor(str).flush();
        if (!this.sendFailed.get()) {
            LOGGER.info("Flushed {} to {}.", str, this.system);
            return;
        }
        String message = String.format("Unable to send message from %s to system %s.", str, this.system);
        LOGGER.error(message);
        Throwable cause = this.thrown.get();
        if (cause == null) {
            throw new SamzaException(message);
        }
        throw new SamzaException(message, cause);
    }

    /** Looks up the bulk processor for a source, failing with a descriptive error if it is unknown. */
    private BulkProcessor registeredProcessor(String source) {
        BulkProcessor processor = this.sourceBulkProcessor.get(source);
        if (processor == null) {
            throw new SamzaException(String.format("Source %s has not been registered with system %s.", source, this.system));
        }
        return processor;
    }
}
