package org.apache.jackrabbit.oak.plugins.index.elastic.index;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.class */
/**
 * Queues index and delete operations for a single Elasticsearch index and ships them through a
 * {@link BulkIngester}.
 *
 * <p>Bulk requests are sent through the connection's async client. A {@link Phaser} tracks the
 * requests that are still in flight, failures reported by Elasticsearch are collected and raised
 * as an {@link IOException} on the next {@code update}/{@code delete} call or on {@link #close()}.
 *
 * <p>Instances are obtained through
 * {@link #getBulkProcessorHandler(ElasticConnection, String, ElasticIndexDefinition, NodeBuilder, CommitInfo, boolean)}.
 */
class ElasticBulkProcessorHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);

    /** Value handed to {@code maxConcurrentRequests} of the ingester; overridable via system property. */
    private static final int BULK_PROCESSOR_CONCURRENCY =
            Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1);

    private static final String SYNC_MODE_PROPERTY = "sync-mode";
    private static final String SYNC_RT_MODE = "rt";

    /**
     * Maximum number of failed document paths kept in the status node. Read from the
     * {@code oak.failedDocStatusLimit} system property (default 10000) when the handler is created.
     */
    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000);

    protected final ElasticConnection elasticConnection;
    protected final String indexName;
    protected final ElasticIndexDefinition indexDefinition;
    private final NodeBuilder definitionBuilder;
    private final boolean waitForESAcknowledgement;

    /**
     * Tracks in-flight bulk requests. It starts with one registered party (this handler), which is
     * de-registered in {@link #close()}. Every bulk request registers itself in
     * {@code beforeBulk} and de-registers in {@code afterBulk}.
     */
    private final Phaser phaser = new Phaser(1);

    /**
     * Errors collected from the bulk listener: bulk-level failures always, item-level failures only
     * when {@code indexDefinition.failOnError} is set.
     */
    private final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = new ConcurrentLinkedQueue<>();

    /**
     * Bulk execution id to outcome. The value is {@code false} when the bulk is sent and becomes
     * {@code true} once a response arrives in which at least one item has no error (or no item has
     * an error at all).
     */
    private final ConcurrentHashMap<Long, Boolean> updatesMap = new ConcurrentHashMap<>();

    /** Ingester used for all operations; created at the end of the constructor. */
    protected final BulkIngester<String> bulkIngester;

    /** Number of operations handed to the ingester through {@code update}/{@code delete}. */
    protected long totalOperations;

    /**
     * Creates the handler and its bulk ingester.
     *
     * @param elasticConnection        connection providing the sync and async Elasticsearch clients
     * @param str                      name of the target index
     * @param elasticIndexDefinition   index definition supplying the bulk settings and {@code failOnError}
     * @param nodeBuilder              builder of the index definition node; failed paths are stored below it
     * @param z                        whether {@link #close()} waits for outstanding bulk requests
     */
    private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull String str, @NotNull ElasticIndexDefinition elasticIndexDefinition, @NotNull NodeBuilder nodeBuilder, boolean z) {
        this.elasticConnection = elasticConnection;
        this.indexName = str;
        this.indexDefinition = elasticIndexDefinition;
        this.definitionBuilder = nodeBuilder;
        this.waitForESAcknowledgement = z;
        // Must be last: building the ingester reads elasticConnection and indexDefinition, which
        // would still be null if this ran as a field initializer (initializers run before this body).
        this.bulkIngester = initBulkIngester();
    }

    /**
     * Returns the handler variant matching the index definition and commit.
     *
     * <p>If the definition node carries the async property a plain handler is returned. Otherwise
     * the sync mode is looked up first in the commit info and then in the definition node; the
     * value {@code "rt"} selects the {@link RealTimeBulkProcessorHandler}, anything else the plain
     * handler.
     *
     * @param commitInfo commit information, may be {@code null}
     * @param z          whether {@link #close()} waits for outstanding bulk requests
     * @return a new handler, never {@code null}
     */
    public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull String str, @NotNull ElasticIndexDefinition elasticIndexDefinition, @NotNull NodeBuilder nodeBuilder, CommitInfo commitInfo, boolean z) {
        if (elasticIndexDefinition.getDefinitionNodeState().getProperty(IndexConstants.ASYNC_PROPERTY_NAME) != null) {
            return new ElasticBulkProcessorHandler(elasticConnection, str, elasticIndexDefinition, nodeBuilder, z);
        }

        String syncMode = null;
        if (commitInfo != null) {
            syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY);
        }
        if (syncMode == null) {
            PropertyState syncModeProperty = elasticIndexDefinition.getDefinitionNodeState().getProperty(SYNC_MODE_PROPERTY);
            if (syncModeProperty != null) {
                syncMode = syncModeProperty.getValue(Type.STRING);
            }
        }

        if (SYNC_RT_MODE.equals(syncMode)) {
            return new RealTimeBulkProcessorHandler(elasticConnection, str, elasticIndexDefinition, nodeBuilder, z);
        }
        return new ElasticBulkProcessorHandler(elasticConnection, str, elasticIndexDefinition, nodeBuilder, z);
    }

    /**
     * Builds the ingester on top of the async client. The operation-count, size and flush-interval
     * limits are only applied when the corresponding index definition value is positive.
     */
    private BulkIngester<String> initBulkIngester() {
        return BulkIngester.of(builder -> {
            BulkIngester.Builder<String> config = builder
                    .client(elasticConnection.getAsyncClient())
                    .listener(new OakBulkListener());
            if (indexDefinition.bulkActions > 0) {
                config = config.maxOperations(indexDefinition.bulkActions);
            }
            if (indexDefinition.bulkSizeBytes > 0) {
                config = config.maxSize(indexDefinition.bulkSizeBytes);
            }
            if (indexDefinition.bulkFlushIntervalMs > 0) {
                config = config.flushInterval(indexDefinition.bulkFlushIntervalMs, TimeUnit.MILLISECONDS);
            }
            return config.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
        });
    }

    /**
     * Throws if any error has been collected so far.
     *
     * @throws IOException carrying one suppressed {@link IllegalStateException} per collected
     *                     error cause, each built from the cause's reason
     */
    private void checkFailures() throws IOException {
        if (suppressedErrorCauses.isEmpty()) {
            return;
        }
        IOException failure = new IOException("Exception while indexing. See suppressed for details");
        suppressedErrorCauses.stream()
                .map(cause -> new IllegalStateException(cause.reason()))
                .forEach(failure::addSuppressed);
        throw failure;
    }

    /**
     * Queues an index operation for the given document.
     *
     * @param str             document id; also used as the bulk context for failure reporting
     * @param elasticDocument document to index
     * @throws IOException if errors were collected from previous bulk requests
     */
    public void update(String str, ElasticDocument elasticDocument) throws IOException {
        add(BulkOperation.of(op -> op.index(idx -> idx.index(indexName).id(str).document(elasticDocument))), str);
    }

    /**
     * Queues a delete operation for the given document id.
     *
     * @param str document id; also used as the bulk context for failure reporting
     * @throws IOException if errors were collected from previous bulk requests
     */
    public void delete(String str) throws IOException {
        add(BulkOperation.of(op -> op.delete(del -> del.index(indexName).id(str))), str);
    }

    /** Fails fast on previously collected errors, then hands the operation to the ingester. */
    private void add(BulkOperation bulkOperation, String context) throws IOException {
        checkFailures();
        bulkIngester.add(bulkOperation, context);
        totalOperations++;
    }

    /**
     * Closes the ingester and, when configured, waits for the outstanding bulk requests.
     *
     * <p>The wait is bounded by five times {@code indexDefinition.bulkFlushIntervalMs}; a timeout is
     * logged and not propagated. An interrupt is logged and the thread's interrupt flag restored.
     *
     * @return {@code false} when no operation was queued; otherwise {@code true} if at least one
     *         bulk request is recorded as having performed an update
     * @throws IOException if any error was collected from the bulk requests
     */
    public boolean close() throws IOException {
        LOG.trace("Calling close on bulk ingester {}", bulkIngester);
        bulkIngester.close();
        LOG.trace("Bulk Ingester {} closed", bulkIngester);

        // De-register this handler's own party; the returned phase is what we wait on below.
        int phase = phaser.arriveAndDeregister();

        if (totalOperations == 0) {
            LOG.debug("No operations executed in this processor. Close immediately");
            return false;
        }

        if (waitForESAcknowledgement) {
            try {
                phaser.awaitAdvanceInterruptibly(phase, indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for bulk processor to close", e);
                Thread.currentThread().interrupt();
            } catch (TimeoutException e) {
                LOG.error("Error waiting for bulk requests to return", e);
            }
        }

        checkFailures();

        if (LOG.isTraceEnabled()) {
            LOG.trace("Bulk identifier -> update status = {}", updatesMap);
        }
        return updatesMap.containsValue(Boolean.TRUE);
    }

    /**
     * Listener attached to the ingester. It keeps the phaser and the per-bulk outcome map up to
     * date, collects errors and stores the paths of failed documents in the status node.
     */
    private class OakBulkListener implements BulkListener<String> {
        private OakBulkListener() {
        }

        /** Registers the bulk with the phaser and records it as "no update yet". */
        @Override
        public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
            phaser.register();
            updatesMap.put(executionId, Boolean.FALSE);
            LOG.debug("Sending bulk with id {} -> {}", executionId, contexts);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Bulk Requests: \n{}", request.operations().stream()
                        .map(BulkOperation::toString)
                        .collect(Collectors.joining("\n")));
            }
        }

        /**
         * Handles a bulk response. When no item failed the bulk is marked as updated. Otherwise
         * every failed item is logged, optionally collected as an error, and its context (the
         * document id passed to {@code add}) is appended to the failed-paths property of the status
         * node until the configured limit is reached.
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
            try {
                LOG.debug("Bulk with id {} processed in {} ms", executionId, response.took());
                if (LOG.isTraceEnabled()) {
                    LOG.trace(response.toString());
                }

                boolean anyItemFailed = response.items().stream().anyMatch(item -> item.error() != null);
                if (!anyItemFailed) {
                    updatesMap.put(executionId, Boolean.TRUE);
                    return;
                }

                // Start from the paths already stored so the property accumulates across bulks.
                NodeBuilder status = definitionBuilder.child(IndexDefinition.STATUS_NODE);
                LinkedHashSet<String> failedPaths = new LinkedHashSet<>();
                if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
                    for (String path : status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
                        failedPaths.add(path);
                    }
                }
                int storedCount = failedPaths.size();

                boolean limitReached = false;
                boolean updateRecorded = false;
                for (int i = 0; i < contexts.size(); i++) {
                    BulkResponseItem item = response.items().get(i);
                    if (item.error() == null) {
                        if (!updateRecorded) {
                            updatesMap.put(executionId, Boolean.TRUE);
                            updateRecorded = true;
                        }
                        continue;
                    }

                    if (indexDefinition.failOnError) {
                        suppressedErrorCauses.add(item.error());
                    }
                    if (limitReached || failedPaths.size() >= FAILED_DOC_COUNT_FOR_STATUS_NODE) {
                        limitReached = true;
                    } else {
                        failedPaths.add(contexts.get(i));
                    }
                    LOG.error("ElasticIndex Update Doc Failure: Error while adding/updating doc with id: [{}]", contexts.get(i));
                    LOG.error("Failure Details: BulkItem ID: {}, Index: {}, Failure Cause: {}", item.id(), item.index(), item.error());
                }

                if (limitReached) {
                    // Note: nothing is written to the property in this case, not even paths added
                    // before the limit was hit during this bulk.
                    LOG.info("Cannot store all new Failed Docs because {} has been filled up. See previous log entries to find out the details of failed paths", IndexDefinition.FAILED_DOC_PATHS);
                } else if (failedPaths.size() != storedCount) {
                    status.setProperty(IndexDefinition.FAILED_DOC_PATHS, failedPaths, Type.STRINGS);
                }
            } finally {
                phaser.arriveAndDeregister();
            }
        }

        /**
         * Handles a bulk that failed as a whole: logs the failure and collects it as an error cause
         * carrying the throwable's message and stack trace.
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
            try {
                LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} threw an error", executionId, failure);
                StringWriter trace = new StringWriter();
                failure.printStackTrace(new PrintWriter(trace));
                suppressedErrorCauses.add(ErrorCause.of(cause ->
                        cause.reason(failure.getMessage()).stackTrace(trace.toString())));
            } finally {
                phaser.arriveAndDeregister();
            }
        }
    }

    /**
     * Handler variant that forces an index refresh when it is closed, provided at least one
     * operation was queued.
     */
    protected static class RealTimeBulkProcessorHandler extends ElasticBulkProcessorHandler {
        private RealTimeBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull String str, @NotNull ElasticIndexDefinition elasticIndexDefinition, @NotNull NodeBuilder nodeBuilder, boolean z) {
            super(elasticConnection, str, elasticIndexDefinition, nodeBuilder, z);
        }

        /**
         * Closes the handler as the base class does and then refreshes the index. An
         * {@link IOException} thrown by the refresh call is logged and not propagated.
         *
         * @return the result of the base class {@code close()}
         * @throws IOException if the base class {@code close()} throws
         */
        @Override
        public boolean close() throws IOException {
            boolean updated = super.close();
            if (totalOperations > 0) {
                LOG.debug("Forcing refresh");
                try {
                    elasticConnection.getClient().indices().refresh(refresh -> refresh.index(indexName));
                } catch (IOException e) {
                    LOG.warn("Error refreshing index {}", indexName, e);
                }
            }
            return updated;
        }
    }
}
