/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.impl;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.topic.impl.Session;

public abstract class SessionBase<R, W>
implements Session {
    protected final GrpcReadWriteStream<R, W> streamConnection;
    protected final AtomicBoolean isWorking = new AtomicBoolean(true);
    private String token;

    public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
        this.streamConnection = streamConnection;
        this.token = streamConnection.authToken();
    }

    protected abstract Logger getLogger();

    protected abstract void sendUpdateTokenRequest(String var1);

    protected abstract void onStop();

    protected synchronized CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
        this.getLogger().info("Session start");
        return this.streamConnection.start(message -> {
            if (this.getLogger().isTraceEnabled()) {
                this.getLogger().trace("Message received:\n{}", message);
            } else {
                this.getLogger().debug("Message received");
            }
            if (this.isWorking.get()) {
                streamObserver.onNext(message);
            }
        });
    }

    public synchronized void send(W request) {
        if (!this.isWorking.get()) {
            if (this.getLogger().isTraceEnabled()) {
                this.getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
            }
            return;
        }
        String currentToken = this.streamConnection.authToken();
        if (!Objects.equals(this.token, currentToken)) {
            this.token = currentToken;
            this.getLogger().info("Sending new token");
            this.sendUpdateTokenRequest(this.token);
        }
        if (this.getLogger().isTraceEnabled()) {
            this.getLogger().trace("Sending request:\n{}", request);
        } else {
            this.getLogger().debug("Sending request");
        }
        this.streamConnection.sendNext(request);
    }

    private boolean stop() {
        this.getLogger().info("Session stop");
        return this.isWorking.compareAndSet(true, false);
    }

    @Override
    public synchronized boolean shutdown() {
        this.getLogger().info("Session shutdown");
        if (this.stop()) {
            this.onStop();
            this.streamConnection.close();
            return true;
        }
        return false;
    }
}

