package com.github.loki4j.logback;

import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.LifeCycle;
import com.github.loki4j.common.Batcher;
import com.github.loki4j.common.BinaryBatch;
import com.github.loki4j.common.LogRecord;
import com.github.loki4j.common.LogRecordBatch;
import com.github.loki4j.common.LokiResponse;
import com.github.loki4j.common.LokiThreadFactory;
import com.github.loki4j.common.SoftLimitBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.function.Supplier;

/* loaded from: input_file:com/github/loki4j/logback/DefaultPipeline.class */
/**
 * Moves log records from a {@link SoftLimitBuffer} to the {@code send} function in three stages:
 * records are polled from the buffer and grouped by the {@link Batcher}, the resulting
 * {@link LogRecordBatch} is turned into a {@link BinaryBatch} by the {@code encode} function,
 * and the binary batch is handed over a bounded queue to the sender.
 *
 * <p>{@link #start()} creates three single-threaded executors: one running the encode loop,
 * one running the send loop, and a scheduler that requests a drain at a fixed rate.
 * {@link #append(Supplier)} may be called from any thread; it only offers to the buffer and
 * raises a flag that the encode loop polls.
 */
public final class DefaultPipeline extends ContextAwareBase implements LifeCycle {

    /** Duration of a single park / timed queue operation in the pipeline loops (1 ms in nanoseconds). */
    private static final long PARK_NS = TimeUnit.MILLISECONDS.toNanos(1);

    /** Maximum number of encoded batches waiting between the encoder and the sender. */
    private static final int SENDER_QUEUE_CAPACITY = 10;

    /** Initial delay and period, in milliseconds, of the scheduled drain request. */
    private static final long DRAIN_INTERVAL_MS = 100L;

    private final SoftLimitBuffer<LogRecord> buffer;
    private final Batcher batcher;
    private final Function<LogRecordBatch, BinaryBatch> encode;
    private final Function<BinaryBatch, LokiResponse> send;
    private final boolean drainOnStop;

    private final ArrayBlockingQueue<BinaryBatch> senderQueue = new ArrayBlockingQueue<>(SENDER_QUEUE_CAPACITY);

    /** Set by {@link #append(Supplier)}; cleared by the encoder after it has polled the buffer. */
    private final AtomicBoolean newEventsArrived = new AtomicBoolean(false);

    /** Set by {@link #drain()}; cleared by the encoder after each encode step. */
    private final AtomicBoolean drainRequested = new AtomicBoolean(false);

    /** Wall-clock time of the last completed send; passed to {@code Batcher.drain}. */
    private final AtomicLong lastSendTimeMs = new AtomicLong(System.currentTimeMillis());

    private ScheduledExecutorService scheduler;
    private ExecutorService encoderThreadPool;
    private ExecutorService senderThreadPool;
    private ScheduledFuture<?> drainScheduledFuture;

    private volatile boolean started = false;

    // volatile: written by the configuring thread, read by the encoder/sender/scheduler threads
    private volatile boolean traceEnabled = false;

    /**
     * Creates a pipeline; no threads are created until {@link #start()} is called.
     *
     * @param buffer      source of log records
     * @param batcher     groups records into batches
     * @param encode      converts a record batch into its binary form
     * @param send        delivers a binary batch
     * @param drainOnStop whether {@link #stop()} should wait for pending records to be processed
     */
    public DefaultPipeline(
            SoftLimitBuffer<LogRecord> buffer,
            Batcher batcher,
            Function<LogRecordBatch, BinaryBatch> encode,
            Function<BinaryBatch, LokiResponse> send,
            boolean drainOnStop) {
        this.buffer = buffer;
        this.batcher = batcher;
        this.encode = encode;
        this.send = send;
        this.drainOnStop = drainOnStop;
    }

    /** Marks the pipeline as started and launches the sender, encoder and drain-scheduler threads. */
    public void start() {
        addInfo("Pipeline is starting...");
        started = true;

        senderThreadPool = Executors.newFixedThreadPool(1, new LokiThreadFactory("loki-sender"));
        senderThreadPool.execute(this::runSendLoop);

        encoderThreadPool = Executors.newFixedThreadPool(1, new LokiThreadFactory("loki-encoder"));
        encoderThreadPool.execute(this::runEncodeLoop);

        scheduler = Executors.newScheduledThreadPool(1, new LokiThreadFactory("loki-scheduler"));
        drainScheduledFuture = scheduler.scheduleAtFixedRate(
                this::drain, DRAIN_INTERVAL_MS, DRAIN_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the pipeline. When {@code drainOnStop} is set, first waits (without a practical
     * time limit) until the buffer has been worked off, then shuts the executors down.
     *
     * <p>Calling this method on a pipeline that is not started is a no-op, so it is safe to
     * call before {@link #start()} or more than once.
     */
    public void stop() {
        if (!started) {
            // Never started or already stopped: executors and the scheduled future may be null.
            return;
        }
        drainScheduledFuture.cancel(false);

        if (drainOnStop) {
            addInfo("Pipeline is draining...");
            waitSendQueueLessThan(batcher.getCapacity(), Long.MAX_VALUE);
            // NOTE(review): resetting the last-send time presumably makes Batcher.drain treat the
            // pending batch as overdue so the remainder is flushed -- confirm against Batcher.
            lastSendTimeMs.set(0L);
            drain();
            waitSendQueueIsEmpty(Long.MAX_VALUE);
            addInfo("Drain completed");
        }

        started = false;

        scheduler.shutdown();
        encoderThreadPool.shutdown();
        senderThreadPool.shutdown();
    }

    /**
     * Offers a record to the buffer and, on success, signals the encoder that new events exist.
     *
     * @param supplier supplier of the record, passed through to the buffer
     * @return the result of {@code buffer.offer(supplier)}
     */
    public boolean append(Supplier<LogRecord> supplier) {
        boolean accepted = buffer.offer(supplier);
        if (accepted) {
            newEventsArrived.set(true);
        }
        return accepted;
    }

    /** Requests a drain; the request is picked up by the next encode step. */
    private void drain() {
        drainRequested.set(true);
        trace("drain planned");
    }

    /** Body of the encoder thread: runs encode steps, reusing one batch, until the pipeline stops. */
    private void runEncodeLoop() {
        LogRecordBatch batch = new LogRecordBatch(batcher.getCapacity());
        while (started) {
            try {
                encodeStep(batch);
            } catch (InterruptedException e) {
                stop();
                // Restore the flag only after stop(): stop() may park while draining, and
                // LockSupport.park* returns immediately on a thread whose interrupt flag is set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Body of the sender thread: runs send steps until the pipeline stops. */
    private void runSendLoop() {
        while (started) {
            try {
                sendStep();
            } catch (InterruptedException e) {
                stop();
                // See runEncodeLoop() for why the flag is restored after stop().
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Performs one encode iteration: waits for work, fills {@code batch} from the buffer (or via
     * a requested drain), encodes it and puts the result on the sender queue.
     *
     * @param batch reusable batch; it is cleared after it has been encoded
     * @throws InterruptedException if interrupted while offering to the sender queue
     */
    private void encodeStep(LogRecordBatch batch) throws InterruptedException {
        // Idle until there is something to do and the sender queue can accept another batch.
        while (started && (noEncodeActions() || senderQueue.remainingCapacity() == 0)) {
            LockSupport.parkNanos(this, PARK_NS);
        }
        if (!started) {
            return;
        }
        trace("check encode actions");

        // Feed records to the batcher until the buffer is exhausted or the batcher has filled the batch.
        LogRecord record = buffer.poll();
        while (record != null && batch.isEmpty()) {
            batcher.add(record, batch);
            if (batch.isEmpty()) {
                record = buffer.poll();
            }
        }

        if (batch.isEmpty() && drainRequested.get()) {
            batcher.drain(lastSendTimeMs.get(), batch);
        }

        newEventsArrived.set(false);
        drainRequested.set(false);

        if (batch.isEmpty()) {
            return;
        }

        BinaryBatch encoded = encode.apply(batch);
        batch.clear();

        // Retry in short timed slices so that a stop request is noticed while the queue is full.
        boolean queued = false;
        while (started && !queued) {
            queued = senderQueue.offer(encoded, PARK_NS, TimeUnit.NANOSECONDS);
        }
    }

    /** @return {@code true} if neither new events nor a drain request are pending */
    private boolean noEncodeActions() {
        return !newEventsArrived.get() && !drainRequested.get();
    }

    /**
     * Performs one send iteration: waits for an encoded batch, sends it, commits its record
     * count to the buffer and updates the last-send time.
     *
     * @throws InterruptedException if interrupted while polling the sender queue
     */
    private void sendStep() throws InterruptedException {
        BinaryBatch batch = null;
        while (started && batch == null) {
            if (senderQueue.size() > 0) {
                batch = senderQueue.poll(PARK_NS, TimeUnit.NANOSECONDS);
            } else {
                LockSupport.parkNanos(this, PARK_NS);
            }
        }
        // The null check guards against 'started' flipping between the loop exit and this test.
        if (!started || batch == null) {
            return;
        }
        send.apply(batch);
        buffer.commit(batch.recordsCount);
        lastSendTimeMs.set(System.currentTimeMillis());
        trace("sent items: %s", batch.recordsCount);
    }

    /**
     * Waits until the buffer reports a size of zero; equivalent to
     * {@code waitSendQueueLessThan(1, timeoutMs)}.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @throws RuntimeException if the timeout elapses
     */
    public void waitSendQueueIsEmpty(long timeoutMs) {
        waitSendQueueLessThan(1, timeoutMs);
    }

    /**
     * Waits until {@code buffer.size()} drops below {@code size}, the pipeline is stopped, or
     * the timeout elapses. Note that, despite the method name, the value checked is the size
     * reported by the record buffer, not the size of the sender queue.
     *
     * <p>Elapsed time is measured with {@link System#nanoTime()}, so the timeout holds even
     * when individual parks return earlier or later than requested.
     *
     * @param size      exclusive upper bound for the buffer size
     * @param timeoutMs maximum time to wait, in milliseconds
     * @throws RuntimeException if the elapsed time reached the timeout
     */
    void waitSendQueueLessThan(int size, long timeoutMs) {
        long timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long startNs = System.nanoTime();
        long elapsedNs = 0L;
        while (started && buffer.size() >= size && elapsedNs < timeoutNs) {
            LockSupport.parkNanos(PARK_NS);
            elapsedNs = System.nanoTime() - startNs;
        }
        boolean timedOut = elapsedNs >= timeoutNs;
        trace("wait send queue: started=%s, buffer(%s)>=%s, %s ms %s elapsed",
                started, buffer.size(), size, timeoutMs, timedOut ? "" : "not");
        if (timedOut) {
            throw new RuntimeException("Not completed within timeout " + timeoutMs + " ms");
        }
    }

    /** Formats and reports an info status message, but only when tracing is enabled. */
    private void trace(String format, Object... args) {
        if (traceEnabled) {
            addInfo(String.format(format, args));
        }
    }

    /** @return {@code true} between a call to {@link #start()} and the completion of {@link #stop()} */
    public boolean isStarted() {
        return started;
    }

    /**
     * Enables or disables trace messages.
     *
     * @param traceEnabled {@code true} to report trace messages as info statuses
     */
    public void setTraceEnabled(boolean traceEnabled) {
        this.traceEnabled = traceEnabled;
    }
}
