package io.openk9.search.client.internal;

import io.openk9.json.api.JsonFactory;
import io.openk9.osgi.util.AutoCloseables;
import io.openk9.search.client.api.BulkReactorActionListener;
import io.openk9.search.client.api.RestHighLevelClientProvider;
import io.openk9.search.client.internal.configuration.ElasticSearchConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * OSGi component that collects {@link DocWriteRequest}s and forwards them to Elasticsearch
 * as asynchronous bulk requests.
 *
 * <p>Requests pushed through {@link #sendDocWriteRequest(DocWriteRequest)} are emitted into a
 * unicast Reactor sink. The subscribed pipeline groups them by {@code index()}, buffers each
 * group with {@code bufferTimeout} using the size and time limits read from
 * {@link ElasticSearchConfiguration}, and submits every buffer through the
 * {@link RestHighLevelClientProvider} client's {@code bulkAsync}.
 *
 * <p>The sink is created on activation and dropped on deactivation; a configuration change
 * ({@link #modified()}) tears the pipeline down and rebuilds it.
 */
@Component(immediate = true, service = {ElasticSearchIndexer.class})
public class ElasticSearchIndexer {

    private static final Logger _log = LoggerFactory.getLogger(ElasticSearchIndexer.class);

    /** Handles that undo what {@link #activate()} set up; drained by {@link #deactivate()}. */
    private final List<AutoCloseables.AutoCloseableSafe> _registrationList = new ArrayList<>();

    /**
     * Entry point of the indexing pipeline; {@code null} while the component is not active.
     * Volatile because it is replaced by the lifecycle callbacks and read by whichever thread
     * calls {@link #sendDocWriteRequest(DocWriteRequest)}.
     */
    private volatile Sinks.Many<DocWriteRequest<?>> _dataMany;

    @Reference
    private JsonFactory _jsonFactory;

    @Reference
    private RestHighLevelClientProvider _restHighLevelClientProvider;

    @Reference
    private ElasticSearchConfiguration _elasticSearchConfiguration;

    /**
     * Creates a fresh sink, subscribes the bulk-indexing pipeline to it and records a single
     * merged handle that completes the sink and disposes the subscription.
     */
    @Activate
    public void activate() {
        Sinks.Many<DocWriteRequest<?>> sink = Sinks.many().unicast().onBackpressureBuffer();
        this._dataMany = sink;

        // Array order is kept from the original: the sink-completing handle first, then the
        // subscription handle.
        AutoCloseable[] closeables = new AutoCloseable[] {
            _closeMany(sink),
            _registerIndexer(
                sink,
                this._elasticSearchConfiguration.bufferMaxSize(),
                this._elasticSearchConfiguration.bufferMaxTime())
        };

        this._registrationList.add(AutoCloseables.mergeAutoCloseableToSafe(closeables));
    }

    /**
     * Rebuilds the pipeline so that it picks up the current configuration values.
     *
     * <p>Between the two calls the sink is absent, so a concurrent
     * {@link #sendDocWriteRequest(DocWriteRequest)} fails with a {@link NullPointerException}.
     */
    @Modified
    public void modified() {
        deactivate();
        activate();
    }

    /** Closes and forgets every registered handle, then drops the sink. */
    @Deactivate
    public void deactivate() {
        Iterator<AutoCloseables.AutoCloseableSafe> registrations =
            this._registrationList.iterator();

        while (registrations.hasNext()) {
            // Remove only after a successful close, as the original did.
            registrations.next().close();
            registrations.remove();
        }

        this._dataMany = null;
    }

    /**
     * Queues a write request for bulk indexing.
     *
     * <p>The request is emitted with {@code Sinks.EmitFailureHandler.FAIL_FAST}, i.e. a failed
     * emission is not retried here.
     * NOTE(review): per the Reactor documentation a failed {@code emitNext} with this handler
     * surfaces as an exception to the caller (e.g. on concurrent emission) - confirm callers
     * are prepared for that.
     *
     * @param docWriteRequest the request to enqueue
     * @throws NullPointerException if the component is not active (no sink available)
     */
    public void sendDocWriteRequest(DocWriteRequest<?> docWriteRequest) {
        // Read the field once so the null check and the emission see the same sink even if
        // deactivate() runs concurrently.
        Sinks.Many<DocWriteRequest<?>> sink = Objects.requireNonNull(
            this._dataMany,
            "ElasticSearchIndexer is not active: no sink available for DocWriteRequest");

        sink.emitNext(docWriteRequest, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Subscribes the bulk-indexing pipeline to the given sink.
     *
     * @param many sink supplying the write requests
     * @param bufferMaxSize maximum number of requests per buffer (passed to {@code bufferTimeout})
     * @param bufferMaxTimeMillis maximum buffering time, in milliseconds
     * @return a handle whose {@code close()} disposes the subscription
     */
    private AutoCloseable _registerIndexer(
        Sinks.Many<DocWriteRequest<?>> many, int bufferMaxSize, long bufferMaxTimeMillis) {

        Duration bufferMaxTime = Duration.ofMillis(bufferMaxTimeMillis);

        Disposable subscription = many.asFlux()
            // One group per value of index(), so each bulk request carries a single key.
            .groupBy(request -> request.index())
            .flatMap(group -> group.bufferTimeout(bufferMaxSize, bufferMaxTime))
            .flatMap(batch -> Flux.create(fluxSink -> {
                // The listener is handed the FluxSink; how it signals results and completion
                // is defined in BulkReactorActionListener, not here.
                this._restHighLevelClientProvider.get().bulkAsync(
                    new BulkRequest().add(batch),
                    RequestOptions.DEFAULT,
                    new BulkReactorActionListener(fluxSink));
            }))
            .doOnNext(bulkItemResponse -> {
                if (_log.isDebugEnabled()) {
                    try {
                        // EMPTY_PARAMS instead of null: serializers that consult the params
                        // would otherwise throw a NullPointerException while logging.
                        // NOTE(review): the builder itself is concatenated into the message;
                        // confirm its toString() yields the JSON text.
                        _log.debug("BulkResponse: " + bulkItemResponse.toXContent(
                            XContentBuilder.builder(XContentType.JSON.xContent()),
                            ToXContent.EMPTY_PARAMS));
                    } catch (IOException e) {
                        _log.error(e.getMessage(), e);
                    }
                }
            })
            // Errors are reported to _manageExceptions instead of terminating the subscription
            // (for the upstream operators that support onErrorContinue).
            .onErrorContinue(this::_manageExceptions)
            .subscribe();

        return subscription::dispose;
    }

    /**
     * Logs a pipeline error, including the offending element when one is supplied.
     *
     * @param th the error raised in the pipeline
     * @param obj the element being processed when the error occurred, or {@code null}
     */
    private void _manageExceptions(Throwable th, Object obj) {
        if (!_log.isErrorEnabled()) {
            return;
        }

        if (obj == null) {
            _log.error(th.getMessage(), th);
        } else {
            _log.error("error on object: { " + obj.toString() + " }", th);
        }
    }

    /**
     * Wraps sink completion in an {@link AutoCloseable}.
     *
     * @param many the sink to complete on close
     * @return a handle whose {@code close()} emits the completion signal with
     *     {@code FAIL_FAST}
     */
    private AutoCloseable _closeMany(Sinks.Many<?> many) {
        return () -> many.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
    }
}
