package io.nats.client.impl;

import io.nats.client.Consumer;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/jnats-2.16.1.jar:io/nats/client/impl/NatsConsumer.class */
/**
 * Base class for consumers attached to a {@link NatsConnection}.
 *
 * <p>Tracks the pending limits, the delivered/dropped counters, a "slow" flag and the
 * draining state, and implements {@link #drain(Duration)} on top of the hooks that
 * subclasses provide ({@link #isActive()}, {@link #getMessageQueue()},
 * {@link #sendUnsubForDrain()} and {@link #cleanUpAfterDrain()}).
 */
public abstract class NatsConsumer implements Consumer {
    NatsConnection connection;
    private final AtomicLong maxMessages = new AtomicLong(Consumer.DEFAULT_MAX_MESSAGES);
    private final AtomicLong maxBytes = new AtomicLong(Consumer.DEFAULT_MAX_BYTES);
    private final AtomicLong droppedMessages = new AtomicLong();
    private final AtomicLong messagesDelivered = new AtomicLong(0);
    private final AtomicBoolean slow = new AtomicBoolean(false);
    // Holds the future of the drain in progress; null while no drain has been started.
    private final AtomicReference<CompletableFuture<Boolean>> drainingFuture = new AtomicReference<>();

    /**
     * Creates a consumer bound to the given connection.
     *
     * @param natsConnection the connection this consumer uses; {@link #drain(Duration)}
     *                       rejects the call when it is {@code null}
     */
    public NatsConsumer(NatsConnection natsConnection) {
        this.connection = natsConnection;
    }

    /**
     * Stores the pending limits. The two values are written one after the other, not as
     * a single atomic update.
     *
     * @param j  the new pending message limit
     * @param j2 the new pending byte limit
     */
    @Override
    public void setPendingLimits(long j, long j2) {
        this.maxMessages.set(j);
        this.maxBytes.set(j2);
    }

    /** @return the currently configured pending message limit */
    @Override
    public long getPendingMessageLimit() {
        return this.maxMessages.get();
    }

    /** @return the currently configured pending byte limit */
    @Override
    public long getPendingByteLimit() {
        return this.maxBytes.get();
    }

    /** @return the length of the message queue, or 0 when there is no queue */
    @Override
    public long getPendingMessageCount() {
        // Read the queue once so a concurrent change cannot turn the null check stale.
        MessageQueue queue = getMessageQueue();
        return queue != null ? queue.length() : 0L;
    }

    /** @return the size in bytes reported by the message queue, or 0 when there is no queue */
    @Override
    public long getPendingByteCount() {
        // Read the queue once so a concurrent change cannot turn the null check stale.
        MessageQueue queue = getMessageQueue();
        return queue != null ? queue.sizeInBytes() : 0L;
    }

    /** @return the number of times {@link #incrementDeliveredCount()} has been called */
    @Override
    public long getDeliveredCount() {
        return this.messagesDelivered.get();
    }

    /** Adds one to the delivered-message counter. */
    public void incrementDeliveredCount() {
        this.messagesDelivered.incrementAndGet();
    }

    /** Adds one to the dropped-message counter. */
    public void incrementDroppedCount() {
        this.droppedMessages.incrementAndGet();
    }

    /** @return the dropped-message counter since creation or the last {@link #clearDroppedCount()} */
    @Override
    public long getDroppedCount() {
        return this.droppedMessages.get();
    }

    /** Resets the dropped-message counter to zero. */
    @Override
    public void clearDroppedCount() {
        this.droppedMessages.set(0L);
    }

    /** Sets the slow flag. */
    public void markSlow() {
        this.slow.set(true);
    }

    /** Clears the slow flag. */
    public void markNotSlow() {
        this.slow.set(false);
    }

    /** @return {@code true} while the slow flag is set */
    public boolean isMarkedSlow() {
        return this.slow.get();
    }

    /**
     * Checks the pending counts against the configured limits. A limit that is zero or
     * negative is never considered reached.
     *
     * @return {@code true} when the pending byte count or the pending message count has
     *         reached its (positive) limit
     */
    public boolean hasReachedPendingLimits() {
        // Each limit is read once so both halves of a comparison use the same value.
        long byteLimit = getPendingByteLimit();
        if (byteLimit > 0 && getPendingByteCount() >= byteLimit) {
            return true;
        }
        long messageLimit = getPendingMessageLimit();
        return messageLimit > 0 && getPendingMessageCount() >= messageLimit;
    }

    /**
     * Records the future that represents the drain in progress; from this point on
     * {@link #isDraining()} reports {@code true} (for a non-null argument).
     *
     * @param completableFuture the future tracking the drain
     */
    public void markDraining(CompletableFuture<Boolean> completableFuture) {
        this.drainingFuture.set(completableFuture);
    }

    /** Calls {@code drain()} on the message queue, if there is one. */
    public void markUnsubedForDrain() {
        MessageQueue queue = getMessageQueue();
        if (queue != null) {
            queue.drain();
        }
    }

    /** @return the future recorded by {@link #markDraining}, or {@code null} if none was recorded */
    CompletableFuture<Boolean> getDrainingFuture() {
        return this.drainingFuture.get();
    }

    /** @return {@code true} once a draining future has been recorded */
    public boolean isDraining() {
        return this.drainingFuture.get() != null;
    }

    /** @return {@code true} when the consumer is draining and has no pending messages */
    public boolean isDrained() {
        return isDraining() && getPendingMessageCount() == 0;
    }

    /**
     * Starts draining this consumer and returns a future that reports the outcome.
     *
     * <p>If a drain is already in progress its future is returned and nothing else
     * happens. Otherwise the consumer is marked as draining, {@link #sendUnsubForDrain()}
     * is called, the connection is flushed with {@code duration} (a flush timeout is
     * handed to the connection's exception processing and does not abort the drain), and
     * a background task is submitted to the connection's executor. That task polls
     * {@link #isDrained()} about once per millisecond until it is true or
     * {@code duration} has elapsed since this method was entered, then calls
     * {@link #cleanUpAfterDrain()} and completes the future with {@link #isDrained()}.
     *
     * @param duration the maximum time to wait; for the background wait, {@code null} or
     *                 {@link Duration#ZERO} means there is no time limit
     * @return the future tracking the drain
     * @throws IllegalStateException if the consumer is not active or has no connection
     * @throws InterruptedException  declared by the interface; presumably raised by the
     *                               connection flush when the calling thread is interrupted
     */
    @Override
    public CompletableFuture<Boolean> drain(Duration duration) throws InterruptedException {
        if (!isActive() || this.connection == null) {
            throw new IllegalStateException("Consumer is closed");
        }
        // NOTE(review): isDraining() followed by markDraining() is not atomic, so two
        // concurrent callers can both get past this check and start a drain each.
        if (isDraining()) {
            return getDrainingFuture();
        }

        final Instant start = Instant.now();
        final CompletableFuture<Boolean> tracker = new CompletableFuture<>();
        markDraining(tracker);
        sendUnsubForDrain();

        try {
            this.connection.flush(duration);
        } catch (TimeoutException e) {
            // A flush timeout is reported but the drain carries on.
            this.connection.processException(e);
        }

        markUnsubedForDrain();

        this.connection.getExecutor().submit(() -> {
            try {
                while (!hasTimedOut(start, duration) && !isDrained()) {
                    Thread.sleep(1L);
                }
                cleanUpAfterDrain();
            } catch (InterruptedException e) {
                this.connection.processException(e);
                // Restore the flag so the owner of this thread can still see the interrupt.
                Thread.currentThread().interrupt();
            } finally {
                // Complete on every exit path so waiters are never left hanging.
                tracker.complete(Boolean.valueOf(isDrained()));
            }
        });
        return getDrainingFuture();
    }

    /**
     * Tells whether the drain time limit has been used up.
     *
     * @param start when the drain started
     * @param limit the time limit; {@code null} or {@link Duration#ZERO} means no limit
     * @return {@code true} when a limit applies and at least that much time has passed
     */
    private static boolean hasTimedOut(Instant start, Duration limit) {
        if (limit == null || limit.equals(Duration.ZERO)) {
            return false;
        }
        return Duration.between(start, Instant.now()).compareTo(limit) >= 0;
    }

    /** @return whether this consumer is active; {@link #drain(Duration)} refuses inactive consumers */
    @Override
    public abstract boolean isActive();

    /** @return the queue holding pending messages; callers in this class tolerate {@code null} */
    abstract MessageQueue getMessageQueue();

    /**
     * Hook invoked by {@link #drain(Duration)} right after the consumer is marked as
     * draining and before the connection is flushed. Presumably sends the unsubscribe
     * request(s) for this consumer; the details are up to the subclass.
     */
    public abstract void sendUnsubForDrain();

    /**
     * Hook invoked by the background drain task once waiting is over (drained or timed
     * out), just before the drain future is completed.
     */
    abstract void cleanUpAfterDrain();
}
