package org.apache.skywalking.banyandb.v1.client;

import com.google.auto.value.AutoValue;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.stub.AbstractAsyncStub;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.class */
/**
 * Base class for processors that buffer write requests in memory and send them to the server in
 * bulk through a gRPC request {@link StreamObserver}.
 *
 * <p>Requests are queued by {@link #add(AbstractWrite)}. A batch is flushed either when the queue
 * reaches {@code maxBulkSize} entries, or by the periodic task this class schedules for itself on a
 * single-threaded scheduler. The number of batches in flight at once is bounded by a semaphore.
 *
 * @param <REQ>  protobuf request message type sent on the stream
 * @param <STUB> asynchronous gRPC stub type used to open the stream
 */
public abstract class AbstractBulkWriteProcessor<REQ extends GeneratedMessageV3, STUB extends AbstractAsyncStub<STUB>> implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(AbstractBulkWriteProcessor.class);
    private final STUB stub;
    private final int maxBulkSize;
    // Stored for reference only; the scheduler and flushInternalInMillis are derived from it in the constructor.
    private final int flushInterval;
    private final ArrayBlockingQueue<Holder> requests;
    // Bounds the number of batches being flushed concurrently.
    private final Semaphore semaphore;
    private final long flushInternalInMillis;
    private final ScheduledThreadPoolExecutor scheduler;
    // Per-batch deadline, in seconds, applied to the stub before the stream is opened.
    private final int timeout;
    // Wall-clock time (ms) at which the last flush finished successfully; 0 until the first one.
    private volatile long lastFlushTS = 0;

    /**
     * Pairs a pending write with the future that reports whether it was handed to the stream.
     */
    @AutoValue
    public static abstract class Holder {
        /**
         * @return the write entity waiting to be built and sent
         */
        public abstract AbstractWrite writeEntity();

        /**
         * @return the future completed once the entity has been passed to the stream, or failed
         *         when building/sending it throws
         */
        public abstract CompletableFuture<Void> future();

        /**
         * Creates a holder and registers a callback on {@code completableFuture} that logs the
         * request when the future completes exceptionally.
         *
         * @param abstractWrite     the write entity to hold
         * @param completableFuture the future tracking this write
         * @return a new holder wrapping both arguments
         */
        public static <REQ extends GeneratedMessageV3> Holder create(AbstractWrite<REQ> abstractWrite, CompletableFuture<Void> completableFuture) {
            completableFuture.whenComplete((ignored, failure) -> {
                if (failure != null) {
                    log.error("Failed to execute the request: {}", abstractWrite.toString(), failure);
                }
            });
            return new AutoValue_AbstractBulkWriteProcessor_Holder(abstractWrite, completableFuture);
        }
    }

    /**
     * Creates the processor and immediately schedules its periodic flush task.
     *
     * <p>NOTE: {@code this} is handed to the scheduler with an initial delay of zero, so
     * {@link #run()} may be invoked before a subclass constructor has finished.
     *
     * @param stub          asynchronous stub used to open a stream for every batch
     * @param processorName label of the processor; not used by this base class
     * @param maxBulkSize   queue size that triggers a flush from {@link #add(AbstractWrite)}; the
     *                      queue capacity is {@code maxBulkSize + 1}
     * @param flushInterval delay, in seconds, between two runs of the periodic flush task
     * @param concurrency   number of batches allowed in flight at once; values below 1 are treated as 1
     * @param timeout       deadline, in seconds, applied to the stub for each batch
     */
    public AbstractBulkWriteProcessor(STUB stub, String processorName, int maxBulkSize, int flushInterval, int concurrency, int timeout) {
        this.stub = stub;
        this.maxBulkSize = maxBulkSize;
        this.flushInterval = flushInterval;
        this.timeout = timeout;
        this.requests = new ArrayBlockingQueue<>(maxBulkSize + 1);
        this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1);
        this.scheduler = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("BanyanDB BulkProcessor");
            return thread;
        });
        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.scheduler.setRemoveOnCancelPolicy(true);
        // Convert in long arithmetic: "flushInterval * 1000" would overflow int for large intervals.
        this.flushInternalInMillis = TimeUnit.SECONDS.toMillis(flushInterval);
        this.scheduler.scheduleWithFixedDelay(this, 0L, flushInterval, TimeUnit.SECONDS);
    }

    /**
     * Queues a write and flushes the queue when it has reached {@code maxBulkSize} entries.
     *
     * <p>Blocks while the queue is full. If the calling thread is interrupted while waiting, the
     * interrupt flag is restored and the returned future is completed exceptionally with the
     * {@link InterruptedException}; the write is not queued in that case.
     *
     * <p>Because a size-triggered flush runs on the calling thread and waits for the batch, a
     * batch failure surfaces here as the unchecked exception thrown by
     * {@link CompletableFuture#join()}.
     *
     * @param abstractWrite the write entity to queue
     * @return a future tracking the hand-over of this write to the stream
     */
    public CompletableFuture<Void> add(AbstractWrite<REQ> abstractWrite) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final Holder holder = Holder.create(abstractWrite, result);
        try {
            this.requests.put(holder);
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller and report the write as failed.
            Thread.currentThread().interrupt();
            result.completeExceptionally(e);
            return result;
        }
        flushIfNeeded();
        return result;
    }

    /**
     * Periodic task body: runs the time-based flush check and logs, rather than propagates, any
     * failure so the scheduled task is not cancelled by an exception.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            doPeriodicalFlush();
        } catch (Throwable t) {
            log.error("Failed to flush data to BanyanDB", t);
        }
    }

    /**
     * Flushes the queue when it holds at least {@code maxBulkSize} entries.
     */
    protected void flushIfNeeded() {
        if (this.requests.size() >= this.maxBulkSize) {
            flush();
        }
    }

    /**
     * Flushes when more than half of the flush interval has elapsed since the last successful flush.
     */
    private void doPeriodicalFlush() {
        final long elapsedMillis = System.currentTimeMillis() - this.lastFlushTS;
        if (elapsedMillis > this.flushInternalInMillis / 2) {
            flush();
        }
    }

    /**
     * Drains every queued request into one batch, sends it via {@link #doFlush(List)} and waits
     * for the batch future to complete.
     *
     * <p>Returns immediately when the queue is empty. A semaphore permit is held for the duration
     * of the batch and is released when the batch future completes, or right away if the batch
     * could not be started. If the thread is interrupted while waiting for a permit, the
     * interrupt flag is restored and nothing is flushed.
     */
    public void flush() {
        if (this.requests.isEmpty()) {
            return;
        }
        try {
            this.semaphore.acquire();
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get semaphore to execute bulk requests", e);
            Thread.currentThread().interrupt();
            return;
        }

        CompletableFuture<Void> batchFuture;
        try {
            final List<Holder> batch = new ArrayList<>(this.requests.size());
            this.requests.drainTo(batch);
            if (batch.isEmpty()) {
                // Another flush emptied the queue after the check above; nothing to send.
                this.semaphore.release();
                return;
            }
            batchFuture = doFlush(batch);
            // From here on the permit is returned by the callback, whatever the batch outcome.
            batchFuture.whenComplete((ignored, failure) -> this.semaphore.release());
        } catch (Throwable t) {
            // The batch never started, so no callback will ever give the permit back.
            this.semaphore.release();
            throw t;
        }
        batchFuture.join();
        this.lastFlushTS = System.currentTimeMillis();
    }

    /**
     * Opens a stream with the configured deadline, sends every entity of the batch on it and
     * completes the stream.
     *
     * <p>Each holder's future is completed as soon as its request has been passed to
     * {@code onNext}; if building or sending an entity throws, that holder's future is failed and
     * the remaining entities are still processed. The returned future is the one given to
     * {@link #buildStreamObserver(AbstractAsyncStub, CompletableFuture)}; its failure is logged.
     *
     * @param data the holders to send, in order
     * @return the future representing the whole batch
     */
    protected CompletableFuture<Void> doFlush(List<Holder> data) {
        final CompletableFuture<Void> batchFuture = new CompletableFuture<>();
        final StreamObserver<REQ> requestObserver =
                buildStreamObserver(this.stub.withDeadlineAfter(this.timeout, TimeUnit.SECONDS), batchFuture);
        boolean completionAttempted = false;
        try {
            for (Holder holder : data) {
                final AbstractWrite<?> entity = holder.writeEntity();
                try {
                    // Holder erases the request type; add(AbstractWrite<REQ>) is what feeds this queue.
                    @SuppressWarnings("unchecked")
                    final REQ request = (REQ) entity.build();
                    requestObserver.onNext(request);
                    holder.future().complete(null);
                } catch (Throwable entityFailure) {
                    log.error("building the entity fails: {}", entity.toString(), entityFailure);
                    holder.future().completeExceptionally(entityFailure);
                }
            }
            completionAttempted = true;
            requestObserver.onCompleted();
        } catch (Throwable streamFailure) {
            // Close the stream only if that was not what failed; never let a secondary failure
            // hide the original one.
            if (!completionAttempted) {
                try {
                    requestObserver.onCompleted();
                } catch (Throwable closeFailure) {
                    streamFailure.addSuppressed(closeFailure);
                }
            }
            throw streamFailure;
        }
        batchFuture.whenComplete((ignored, batchFailure) -> {
            if (batchFailure != null) {
                log.error("Failed to execute requests in bulk", batchFailure);
            }
        });
        return batchFuture;
    }

    /**
     * Stops the periodic flush task by shutting the scheduler down. Requests still in the queue
     * are not flushed by this method.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.scheduler.shutdownNow();
    }

    /**
     * Opens the request stream for one batch.
     *
     * @param stub              the stub to open the stream on, already carrying the batch deadline
     * @param completableFuture the batch future handed to the implementation; {@link #flush()}
     *                          waits on it, so the implementation is expected to complete it
     * @return the observer used to send the batch's requests
     */
    protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub, CompletableFuture<Void> completableFuture);
}
