package org.apache.jackrabbit.oak.plugins.index.elastic.index;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.class */
/**
 * Wraps an Elasticsearch {@link BulkProcessor} for one index definition.
 *
 * <p>Documents are queued through {@link #add(DocWriteRequest)} and sent in bulks by the
 * processor. A {@link Phaser} tracks in-flight bulks (one party per bulk plus one party for
 * the handler itself) so that {@link #close()} can optionally wait until every bulk has been
 * answered. The outcome of each bulk is recorded in a map keyed by the bulk execution id.
 *
 * <p>Whether {@link #close()} waits for the outstanding bulks is decided per instance when the
 * handler is created by
 * {@link #getBulkProcessorHandler(ElasticConnection, ElasticIndexDefinition, NodeBuilder, CommitInfo)}.
 */
public class ElasticBulkProcessorHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
    private static final String SYNC_MODE_PROPERTY = "sync-mode";
    private static final String SYNC_RT_MODE = "rt";

    /** Maximum number of failed document ids kept in the status node; read from a system property. */
    private final int failedDocCountForStatusNode;
    protected final ElasticConnection elasticConnection;
    protected final ElasticIndexDefinition indexDefinition;
    private final NodeBuilder definitionBuilder;
    protected final BulkProcessor bulkProcessor;
    /** One party for this handler plus one party for every bulk currently in flight. */
    private final Phaser phaser;
    /** Last bulk-level failure reported by the listener; rethrown by {@link #close()}. */
    private volatile IOException ioException;
    /** Bulk execution id -> {@code true} if at least one item of that bulk succeeded. */
    private final ConcurrentHashMap<Long, Boolean> updatesMap;
    /**
     * Whether {@link #close()} blocks until in-flight bulks are answered. Kept per instance so
     * that the decision taken for one handler cannot leak into handlers created later.
     */
    private final boolean waitForESAcknowledgement;
    /** Number of requests handed to {@link #add(DocWriteRequest)}. */
    protected long totalOperations;

    /**
     * Listener registered on the bulk processor: keeps the phaser and the update map in sync
     * with the life cycle of every bulk and records failed document ids in the status node.
     */
    public class OakBulkProcessorListener implements BulkProcessor.Listener {
        private OakBulkProcessorListener() {
        }

        /** Registers the bulk as an in-flight party and marks it as "no update yet". */
        @Override
        public void beforeBulk(long j, BulkRequest bulkRequest) {
            phaser.register();
            updatesMap.put(j, Boolean.FALSE);
            LOG.debug("Sending bulk with id {} -> {}", j, bulkRequest.getDescription());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Bulk Requests: \n{}", bulkRequest.requests().stream()
                        .map(Object::toString)
                        .collect(Collectors.joining("\n")));
            }
        }

        /**
         * Handles a bulk that received a response. The bulk is flagged as updated when at least
         * one item succeeded; ids of failed items are collected for the status node.
         */
        @Override
        public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            LOG.debug("Bulk with id {} processed with status {} in {}", j, bulkResponse.status(), bulkResponse.getTook());
            if (LOG.isTraceEnabled()) {
                try {
                    LOG.trace(Strings.toString(bulkResponse.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)));
                } catch (IOException e) {
                    LOG.error("Error decoding bulk response", e);
                }
            }
            if (bulkResponse.hasFailures()) {
                recordFailures(j, bulkResponse);
            } else {
                updatesMap.put(j, Boolean.TRUE);
            }
            phaser.arriveAndDeregister();
        }

        /** Handles a bulk that failed as a whole: remembers the error for {@link #close()}. */
        @Override
        public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} threw an error", j, th);
            ioException = new IOException(th);
            phaser.arriveAndDeregister();
        }

        /**
         * Logs every failed item of the response and merges the failed ids into the
         * {@code FAILED_DOC_PATHS} property of the status node, up to the configured limit.
         */
        private void recordFailures(long executionId, BulkResponse bulkResponse) {
            // Insertion-ordered set seeded with the ids already stored in the status node.
            LinkedHashSet<String> failedDocs = new LinkedHashSet<>();
            NodeBuilder status = definitionBuilder.child(IndexDefinition.STATUS_NODE);
            if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
                for (String path : status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
                    failedDocs.add(path);
                }
            }
            int initialSize = failedDocs.size();
            boolean failedDocSetFull = false;
            boolean hasSuccesses = false;
            for (BulkItemResponse item : bulkResponse) {
                if (item.isFailed()) {
                    BulkItemResponse.Failure failure = item.getFailure();
                    if (!failedDocSetFull && failedDocs.size() < failedDocCountForStatusNode) {
                        failedDocs.add(item.getId());
                    } else {
                        failedDocSetFull = true;
                    }
                    LOG.error("ElasticIndex Update Doc Failure: Error while adding/updating doc with id : [{}]", item.getId());
                    LOG.error("Failure Details: BulkItem ID: " + failure.getId() + ", Failure Cause: {}", failure.getCause());
                } else if (!hasSuccesses) {
                    // One successful item is enough to flag the whole bulk as an update.
                    updatesMap.put(executionId, Boolean.TRUE);
                    hasSuccesses = true;
                }
            }
            if (failedDocSetFull) {
                // NOTE(review): when the limit is hit, ids added earlier in this same bulk are
                // not persisted either - confirm whether that is intended.
                LOG.info("Cannot store all new Failed Docs because {} has been filled up. See previous log entries to find out the details of failed paths", IndexDefinition.FAILED_DOC_PATHS);
            } else if (failedDocs.size() != initialSize) {
                status.setProperty(IndexDefinition.FAILED_DOC_PATHS, failedDocs, Type.STRINGS);
            }
        }
    }

    /**
     * Variant used for the {@code "rt"} sync mode: once closing has started, bulks are sent
     * with the {@code WAIT_UNTIL} refresh policy; if no such bulk was sent, an explicit index
     * refresh is issued at the end of {@link #close()}.
     */
    protected static class RealTimeBulkProcessorHandler extends ElasticBulkProcessorHandler {
        private final AtomicBoolean isClosed;
        private final AtomicBoolean isDataSearchable;

        private RealTimeBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull ElasticIndexDefinition elasticIndexDefinition, @NotNull NodeBuilder nodeBuilder) {
            // This variant always waits for the outstanding bulks on close.
            super(elasticConnection, elasticIndexDefinition, nodeBuilder, true);
            this.isClosed = new AtomicBoolean(false);
            this.isDataSearchable = new AtomicBoolean(false);
        }

        /**
         * Returns a consumer that behaves like the parent's, except that bulks submitted after
         * {@link #close()} was called use the {@code WAIT_UNTIL} refresh policy.
         */
        @Override
        protected BiConsumer<BulkRequest, ActionListener<BulkResponse>> requestConsumer() {
            // The lambda is created while the super constructor is still running; the two
            // flags are only dereferenced later, when a bulk is actually executed.
            return (bulkRequest, actionListener) -> {
                if (this.isClosed.get()) {
                    LOG.debug("Processor is closing. Next request with {} actions will block until the data is searchable", bulkRequest.requests().size());
                    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
                    this.isDataSearchable.set(true);
                }
                this.elasticConnection.getClient().bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
            };
        }

        /**
         * Closes the processor and, when operations were executed but no bulk was sent with
         * {@code WAIT_UNTIL}, forces a refresh of the index alias. A refresh failure is only
         * logged.
         *
         * @return the value returned by the parent {@code close()}
         * @throws IOException if a bulk failed as a whole
         */
        @Override
        public boolean close() throws IOException {
            this.isClosed.set(true);
            boolean updated = super.close();
            if (this.totalOperations > 0 && !this.isDataSearchable.get()) {
                if (Thread.currentThread().isInterrupted()) {
                    // super.close() restored the interrupt flag; a blocking call would fail on
                    // an interrupted thread, so the best-effort refresh is skipped instead.
                    LOG.warn("Thread interrupted, skipping forced refresh of index {}", this.indexDefinition.getRemoteIndexAlias());
                    return updated;
                }
                LOG.debug("Forcing refresh");
                try {
                    this.elasticConnection.getClient().indices().refresh(new RefreshRequest(this.indexDefinition.getRemoteIndexAlias()), RequestOptions.DEFAULT);
                } catch (IOException e) {
                    LOG.warn("Error refreshing index " + this.indexDefinition.getRemoteIndexAlias(), e);
                }
            }
            return updated;
        }
    }

    private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull ElasticIndexDefinition elasticIndexDefinition, @NotNull NodeBuilder nodeBuilder, boolean waitForESAcknowledgement) {
        this.failedDocCountForStatusNode = Integer.getInteger("oak.failedDocStatusLimit", 10000);
        // The handler itself is the first registered party; it arrives in close().
        this.phaser = new Phaser(1);
        this.updatesMap = new ConcurrentHashMap<>();
        this.elasticConnection = elasticConnection;
        this.indexDefinition = elasticIndexDefinition;
        this.definitionBuilder = nodeBuilder;
        this.waitForESAcknowledgement = waitForESAcknowledgement;
        this.bulkProcessor = initBulkProcessor();
    }

    /**
     * Creates the handler matching the index definition and the commit.
     *
     * <ul>
     *   <li>If the definition has an {@code async} property, a plain handler is returned. It
     *       waits for outstanding bulks on close only when the commit info contains the
     *       {@code indexingCheckpointTime} key (presumably set by the async indexing cycle -
     *       TODO confirm). A {@code null} commit info is treated like one without the key.</li>
     *   <li>Otherwise the sync mode is read from the commit info and, if absent there, from
     *       the definition. {@code "rt"} yields a {@link RealTimeBulkProcessorHandler}; any
     *       other value yields a plain handler that waits on close.</li>
     * </ul>
     *
     * @param elasticConnection connection used to send the bulks
     * @param elasticIndexDefinition definition of the target index
     * @param nodeBuilder builder of the index definition node, used to store failed doc ids
     * @param commitInfo commit information; may be {@code null}
     * @return a new handler instance
     */
    public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull ElasticIndexDefinition elasticIndexDefinition, @NotNull NodeBuilder nodeBuilder, CommitInfo commitInfo) {
        PropertyState async = elasticIndexDefinition.getDefinitionNodeState().getProperty("async");
        if (async != null) {
            boolean waitForAck = commitInfo != null && commitInfo.getInfo().containsKey("indexingCheckpointTime");
            return new ElasticBulkProcessorHandler(elasticConnection, elasticIndexDefinition, nodeBuilder, waitForAck);
        }

        String syncMode = null;
        if (commitInfo != null) {
            syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY);
        }
        if (syncMode == null) {
            PropertyState syncModeProperty = elasticIndexDefinition.getDefinitionNodeState().getProperty(SYNC_MODE_PROPERTY);
            if (syncModeProperty != null) {
                syncMode = syncModeProperty.getValue(Type.STRING);
            }
        }

        if (SYNC_RT_MODE.equals(syncMode)) {
            return new RealTimeBulkProcessorHandler(elasticConnection, elasticIndexDefinition, nodeBuilder);
        }
        return new ElasticBulkProcessorHandler(elasticConnection, elasticIndexDefinition, nodeBuilder, true);
    }

    /** Builds the bulk processor from the bulk settings of the index definition. */
    private BulkProcessor initBulkProcessor() {
        return BulkProcessor.builder(requestConsumer(), new OakBulkProcessorListener())
                .setBulkActions(this.indexDefinition.bulkActions)
                .setBulkSize(new ByteSizeValue(this.indexDefinition.bulkSizeBytes))
                .setFlushInterval(TimeValue.timeValueMillis(this.indexDefinition.bulkFlushIntervalMs))
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(this.indexDefinition.bulkRetriesBackoff), this.indexDefinition.bulkRetries))
                .build();
    }

    /**
     * Returns the function the bulk processor uses to execute a bulk request: an asynchronous
     * bulk call on the connection's client with default request options.
     */
    protected BiConsumer<BulkRequest, ActionListener<BulkResponse>> requestConsumer() {
        return (bulkRequest, actionListener) ->
                this.elasticConnection.getClient().bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
    }

    /**
     * Queues a request in the bulk processor and counts it.
     *
     * @param docWriteRequest the request to queue
     */
    public void add(DocWriteRequest<?> docWriteRequest) {
        this.bulkProcessor.add(docWriteRequest);
        this.totalOperations++;
    }

    /**
     * Closes the bulk processor and, if this handler was created to do so, waits for the
     * in-flight bulks for at most five times the configured flush interval. A timeout or an
     * interruption while waiting is logged; on interruption the thread's interrupt status is
     * restored.
     *
     * @return {@code true} if at least one bulk had a successful item, {@code false} if no
     *         operation was added or nothing succeeded
     * @throws IOException if a bulk failed as a whole
     */
    public boolean close() throws IOException {
        LOG.trace("Calling close on bulk processor {}", this.bulkProcessor);
        this.bulkProcessor.close();
        LOG.trace("Bulk Processor {} closed", this.bulkProcessor);

        // The handler's own party arrives; the returned phase advances once every bulk arrived.
        int phase = this.phaser.arriveAndDeregister();
        if (this.totalOperations == 0) {
            LOG.debug("No operations executed in this processor. Close immediately");
            return false;
        }

        if (this.waitForESAcknowledgement) {
            try {
                this.phaser.awaitAdvanceInterruptibly(phase, this.indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
                LOG.error("Error waiting for bulk requests to return", e);
            } catch (TimeoutException e) {
                LOG.error("Error waiting for bulk requests to return", e);
            }
        }

        if (this.ioException != null) {
            throw this.ioException;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Bulk identifier -> update status = {}", this.updatesMap);
        }
        return this.updatesMap.containsValue(Boolean.TRUE);
    }
}
