/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.impl;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import tech.ydb.core.Status;

/**
 * Base class that schedules reconnects of a stream after its session closes.
 *
 * <p>When {@link #onSessionClosed(Status, Throwable)} is invoked and the retrier has not been
 * stopped, a call to {@link #reconnect()} is scheduled on the supplied scheduler after an
 * exponentially growing, jittered delay. At most one reconnect is pending at a time; this is
 * guarded by {@link #isReconnecting}.
 */
public abstract class GrpcStreamRetrier {
    // Not referenced anywhere in this class as shown.
    // NOTE(review): presumably 0 means "no limit on reconnect attempts" - confirm.
    private static final int MAX_RECONNECT_COUNT = 0;
    /** Base of the exponential backoff; the delay is {@code base * 2^attempt} milliseconds. */
    private static final int EXP_BACKOFF_BASE_MS = 256;
    /** Delay (before jitter) used once the attempt number exceeds {@link #EXP_BACKOFF_MAX_POWER}. */
    private static final int EXP_BACKOFF_CEILING_MS = 40000;
    /** Highest attempt number for which the exponential formula is still applied. */
    private static final int EXP_BACKOFF_MAX_POWER = 7;

    /** Random UUID string generated in the constructor; used as a prefix in log messages. */
    protected final String id;
    /** {@code true} from the moment a reconnect is scheduled until {@link #reconnect()} starts. */
    protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
    /** Set to {@code true} by {@link #shutdownImpl(String)}; suppresses further reconnects. */
    protected final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler;
    /** Number of the most recently scheduled reconnect attempt; this class only ever increments it. */
    protected final AtomicInteger reconnectCounter = new AtomicInteger(0);

    /**
     * Creates a retrier with a freshly generated random id.
     *
     * @param scheduler executor on which delayed reconnects are scheduled
     */
    protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
        this.id = UUID.randomUUID().toString();
    }

    /**
     * @return logger used for all messages emitted by this class
     */
    protected abstract Logger getLogger();

    /**
     * @return stream name inserted into log messages
     */
    protected abstract String getStreamName();

    /** Invoked from {@link #reconnect()} when a scheduled reconnect fires. */
    protected abstract void onStreamReconnect();

    /**
     * Invoked asynchronously by {@link #shutdownImpl(String)}.
     *
     * @param reason the reason passed to {@link #shutdownImpl(String)}; may be empty or {@code null}
     */
    protected abstract void onShutdown(String reason);

    /**
     * Schedules a delayed {@link #reconnect()} unless one is already pending.
     *
     * <p>If the scheduler rejects the task, the retrier shuts itself down.
     */
    private void tryScheduleReconnect() {
        if (!isReconnecting.compareAndSet(false, true)) {
            getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress",
                    id, getStreamName());
            return;
        }

        // Increment only after winning the CAS, and atomically, so that a value read before the
        // flag was acquired can never be written back as a duplicated retry number.
        int currentReconnectCounter = reconnectCounter.incrementAndGet();
        int delayMs = currentReconnectCounter <= EXP_BACKOFF_MAX_POWER
                ? EXP_BACKOFF_BASE_MS * (1 << currentReconnectCounter)
                : EXP_BACKOFF_CEILING_MS;
        // Jitter: add a random amount in [0, delayMs).
        delayMs += ThreadLocalRandom.current().nextInt(delayMs);

        getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...",
                id, currentReconnectCounter, getStreamName(), delayMs);
        try {
            scheduler.schedule(this::reconnect, (long) delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException exception) {
            String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. "
                    + "Shutting down " + getStreamName();
            // Pass the exception along so the rejection cause is not lost.
            getLogger().error(errorMessage, exception);
            shutdownImpl(errorMessage);
        }
    }

    /**
     * Clears the pending-reconnect flag and delegates to {@link #onStreamReconnect()}.
     * Scheduled by {@link #tryScheduleReconnect()}.
     */
    void reconnect() {
        getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), reconnectCounter.get());
        if (!isReconnecting.compareAndSet(true, false)) {
            getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen");
        }
        onStreamReconnect();
    }

    /**
     * Shuts down without a reason; equivalent to {@code shutdownImpl("")}.
     *
     * @return future completed once {@link #onShutdown(String)} has run
     */
    protected CompletableFuture<Void> shutdownImpl() {
        return shutdownImpl("");
    }

    /**
     * Marks the retrier as stopped and runs {@link #onShutdown(String)} asynchronously.
     *
     * @param reason text describing why the shutdown happens; may be empty or {@code null}
     * @return future completed once {@link #onShutdown(String)} has run
     */
    protected CompletableFuture<Void> shutdownImpl(String reason) {
        // The reason is passed as a log argument rather than concatenated into the format
        // string, so any "{}" it contains cannot be mistaken for a placeholder.
        if (reason == null || reason.isEmpty()) {
            getLogger().info("[{}] Shutting down {}", id, getStreamName());
        } else {
            getLogger().info("[{}] Shutting down {} with reason: {}", id, getStreamName(), reason);
        }
        isStopped.set(true);
        return CompletableFuture.runAsync(() -> onShutdown(reason));
    }

    /**
     * Handles the end of a stream session: logs the outcome and, unless the retrier is stopped,
     * schedules a reconnect.
     *
     * @param status final status of the session; only consulted when {@code th} is {@code null}
     * @param th     exception that terminated the session, or {@code null}
     */
    protected void onSessionClosed(Status status, Throwable th) {
        getLogger().info("[{}] onSessionClosed called", id);

        if (th != null) {
            // The trailing Throwable is logged by SLF4J as the exception, not as a placeholder value.
            getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th);
        } else if (status.isSuccess()) {
            if (isStopped.get()) {
                getLogger().info("[{}] {} stream session closed successfully", id, getStreamName());
                return;
            }
            // A successful close while still running is unexpected, so a reconnect follows.
            getLogger().warn("[{}] {} stream session was closed on working {}",
                    id, getStreamName(), getStreamName());
        } else {
            getLogger().warn("[{}] Error in {} stream session: {}", id, getStreamName(), status);
        }

        if (isStopped.get()) {
            getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName());
        } else {
            tryScheduleReconnect();
        }
    }
}

