package io.datarouter.virtualnode.writebehind.base;

import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.storage.node.op.NodeOps;
import io.datarouter.util.collection.CollectionTool;
import io.datarouter.util.concurrent.FutureTool;
import io.datarouter.virtualnode.writebehind.WriteBehindNode;
import io.datarouter.virtualnode.writebehind.config.DatarouterVirtualNodeExecutors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/virtualnode/writebehind/base/BaseWriteBehindNode.class */
public abstract class BaseWriteBehindNode<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, N extends NodeOps<PK, D>> implements WriteBehindNode<PK, D, N> {
    public static final int FLUSH_RATE_MS = 500;
    private static final int FLUSH_BATCH_SIZE = 100;
    private final ExecutorService writeExecutor;
    private final Queue<WriteWrapper<?>> queue;
    private final BaseWriteBehindNode<PK, D, N>.QueueFlusher queueFlusher;
    protected final N backingNode;
    protected final long timeoutMs;
    protected final Queue<OutstandingWriteWrapper> outstandingWrites;
    private static final Logger logger = LoggerFactory.getLogger(BaseWriteBehindNode.class);
    private static final long DEFAULT_TIMEOUT_MS = Duration.ofMinutes(1).toMillis();

    /* loaded from: input_file:io/datarouter/virtualnode/writebehind/base/BaseWriteBehindNode$QueueFlusher.class */
    private class QueueFlusher implements Runnable {
        private WriteWrapper<Object> previousWriteWrapper;

        private QueueFlusher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BaseWriteBehindNode.logger.info("Futures submited count={} node={}", Integer.valueOf(flushQueue().size()), BaseWriteBehindNode.this.backingNode);
            } catch (Throwable th) {
                BaseWriteBehindNode.logger.error("Failed to flush queue for {}", BaseWriteBehindNode.this.backingNode, th);
            }
        }

        private synchronized List<Future<?>> flushQueue() {
            ArrayList arrayList = new ArrayList();
            this.previousWriteWrapper = null;
            while (!BaseWriteBehindNode.this.queue.isEmpty()) {
                WriteWrapper<?> poll = BaseWriteBehindNode.this.queue.poll();
                if (this.previousWriteWrapper != null && (!poll.getOp().equals(this.previousWriteWrapper.getOp()) || poll.getConfig() != null)) {
                    arrayList.add(handlePrevious());
                }
                if (poll.getConfig() != null) {
                    arrayList.add(handleWriteWrapper(poll));
                } else {
                    List<?> objects = poll.getObjects();
                    if (this.previousWriteWrapper == null) {
                        this.previousWriteWrapper = new WriteWrapper<>(poll.getOp(), Collections.emptyList(), null);
                    }
                    int size = this.previousWriteWrapper.getObjects().size();
                    this.previousWriteWrapper.getObjects().addAll(objects.subList(0, Math.min(BaseWriteBehindNode.FLUSH_BATCH_SIZE - size, objects.size())));
                    if (this.previousWriteWrapper.getObjects().size() == BaseWriteBehindNode.FLUSH_BATCH_SIZE) {
                        arrayList.add(handlePrevious());
                    }
                    int i = 1;
                    while ((i * BaseWriteBehindNode.FLUSH_BATCH_SIZE) - size < objects.size()) {
                        int i2 = (i * BaseWriteBehindNode.FLUSH_BATCH_SIZE) - size;
                        i++;
                        int min = Math.min((i * BaseWriteBehindNode.FLUSH_BATCH_SIZE) - size, objects.size());
                        if (this.previousWriteWrapper == null) {
                            this.previousWriteWrapper = new WriteWrapper<>(poll.getOp(), Collections.emptyList(), null);
                        }
                        this.previousWriteWrapper.getObjects().addAll(objects.subList(i2, min));
                        if (this.previousWriteWrapper.getObjects().size() == BaseWriteBehindNode.FLUSH_BATCH_SIZE) {
                            arrayList.add(handlePrevious());
                        }
                    }
                }
            }
            if (this.previousWriteWrapper != null) {
                arrayList.add(handlePrevious());
            }
            return arrayList;
        }

        private Future<?> handlePrevious() {
            Future<?> handleWriteWrapper = handleWriteWrapper(this.previousWriteWrapper);
            this.previousWriteWrapper = null;
            return handleWriteWrapper;
        }

        private Future<?> handleWriteWrapper(WriteWrapper<?> writeWrapper) {
            List<?> objects = writeWrapper.getObjects();
            if (CollectionTool.nullSafeIsEmpty(objects)) {
                return null;
            }
            String format = String.format("%s with %s %s", writeWrapper.getOp(), Integer.valueOf(objects.size()), CollectionTool.getFirst(objects).getClass().getSimpleName());
            WriteWrapper<?> m5clone = writeWrapper.m5clone();
            Future<?> submit = BaseWriteBehindNode.this.writeExecutor.submit(() -> {
                try {
                    if (BaseWriteBehindNode.this.handleWriteWrapperInternal(m5clone)) {
                        return;
                    }
                    BaseWriteBehindNode.logger.error("unhandled op desc={}", format);
                } catch (Throwable th) {
                    BaseWriteBehindNode.logger.error("opDesc={}", format, th);
                }
            });
            BaseWriteBehindNode.this.outstandingWrites.add(new OutstandingWriteWrapper(Long.valueOf(System.currentTimeMillis()), submit, format));
            return submit;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public BaseWriteBehindNode(DatarouterVirtualNodeExecutors.DatarouterWriteBehindScheduler datarouterWriteBehindScheduler, DatarouterVirtualNodeExecutors.DatarouterWriteBehindExecutor datarouterWriteBehindExecutor, N n) {
        Objects.requireNonNull(n, "backingNode cannot be null.");
        this.backingNode = n;
        this.writeExecutor = datarouterWriteBehindExecutor;
        this.timeoutMs = DEFAULT_TIMEOUT_MS;
        this.outstandingWrites = new ConcurrentLinkedQueue();
        datarouterWriteBehindScheduler.scheduleWithFixedDelay(new OverdueWriteCanceller(this), 0L, 1000L, TimeUnit.MILLISECONDS);
        this.queueFlusher = new QueueFlusher();
        datarouterWriteBehindScheduler.scheduleWithFixedDelay(this.queueFlusher, 500L, 500L, TimeUnit.MILLISECONDS);
        this.queue = new LinkedBlockingQueue();
    }

    @Override // io.datarouter.virtualnode.writebehind.WriteBehindNode
    public Queue<WriteWrapper<?>> getQueue() {
        return this.queue;
    }

    @Override // io.datarouter.virtualnode.writebehind.WriteBehindNode
    public N getBackingNode() {
        return this.backingNode;
    }

    public void flush() {
        this.queueFlusher.flushQueue().stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach(FutureTool::get);
    }
}
