package com.google.enterprise.cloudsearch.sdk.indexing.template;

import com.google.api.client.json.GenericJson;
import com.google.api.services.cloudsearch.v1.model.Item;
import com.google.api.services.cloudsearch.v1.model.ItemMetadata;
import com.google.api.services.cloudsearch.v1.model.Operation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.enterprise.cloudsearch.sdk.CheckpointCloseableIterable;
import com.google.enterprise.cloudsearch.sdk.IncrementalChangeHandler;
import com.google.enterprise.cloudsearch.sdk.InvalidConfigurationException;
import com.google.enterprise.cloudsearch.sdk.RepositoryException;
import com.google.enterprise.cloudsearch.sdk.config.Configuration;
import com.google.enterprise.cloudsearch.sdk.indexing.DefaultAcl;
import com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnector;
import com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnectorContext;
import com.google.enterprise.cloudsearch.sdk.indexing.IndexingService;
import com.google.enterprise.cloudsearch.sdk.indexing.template.QueueCheckpoint;
import com.google.enterprise.cloudsearch.sdk.indexing.template.RepositoryContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.class */
public class FullTraversalConnector implements IndexingConnector, IncrementalChangeHandler {
    /** Configuration key for the size of the worker thread pool created in {@code init}. */
    public static final String NUM_THREADS = "traverse.threadPoolSize";
    /** Configuration key deciding whether items are tagged with a traversal queue name. */
    public static final String TRAVERSE_USE_QUEUES = "traverse.useQueues";
    /** Configuration key for how many operations are submitted and awaited as one batch. */
    public static final String TRAVERSE_PARTITION_SIZE = "traverse.partitionSize";
    /** Configuration key for the suffix appended to the two traversal queue names. */
    public static final String TRAVERSE_QUEUE_TAG = "traverse.queueTag";
    /** Checkpoint name under which full-traversal progress is read and saved. */
    public static final String CHECKPOINT_FULL = "checkpoint_full";
    /** Checkpoint name under which incremental-traversal progress is read and saved. */
    public static final String CHECKPOINT_INCREMENTAL = "checkpoint_incremental";
    /** Not referenced in this class; presumably the checkpoint name used by QueueCheckpoint (TODO confirm). */
    public static final String CHECKPOINT_QUEUE = "checkpoint_queue";
    /** Common prefix of the queue names built in {@code init}. */
    public static final String QUEUE_NAME = "FullTraversal||";
    /** Default for {@link #NUM_THREADS}. */
    static final int DEFAULT_THREAD_NUM = 50;
    /** Default for {@link #TRAVERSE_PARTITION_SIZE}. */
    static final int DEFAULT_PARTITION_SIZE = 50;
    /** Exception-handler configuration value meaning "never abort on failures". */
    static final String IGNORE_FAILURE = "ignore";
    /** Default for {@link #TRAVERSE_USE_QUEUES}. */
    private static final boolean DEFAULT_USE_QUEUES = true;
    private static final Logger logger = Logger.getLogger(FullTraversalConnector.class.getName());
    /** Source of the documents and changes to index; never null (checked in the constructor). */
    private final Repository repository;
    /** Default ACL settings, loaded in {@code init} and applied to every RepositoryDoc item. */
    private DefaultAcl defaultAcl;
    /** Indexing API client taken from the connector context in {@code init}. */
    private IndexingService indexingService;
    /** Configured worker thread count. */
    private Integer numThreads;
    /** Number of failed operations tolerated before a traversal aborts; Long.MAX_VALUE for "ignore". */
    private Long numToAbort;
    /** Bounded pool that executes API operations; wrapped by {@link #listeningExecutorService}. */
    private ThreadPoolExecutor threadPoolExecutor;
    /** Listening view of {@link #threadPoolExecutor}; null until {@code init} has created it. */
    private ListeningExecutorService listeningExecutorService;
    /** Context handed to the repository; carries the event bus this connector registers on. */
    private RepositoryContext repositoryContext;
    /** Stores traversal checkpoints; defaults to a LocalFileCheckpointHandler when none is injected. */
    private CheckpointHandler checkpointHandler;
    /** Cached value of the {@link #TRAVERSE_USE_QUEUES} setting. */
    private boolean useQueues;

    /** Tracks the current traversal queue and any pending deleteQueueItems operation. */
    @VisibleForTesting
    QueueCheckpoint queueCheckpoint;
    /** Cached value of the {@link #TRAVERSE_PARTITION_SIZE} setting; validated to be positive. */
    private int partitionSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector$ExecuteCounter.class */
    /**
     * Thread-safe tally of attempted, succeeded and failed operations.
     *
     * <p>Every counter starts at zero and only ever grows by one per call.
     */
    public static class ExecuteCounter {
        private final AtomicLong total = new AtomicLong();
        private final AtomicLong success = new AtomicLong();
        private final AtomicLong fail = new AtomicLong();

        ExecuteCounter() {
        }

        /** Records one more attempted operation. */
        void incrementTotal() {
            total.incrementAndGet();
        }

        /** Records one more operation that completed without error. */
        void incrementSuccess() {
            success.incrementAndGet();
        }

        /** Records one more operation that ended in an error. */
        void incrementFail() {
            fail.incrementAndGet();
        }

        /** @return the number of attempted operations so far */
        long getTotal() {
            return total.get();
        }

        /** @return the number of successful operations so far */
        long getSuccess() {
            return success.get();
        }

        /** @return the number of failed operations so far */
        long getFail() {
            return fail.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector$ExecuteOperationCallable.class */
    /**
     * Task that executes a single {@link ApiOperation} against the indexing service and keeps the
     * shared {@link ExecuteCounter} up to date.
     *
     * <p>Failures are tolerated until the failure count exceeds the abort threshold; after that the
     * task rethrows so the caller can stop the traversal.
     */
    public class ExecuteOperationCallable implements Callable<List<GenericJson>> {
        private final ApiOperation operation;
        private final ExecuteCounter executeCounter;
        private final long localNumToAbort;
        private final String queueName;

        /**
         * @param apiOperation operation to execute
         * @param executeCounter counter updated with the outcome of the execution
         * @param l number of failures tolerated before an error is rethrown; must not be null
         * @param str queue name assigned to items when queues are enabled
         */
        public ExecuteOperationCallable(ApiOperation apiOperation, ExecuteCounter executeCounter, Long l, String str) {
            this.operation = apiOperation;
            this.executeCounter = executeCounter;
            this.localNumToAbort = l.longValue();
            this.queueName = str;
        }

        /**
         * Executes the wrapped operation.
         *
         * @return the API responses, or an empty list when a tolerated failure occurred
         * @throws Exception if execution fails after the abort threshold has been exceeded
         */
        @Override // java.util.concurrent.Callable
        public List<GenericJson> call() throws Exception {
            return executeOperation(this.operation, this.executeCounter);
        }

        /**
         * Runs {@code apiOperation}, counting the attempt and its outcome.
         *
         * @throws IOException when the operation fails and the failure count is above the threshold
         */
        private List<GenericJson> executeOperation(ApiOperation apiOperation, ExecuteCounter executeCounter) throws InterruptedException, IOException {
            executeCounter.incrementTotal();
            // Only used for log messages.
            String name = apiOperation instanceof RepositoryDoc ? ((RepositoryDoc) apiOperation).getItem().getName() : "[not an item update]";
            try {
                List<GenericJson> execute = apiOperation.execute(FullTraversalConnector.this.indexingService, Optional.of(this::modifyApiOperation));
                executeCounter.incrementSuccess();
                return execute;
            } catch (IOException e) {
                executeCounter.incrementFail();
                if (executeCounter.getFail() > this.localNumToAbort) {
                    FullTraversalConnector.logger.log(Level.WARNING, "Error updating item: {0} (aborting).", name);
                    throw new IOException(e);
                }
                FullTraversalConnector.logger.log(Level.WARNING, "Error updating item: {0} (abort skip count: {1}).", new Object[]{name, Long.valueOf(executeCounter.getFail())});
                FullTraversalConnector.logger.log(Level.WARNING, "Error executing API Operation", (Throwable) e);
                return Collections.emptyList();
            }
        }

        /**
         * Applies default ACLs and, when queues are enabled, the traversal queue name to a
         * {@link RepositoryDoc}; other operation types are left untouched.
         *
         * <p>The operation passed in is the one that is modified. It is not necessarily the
         * operation this task was created with, so the cast must target the argument.
         */
        private void modifyApiOperation(ApiOperation apiOperation) {
            if (!(apiOperation instanceof RepositoryDoc)) {
                return;
            }
            RepositoryDoc repositoryDoc = (RepositoryDoc) apiOperation;
            FullTraversalConnector.this.setDefaultAcls(repositoryDoc);
            if (!FullTraversalConnector.this.useQueues) {
                return;
            }
            Item item = repositoryDoc.getItem();
            String existingQueue = item.getQueue();
            if (existingQueue != null && !FullTraversalConnector.this.queueCheckpoint.isManagedQueueName(existingQueue)) {
                FullTraversalConnector.logger.log(Level.WARNING, "Overriding existing queue name ({1}) for item: {0}", new Object[]{item.getName(), existingQueue});
            }
            item.setQueue(this.queueName);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Supplier of one batch of operations for a traversal, starting from a saved checkpoint.
     *
     * <p>Implemented in this class by method references to the repository's {@code getAllDocs}
     * and {@code getChanges}.
     */
    @FunctionalInterface
    /* loaded from: input_file:com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector$GetDocsFunction.class */
    public interface GetDocsFunction {
        /**
         * @param bArr checkpoint bytes as read from the checkpoint handler, or as returned by the
         *     previous batch
         * @return the next batch of operations; {@code doTraverse} treats null as "nothing to traverse"
         * @throws RepositoryException if the repository cannot produce the batch
         */
        CheckpointCloseableIterable<ApiOperation> apply(byte[] bArr) throws RepositoryException;
    }

    /**
     * Creates a connector with no explicit checkpoint handler; one is obtained from the
     * configuration during {@code init}.
     *
     * @param repository repository to traverse; must not be null
     */
    public FullTraversalConnector(Repository repository) {
        this(repository, null);
    }

    /**
     * Creates a connector for the given repository.
     *
     * @param repository repository to traverse; must not be null
     * @param checkpointHandler checkpoint storage, or null to have {@code init} create a
     *     {@code LocalFileCheckpointHandler} from the configuration
     * @throws NullPointerException if {@code repository} is null
     */
    public FullTraversalConnector(Repository repository, CheckpointHandler checkpointHandler) {
        Preconditions.checkNotNull(repository, "Repository cannot be null.");
        this.repository = repository;
        this.checkpointHandler = checkpointHandler;
    }

    /**
     * Returns the identifier of this connector.
     *
     * @return the fully qualified class name of the wrapped repository
     */
    public String getDefaultId() {
        Class<?> repositoryType = this.repository.getClass();
        return repositoryType.getName();
    }

    /**
     * Initializes the connector from the global {@link Configuration}.
     *
     * <p>Sets up the indexing service, default ACLs, repository context, checkpoint handling,
     * queue names and the worker thread pool, then registers this connector on the repository
     * event bus and initializes the repository.
     *
     * @param indexingConnectorContext context supplying the {@link IndexingService}
     * @throws IllegalStateException if the configuration has not been initialized
     * @throws InvalidConfigurationException if the exception handler value is neither
     *     {@code "ignore"} nor a number, or if the thread pool size or partition size is not positive
     * @throws Exception if the repository or any collaborator fails to initialize
     */
    @Override // com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnector
    public void init(IndexingConnectorContext indexingConnectorContext) throws Exception {
        Preconditions.checkState(Configuration.isInitialized(), "configuration not initialized");
        this.indexingService = Preconditions.checkNotNull(indexingConnectorContext.getIndexingService());
        this.defaultAcl = DefaultAcl.fromConfiguration(this.indexingService);
        this.repositoryContext = new RepositoryContext.Builder()
                .setEventBus(new EventBus("EventBus-" + getClass().getName()))
                .setDefaultAclMode(this.defaultAcl.getDefaultAclMode())
                .build();
        if (this.checkpointHandler == null) {
            this.checkpointHandler = LocalFileCheckpointHandler.fromConfiguration();
        }

        this.useQueues = Configuration.getBoolean(TRAVERSE_USE_QUEUES, DEFAULT_USE_QUEUES).get();
        QueueCheckpoint.Builder builder = new QueueCheckpoint.Builder(this.useQueues);
        if (this.useQueues) {
            // Two queues are configured, both suffixed with the same tag.
            String tag = Configuration.getString(TRAVERSE_QUEUE_TAG, this.repository.getClass().getSimpleName()).get();
            builder.setCheckpointHandler(this.checkpointHandler)
                    .setQueueA(QUEUE_NAME + "A: " + tag)
                    .setQueueB(QUEUE_NAME + "B: " + tag);
        }
        this.queueCheckpoint = builder.build();

        this.numThreads = Configuration.getInteger(NUM_THREADS, DEFAULT_THREAD_NUM).get();
        // Fail with a configuration error instead of an opaque IllegalArgumentException from the
        // executor or queue constructors below.
        Configuration.checkConfiguration(this.numThreads > 0, "Thread pool size can not be less than or equal to 0. Configured value %s", new Object[]{this.numThreads});
        this.numToAbort = Configuration.getValue(TraverseExceptionHandlerFactory.TRAVERSE_EXCEPTION_HANDLER, 0L, value -> {
            if (IGNORE_FAILURE.equals(value)) {
                return Long.MAX_VALUE;
            }
            try {
                return Long.valueOf(Long.parseLong(value));
            } catch (NumberFormatException e) {
                throw new InvalidConfigurationException("Unrecognized value for traversal exception handler: " + value, e);
            }
        }).get();

        // Bounded work queue; once it is full the submitting thread runs the task itself.
        this.threadPoolExecutor = new ThreadPoolExecutor(
                this.numThreads,
                this.numThreads,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10 * this.numThreads),
                new ThreadPoolExecutor.CallerRunsPolicy());
        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.threadPoolExecutor);

        this.partitionSize = Configuration.getInteger(TRAVERSE_PARTITION_SIZE, DEFAULT_PARTITION_SIZE).get();
        Configuration.checkConfiguration(this.partitionSize > 0, "Partition size can not be less than or equal to 0. Configured value %s", new Object[]{Integer.valueOf(this.partitionSize)});

        this.repositoryContext.getEventBus().register(this);
        this.repository.init(this.repositoryContext);
        logger.log(Level.INFO, "start full traversal connector executors");
    }

    /**
     * Runs one full traversal of the repository.
     *
     * <p>With queues enabled the method first checks any pending {@code deleteQueueItems}
     * operation recorded in the queue checkpoint:
     * <ul>
     *   <li>still running: the traversal is postponed and the method returns;</li>
     *   <li>finished: the traversal switches to the next queue and the checkpoint is updated;</li>
     *   <li>status could not be fetched: a warning is logged and the traversal continues on the
     *       current queue.</li>
     * </ul>
     * After a successful traversal, {@code deleteQueueItems} is requested for the other queue and
     * the checkpoint records either the queue switch or the name of the still-running operation.
     *
     * @throws NullPointerException if the repository returned null from {@code getAllDocs}
     * @throws IOException if reading or saving checkpoints, or executing operations, fails
     * @throws InterruptedException if the thread is interrupted while waiting for operations
     */
    @Override // com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnector
    public void traverse() throws IOException, InterruptedException {
        String queueName = this.queueCheckpoint.getCurrentQueueName();
        if (this.useQueues) {
            String pendingOperation = this.queueCheckpoint.getCurrentOperation();
            if (!Strings.isNullOrEmpty(pendingOperation)) {
                Operation status = null;
                try {
                    status = this.indexingService.getOperation(pendingOperation);
                } catch (IOException e) {
                    // Status unknown: fall through and traverse on the current queue.
                    logger.log(Level.WARNING, "Error checking on status of deleteQueueItems " + pendingOperation, e);
                }
                if (status != null) {
                    if (!Boolean.TRUE.equals(status.getDone())) {
                        logger.log(Level.FINE, "Postponing full traversal; deleteQueueItems not finished for {0}", pendingOperation);
                        return;
                    }
                    logger.log(Level.FINE, "deleteQueueItems result for {0}: {1}", new Object[]{pendingOperation, getOperationResult(status)});
                    queueName = this.queueCheckpoint.getNextQueueName(queueName);
                    this.queueCheckpoint.saveCheckpoint(queueName, "");
                }
            }
        }

        if (!doTraverse("full traversal", CHECKPOINT_FULL, queueName, this.repository::getAllDocs)) {
            throw new NullPointerException("getAllDocs returned null");
        }

        if (this.useQueues) {
            String nextQueueName = this.queueCheckpoint.getNextQueueName(queueName);
            try {
                Operation deletion = this.indexingService.deleteQueueItems(nextQueueName).get();
                if (Boolean.TRUE.equals(deletion.getDone())) {
                    logger.log(Level.FINE, "deleteQueueItems result for {0}: {1}", new Object[]{nextQueueName, getOperationResult(deletion)});
                    this.queueCheckpoint.saveCheckpoint(nextQueueName, "");
                } else {
                    // Remember the operation so the next traversal can check on it first.
                    logger.log(Level.FINE, "deleteQueueItems running for {0}", deletion.getName());
                    this.queueCheckpoint.saveCheckpoint(queueName, deletion.getName());
                }
            } catch (IOException | ExecutionException e) {
                logger.log(Level.WARNING, "Error calling deleteQueueItems", e);
            }
        }
    }

    /**
     * Summarizes a long-running operation for logging.
     *
     * @return the error if one is set, otherwise the response if one is set, otherwise a
     *     {@code "Done: <flag>"} string
     */
    private String getOperationResult(Operation operation) {
        if (operation.getError() != null) {
            return operation.getError().toString();
        }
        if (operation.getResponse() != null) {
            return operation.getResponse().toString();
        }
        return "Done: " + operation.getDone();
    }

    /**
     * Repeatedly fetches batches of operations and executes them until the repository reports
     * that no more batches are available.
     *
     * <p>After each batch the counters are logged and the batch's checkpoint is saved, so the
     * next batch (or the next run) resumes from there. Each batch is closed before the next one
     * is requested; try-with-resources closes it exactly once and keeps the primary exception if
     * closing fails as well.
     *
     * @param str label used in log messages
     * @param str2 checkpoint name to read from and save to
     * @param str3 queue name assigned to items when queues are enabled
     * @param getDocsFunction supplier of the batches
     * @return {@code false} if the supplier returned null, {@code true} once all batches are done
     * @throws IOException if fetching, executing or checkpointing fails
     * @throws InterruptedException if the thread is interrupted while waiting for operations
     */
    private boolean doTraverse(String str, String str2, String str3, GetDocsFunction getDocsFunction) throws IOException, InterruptedException {
        logger.log(Level.INFO, "Begin {0} traversal.", str);
        ExecuteCounter executeCounter = new ExecuteCounter();
        byte[] checkpoint = this.checkpointHandler.readCheckpoint(str2);
        boolean hasMore;
        do {
            // A null resource is legal here: try-with-resources simply skips close().
            try (CheckpointCloseableIterable<ApiOperation> batch = getDocsFunction.apply(checkpoint)) {
                if (batch == null) {
                    logger.log(Level.INFO, "End {0} traversal.", str);
                    return false;
                }
                processApiOperations(batch, executeCounter, str3);
                logCounters(executeCounter);
                checkpoint = batch.getCheckpoint();
                this.checkpointHandler.saveCheckpoint(str2, checkpoint);
                hasMore = batch.hasMore();
            }
        } while (hasMore);
        logger.log(Level.INFO, "End {0} traversal.", str);
        return true;
    }

    /**
     * Event bus subscriber that executes an operation pushed by the repository outside of a
     * traversal.
     *
     * <p>The operation is submitted to the worker pool with a fresh counter and an abort
     * threshold of zero, and the resulting future is attached to the event. If the submission
     * fails with an {@link IOException}, the event's result is cancelled.
     *
     * @param asyncApiOperation event carrying the operation to execute
     */
    @Subscribe
    public synchronized void handleAsyncOperation(AsyncApiOperation asyncApiOperation) {
        logger.log(Level.INFO, "Processing an asynchronous repository operation.");
        try {
            String queueName = this.queueCheckpoint.getCurrentQueueName();
            ExecuteOperationCallable task =
                    new ExecuteOperationCallable(asyncApiOperation.getOperation(), new ExecuteCounter(), 0L, queueName);
            asyncApiOperation.setResult(this.listeningExecutorService.submit(task));
        } catch (IOException e) {
            logger.log(Level.WARNING, "Exception occured while processing an asynchronous repository operation: ", e);
            asyncApiOperation.getResult().cancel(true);
        }
    }

    /**
     * Does nothing; the argument is ignored.
     *
     * <p>Traversal checkpoints are written by {@code doTraverse} after every batch, so there is
     * nothing left to persist here. Presumably this exists to satisfy the connector lifecycle
     * interface (TODO confirm).
     *
     * @param z unused
     */
    public void saveCheckpoint(boolean z) throws IOException, InterruptedException {
    }

    /**
     * Releases the connector's resources: closes the repository, unregisters from the event bus
     * and shuts down the worker pool, waiting up to five minutes for running tasks.
     *
     * <p>Safe to call after a failed or partial {@code init}. A repository context that was never
     * created is skipped, and an event bus on which this connector was never registered does not
     * prevent the executor from being shut down.
     */
    public void destroy() {
        this.repository.close();
        if (this.repositoryContext != null) {
            try {
                this.repositoryContext.getEventBus().unregister(this);
            } catch (IllegalArgumentException e) {
                // EventBus.unregister rejects objects that were never registered, which happens
                // when init() failed before the registration step.
                logger.log(Level.FINE, "Connector was not registered on the event bus", e);
            }
        }
        if (this.listeningExecutorService != null) {
            logger.log(Level.INFO, "Shutting down the full traversal connector executor");
            MoreExecutors.shutdownAndAwaitTermination(this.listeningExecutorService, 5L, TimeUnit.MINUTES);
        }
    }

    /**
     * Runs an incremental traversal: fetches the repository's changes since the incremental
     * checkpoint and executes them against the current queue.
     *
     * @throws IOException if fetching, executing or checkpointing fails
     * @throws InterruptedException if the thread is interrupted while waiting for operations
     */
    public synchronized void handleIncrementalChanges() throws IOException, InterruptedException {
        doTraverse(
                "incremental traversal",
                CHECKPOINT_INCREMENTAL,
                this.queueCheckpoint.getCurrentQueueName(),
                this.repository::getChanges);
    }

    /**
     * Logs a one-line summary of the counter: a warning when anything failed, otherwise an
     * informational message with the success count or a note that nothing was executed.
     */
    private void logCounters(ExecuteCounter executeCounter) {
        long failed = executeCounter.getFail();
        if (failed != 0) {
            Object[] params = {failed, executeCounter.getTotal()};
            logger.log(Level.WARNING, "{0} operations failed out of {1}", params);
            return;
        }
        long succeeded = executeCounter.getSuccess();
        if (succeeded > 0) {
            logger.log(Level.INFO, "{0} operations executed successfully.", succeeded);
            return;
        }
        logger.log(Level.INFO, "No operations returned during traversal.");
    }

    /**
     * Executes the given operations on the worker pool, one partition at a time.
     *
     * <p>Each partition of at most {@code partitionSize} operations is submitted and then awaited
     * before the next partition is started. Submission within a partition stops early once the
     * failure count exceeds the abort threshold.
     *
     * @param iterable operations to execute
     * @param executeCounter counter shared by all submitted tasks
     * @param str queue name assigned to items when queues are enabled
     * @throws IOException wrapping the {@link ExecutionException} of any task that failed
     * @throws InterruptedException if the thread is interrupted while waiting for a partition
     */
    private void processApiOperations(Iterable<ApiOperation> iterable, ExecuteCounter executeCounter, String str) throws IOException, InterruptedException {
        for (List<ApiOperation> partition : Iterables.partition(iterable, this.partitionSize)) {
            List<ListenableFuture<List<GenericJson>>> pending = new ArrayList<>(partition.size());
            for (ApiOperation apiOperation : partition) {
                if (executeCounter.getFail() > this.numToAbort.longValue()) {
                    break;
                }
                pending.add(this.listeningExecutorService.submit(new ExecuteOperationCallable(apiOperation, executeCounter, this.numToAbort, str)));
            }
            try {
                // Wait for the whole partition; fails as soon as any task has failed.
                Futures.allAsList(pending).get();
            } catch (ExecutionException e) {
                throw new IOException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Ensures the document's item has metadata and then applies the configured default ACL to
     * it, if default ACLs are enabled.
     *
     * @param repositoryDoc document whose item is updated in place
     */
    public void setDefaultAcls(RepositoryDoc repositoryDoc) {
        Item item = repositoryDoc.getItem();
        if (item.getMetadata() == null) {
            item.setMetadata(new ItemMetadata());
        }
        this.defaultAcl.applyToIfEnabled(item);
    }
}
