/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.journal;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.AsyncStateStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.StateStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.StateStoreClosedException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.journal.CommandProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends StateStore>
implements AsyncStateStore {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(AbstractStateStoreWithJournal.class);
    /** Local store produced by the constructor's supplier; replayed journal commands are applied to it. */
    protected final LocalStateStoreT localStore;
    /** Store name; keeps the placeholder value until {@code init} copies the name from the spec. */
    protected String name = "UNINITIALIZED";
    /** Spec handed to {@code init}; {@code null} before that. */
    protected StateStoreSpec spec;
    /** {@code true} when {@code init} created {@link #writeIOScheduler} itself because the spec supplied none. */
    protected boolean ownWriteScheduler = false;
    /** {@code true} when {@code init} created a dedicated {@link #readIOScheduler} itself. */
    protected boolean ownReadScheduler = false;
    /** Scheduler on which the init stages, the periodic checkpoint and the first close step run. */
    protected ScheduledExecutorService writeIOScheduler;
    /** Scheduler used by {@code executeReadIO}; may be the same instance as {@link #writeIOScheduler}. */
    protected ScheduledExecutorService readIOScheduler;
    /** Namespace from which the journal log stream is opened. */
    private final Namespace logNamespace;
    /** Manager of the journal log stream; assigned when the log is opened during {@code init}. */
    private DistributedLogManager logManager;
    /** Journal writer; assigned while holding this object's monitor once it has been opened. */
    private AsyncLogWriter writer;
    /** Transaction id of the last journal record; pre-incremented for every new write. */
    private long nextRevision;
    /** Applies journal records to {@link #localStore}; created after the local store is initialized. */
    private CommandProcessor<LocalStateStoreT> commandProcessor;
    /** Handle of the periodic checkpoint task, or {@code null} when none was scheduled. */
    private ScheduledFuture<?> checkpointTask;
    /** Checkpoint period; {@code null} when the spec has no checkpoint store. */
    private Duration checkpointInterval;
    /** Set to {@code true} once journal replay has reached the end DLSN. */
    protected boolean isInitialized = false;
    /** Non-null once {@code closeAsync} has been called; accessed while holding this object's monitor. */
    protected CompletableFuture<Void> closeFuture = null;

    /**
     * Creates the store, resolving both suppliers immediately.
     *
     * @param localStateStoreSupplier supplies the local store that journal commands are applied to
     * @param namespaceSupplier supplies the namespace from which the journal log stream is opened
     */
    protected AbstractStateStoreWithJournal(Supplier<LocalStateStoreT> localStateStoreSupplier, Supplier<Namespace> namespaceSupplier) {
        // No cast here: the supplier is already typed as LocalStateStoreT, and a value cast to the
        // StateStore bound is not assignable to the LocalStateStoreT field.
        this.localStore = localStateStoreSupplier.get();
        this.logNamespace = namespaceSupplier.get();
    }

    /**
     * Reports the value of the {@code ownWriteScheduler} flag, which {@code init} sets when it
     * creates the write I/O scheduler itself.
     *
     * @return the current value of the flag
     */
    protected boolean ownWriteScheduler() {
        return ownWriteScheduler;
    }

    /**
     * Reports the value of the {@code ownReadScheduler} flag, which {@code init} sets when it
     * creates a dedicated read I/O scheduler itself.
     *
     * @return the current value of the flag
     */
    protected boolean ownReadScheduler() {
        return ownReadScheduler;
    }

    /**
     * Reads the journal writer while holding this object's monitor.
     *
     * @return the current journal writer, or {@code null} if none has been assigned
     */
    synchronized AsyncLogWriter getWriter() {
        return writer;
    }

    /**
     * Asks the journal's log manager to purge logs older than the given transaction id.
     *
     * @param txId transaction id forwarded to {@code purgeLogsOlderThan}
     * @throws IOException if the log manager reports a failure
     */
    public void purgeOlderThan(long txId) throws IOException {
        logManager.purgeLogsOlderThan(txId);
    }

    /**
     * Truncates the journal through the current journal writer.
     *
     * @param dlsn position forwarded to the writer's {@code truncate}
     * @return the future returned by the writer
     */
    public CompletableFuture<Boolean> truncateJournal(DLSN dlsn) {
        final AsyncLogWriter journalWriter = getWriter();
        return journalWriter.truncate(dlsn);
    }

    /**
     * Fetches the last DLSN of the journal from the log manager.
     *
     * @return the last DLSN reported by the log manager
     * @throws IOException if the log manager reports a failure
     */
    public DLSN getLastDLSN() throws IOException {
        return logManager.getLastDLSN();
    }

    /**
     * Validates the spec handed to {@code init}.
     *
     * <p>Violations are reported as {@link IllegalArgumentException} because that is the
     * exception type {@code init} catches and turns into a failed future; a
     * {@code NullPointerException} would escape {@code init} as a thrown exception instead.
     *
     * @param spec the spec to validate
     * @throws IllegalArgumentException if {@code spec} is {@code null} or names no log stream
     */
    private void validateStoreSpec(StateStoreSpec spec) {
        Preconditions.checkArgument(null != spec, "No spec is specified for state store");
        Preconditions.checkArgument(null != spec.getStream(), "No log stream is specified for state store %s", (Object)spec.getName());
    }

    /**
     * Flags the store as initialized, schedules periodic checkpoints when an interval is
     * configured, and then either keeps tailing the journal (readonly store) or closes the reader.
     *
     * @param reader the journal reader that was used for replay
     */
    private synchronized void markInitialized(AsyncLogReader reader) {
        isInitialized = true;
        if (checkpointInterval != null) {
            // The same value serves as the initial delay and as the period.
            final long periodMs = checkpointInterval.toMillis();
            checkpointTask = writeIOScheduler.scheduleAtFixedRate(
                () -> localStore.checkpoint(), periodMs, periodMs, TimeUnit.MILLISECONDS);
        }
        if (!spec.isReadonly()) {
            reader.asyncClose();
            return;
        }
        replayLoop(reader);
    }

    /**
     * Returns the store name.
     *
     * @return the current value of the {@code name} field
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * Returns the spec this store was initialized with.
     *
     * @return the current value of the {@code spec} field
     */
    @Override
    public StateStoreSpec spec() {
        return spec;
    }

    /**
     * Initializes the store from the given spec.
     *
     * <p>Steps: validate the spec, pick or create the write and read I/O schedulers, record the
     * checkpoint interval, initialize the local store, determine the end position of the journal
     * (last DLSN for a readonly store, a freshly written catch-up marker otherwise) and replay
     * the journal up to that position.
     *
     * @param spec the spec describing this store
     * @return a future completed when replay has finished, or failed if any step failed
     */
    @Override
    public CompletableFuture<Void> init(StateStoreSpec spec) {
        try {
            validateStoreSpec(spec);
        } catch (IllegalArgumentException e) {
            log.error("Fail to init state store due to : ", e);
            return FutureUtils.exception(e);
        }
        this.spec = spec;
        this.name = spec.getName();

        // Write scheduler: create one only when the spec does not bring its own.
        if (null == spec.getWriteIOScheduler()) {
            final ThreadFactory writeThreads = new ThreadFactoryBuilder().setNameFormat("statestore-" + spec.getName() + "-write-io-scheduler-%d").build();
            writeIOScheduler = Executors.newSingleThreadScheduledExecutor(writeThreads);
            ownWriteScheduler = true;
        } else {
            writeIOScheduler = spec.getWriteIOScheduler();
            ownWriteScheduler = false;
        }

        // Read scheduler: spec-supplied, else shared with an owned write scheduler, else created.
        if (null == spec.getReadIOScheduler()) {
            if (ownWriteScheduler) {
                readIOScheduler = writeIOScheduler;
                ownReadScheduler = false;
            } else {
                final ThreadFactory readThreads = new ThreadFactoryBuilder().setNameFormat("statestore-" + spec.getName() + "-read-io-scheduler-%d").build();
                readIOScheduler = Executors.newSingleThreadScheduledExecutor(readThreads);
                ownReadScheduler = true;
            }
        } else {
            readIOScheduler = spec.getReadIOScheduler();
        }

        if (null == spec.getCheckpointStore()) {
            checkpointInterval = null;
        } else {
            checkpointInterval = spec.getCheckpointDuration();
        }

        if (spec.isReadonly()) {
            final CompletableFuture<DLSN> lastDlsn = initializeLocalStore(spec)
                .thenComposeAsync(ignored -> getLastDLSN(spec), writeIOScheduler);
            return lastDlsn.thenComposeAsync(endDLSN -> replayJournal(endDLSN), writeIOScheduler);
        }
        final CompletableFuture<DLSN> barrierWritten = initializeLocalStore(spec)
            .thenComposeAsync(ignored -> initializeJournalWriter(spec), writeIOScheduler);
        return barrierWritten.thenComposeAsync(endDLSN -> {
            log.info("Successfully write a barrier record for mvcc store {} at {}", name, endDLSN);
            return replayJournal(endDLSN);
        }, writeIOScheduler);
    }

    /**
     * Initializes the local store on the write I/O scheduler and then creates the command
     * processor.
     *
     * @param spec the spec forwarded to the local store's {@code init}
     * @return a future completed when the task has run, or failed with whatever it threw
     */
    private CompletableFuture<Void> initializeLocalStore(StateStoreSpec spec) {
        final Callable<Void> initTask = () -> {
            log.info("Initializing the local state for mvcc store {}", name());
            localStore.init(spec);
            log.info("Initialized the local state for mvcc store {}", name());
            commandProcessor = newCommandProcessor();
            return null;
        };
        return executeWriteIO(initTask);
    }

    /**
     * Opens the journal log and its writer, then writes a catch-up marker record.
     *
     * <p>Fails immediately if the store has already been closed or the log cannot be opened.
     *
     * @param spec the spec naming the journal stream
     * @return a future holding the DLSN of the catch-up marker
     */
    private CompletableFuture<DLSN> initializeJournalWriter(StateStoreSpec spec) {
        synchronized (this) {
            if (closeFuture != null) {
                return FutureUtils.exception(new StateStoreClosedException(name()));
            }
        }
        try {
            logManager = logNamespace.openLog(spec.getStream());
        } catch (IOException e) {
            return FutureUtils.exception(e);
        }
        final LedgerMetadata journalMetadata = new LedgerMetadata();
        journalMetadata.setApplication("bk-stream-storage-service");
        journalMetadata.setComponent("state-store");
        return logManager.openAsyncLogWriter(journalMetadata).thenComposeAsync(opened -> {
            synchronized (this) {
                writer = opened;
                // A negative last transaction id is clamped to 0.
                nextRevision = Math.max(writer.getLastTxId(), 0L);
                log.info("Initialized the journal writer for mvcc store {} : last revision = {}", name(), nextRevision);
            }
            return writeCommandBuf(newCatchupMarker());
        }, writeIOScheduler);
    }

    /**
     * Opens a journal writer, writes a catch-up marker record and then closes that writer.
     *
     * @return a future holding the DLSN of the catch-up marker, completed after the writer has
     *         been closed (or right away when no writer is set at that point)
     */
    private CompletableFuture<DLSN> writeCatchUpMarker() {
        final LedgerMetadata journalMetadata = new LedgerMetadata();
        journalMetadata.setApplication("bk-stream-storage-service");
        journalMetadata.setComponent("state-store");
        final CompletableFuture<DLSN> markerWritten = logManager.openAsyncLogWriter(journalMetadata).thenComposeAsync(opened -> {
            synchronized (this) {
                writer = opened;
                // A negative last transaction id is clamped to 0.
                nextRevision = Math.max(writer.getLastTxId(), 0L);
                log.info("Initialized the journal writer for writing catchup marker to mvcc store {} : last revision = {}", name(), nextRevision);
            }
            return writeCommandBuf(newCatchupMarker());
        });
        return markerWritten.thenCompose(markerDlsn -> {
            final AsyncLogWriter current;
            synchronized (this) {
                current = writer;
            }
            if (current != null) {
                return current.asyncClose().thenApply(ignored -> markerDlsn);
            }
            return FutureUtils.value(markerDlsn);
        });
    }

    /**
     * Opens the journal log and looks up its last DLSN.
     *
     * <p>When the lookup fails because the log is empty or does not exist, a catch-up marker is
     * written instead and its DLSN becomes the result. Any other failure fails the returned
     * future. Fails immediately if the store has already been closed or the log cannot be opened.
     *
     * @param spec the spec naming the journal stream
     * @return a future holding the DLSN up to which the journal should be replayed
     */
    private CompletableFuture<DLSN> getLastDLSN(StateStoreSpec spec) {
        synchronized (this) {
            if (closeFuture != null) {
                return FutureUtils.exception(new StateStoreClosedException(name()));
            }
        }
        try {
            logManager = logNamespace.openLog(spec.getStream());
        } catch (IOException e) {
            return FutureUtils.exception(e);
        }
        final CompletableFuture<DLSN> lastDlsnFuture = FutureUtils.createFuture();
        logManager.getLastDLSNAsync().whenCompleteAsync(new FutureEventListener<DLSN>() {

            @Override
            public void onSuccess(DLSN dlsn) {
                lastDlsnFuture.complete(dlsn);
            }

            @Override
            public void onFailure(Throwable throwable) {
                final boolean emptyOrMissing = throwable instanceof LogEmptyException || throwable instanceof LogNotFoundException;
                if (!emptyOrMissing) {
                    lastDlsnFuture.completeExceptionally(throwable);
                    return;
                }
                FutureUtils.proxyTo(AbstractStateStoreWithJournal.this.writeCatchUpMarker(), lastDlsnFuture);
            }
        });
        return lastDlsnFuture;
    }

    /**
     * Opens a journal reader at the local store's last revision and replays records until
     * {@code endDLSN} has been reached.
     *
     * @param endDLSN position at which replay is considered finished
     * @return a future completed when replay has finished; if it fails, the reader is closed
     */
    private CompletableFuture<Void> replayJournal(DLSN endDLSN) {
        final long startingTxId = localStore.getLastRevision();
        return logManager.openAsyncLogReader(startingTxId).thenComposeAsync(journalReader -> {
            final CompletableFuture<Void> replayDone = FutureUtils.createFuture();
            // Close the reader if replay fails.
            replayDone.exceptionally(cause -> {
                journalReader.asyncClose();
                return null;
            });
            log.info("Successfully open the journal reader for mvcc store {} : end dlsn = {}", name(), endDLSN);
            replayJournal(journalReader, endDLSN, replayDone, startingTxId);
            return replayDone;
        }, writeIOScheduler);
    }

    /**
     * Reads the first journal record of a replay and checks that it carries the expected
     * transaction id before handing it to the regular record handler.
     *
     * <p>When the transaction id does not match, {@code future} is failed with an
     * {@link InvalidStateStoreException} and the record is NOT passed on, so it is neither
     * applied to the local store nor followed by further reads. Read failures and valid records
     * are delivered to the record handler in the same way a direct
     * {@code whenComplete(handler)} registration would deliver them.
     *
     * @param reader journal reader positioned at the start of the replay
     * @param endDLSN position at which replay is considered finished
     * @param future future to complete when replay finishes or fails
     * @param startingTxId transaction id the first record must carry; {@code -1} disables the check
     */
    private void replayJournal(AsyncLogReader reader, DLSN endDLSN, CompletableFuture<Void> future, long startingTxId) {
        synchronized (this) {
            if (null != this.closeFuture) {
                FutureUtils.completeExceptionally(future, new StateStoreClosedException(this.name()));
                return;
            }
        }
        final FutureEventListener<LogRecordWithDLSN> recordHandler = this.newRecordHandler(reader, endDLSN, future);
        reader.readNext().whenComplete((record, exc) -> {
            if (null == exc && startingTxId != -1L && startingTxId != record.getTransactionId()) {
                String msg = String.format("replayJournal failed: Invalid starting transaction %d expecting %d for stream %s", record.getTransactionId(), startingTxId, this.name);
                log.error(msg);
                FutureUtils.completeExceptionally(future, new InvalidStateStoreException(msg));
                // Stop here: an unexpected record must not be applied to the local store.
                return;
            }
            recordHandler.accept(record, exc);
        });
    }

    /**
     * Reads the next journal record and hands it to the record handler, unless the store has
     * been closed, in which case {@code future} is failed with a
     * {@link StateStoreClosedException}.
     *
     * @param reader journal reader to read from
     * @param endDLSN position at which replay is considered finished
     * @param future future to complete when replay finishes or fails
     */
    private void replayJournal(AsyncLogReader reader, DLSN endDLSN, CompletableFuture<Void> future) {
        synchronized (this) {
            if (closeFuture != null) {
                FutureUtils.completeExceptionally(future, new StateStoreClosedException(name()));
                return;
            }
        }
        final FutureEventListener<LogRecordWithDLSN> recordHandler = newRecordHandler(reader, endDLSN, future);
        reader.readNext().whenComplete(recordHandler);
    }

    /**
     * Builds the listener that processes one replayed journal record.
     *
     * <p>On success the record is applied to the local store. If its DLSN has reached
     * {@code endDLSN}, the store is marked initialized and {@code future} is completed;
     * otherwise the next record is requested. Any exception raised while doing so, and any read
     * failure, fails {@code future}.
     *
     * @param reader journal reader the records come from
     * @param endDLSN position at which replay is considered finished
     * @param future future to complete when replay finishes or fails
     * @return the listener to register on a {@code readNext()} future
     */
    private FutureEventListener<LogRecordWithDLSN> newRecordHandler(final AsyncLogReader reader, final DLSN endDLSN, final CompletableFuture<Void> future) {
        return new FutureEventListener<LogRecordWithDLSN>(){

            @Override
            public void onSuccess(LogRecordWithDLSN record) {
                if (log.isDebugEnabled()) {
                    log.debug("Received command record {} @ {} to replay at mvcc store {}", new Object[]{record, record.getDlsn(), AbstractStateStoreWithJournal.this.name()});
                }
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Applying command transaction {} - record {} @ {} to mvcc store {}", new Object[]{record.getTransactionId(), record, record.getDlsn(), AbstractStateStoreWithJournal.this.name()});
                    }
                    AbstractStateStoreWithJournal.this.commandProcessor.applyCommand(record.getTransactionId(), record.getPayloadBuf(), AbstractStateStoreWithJournal.this.localStore);
                    if (record.getDlsn().compareTo(endDLSN) >= 0) {
                        log.info("Finished replaying journal for state store {}", (Object)AbstractStateStoreWithJournal.this.name());
                        AbstractStateStoreWithJournal.this.markInitialized(reader);
                        FutureUtils.complete(future, null);
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Read next record after {} at mvcc store {}", (Object)record.getDlsn(), (Object)AbstractStateStoreWithJournal.this.name());
                    }
                    AbstractStateStoreWithJournal.this.replayJournal(reader, endDLSN, future);
                }
                catch (Exception e) {
                    // The exception is passed as the trailing argument so the logger records its stack trace.
                    log.error("Exception is thrown when applying command record {} @ {} to mvcc store {}", new Object[]{record, record.getDlsn(), AbstractStateStoreWithJournal.this.name(), e});
                    FutureUtils.completeExceptionally(future, e);
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(future, cause);
            }
        };
    }

    /**
     * Keeps reading journal records and applying them to the local store, one read at a time,
     * until the store is closed or a failure occurs.
     *
     * <p>The loop stops, and the reader is closed, when the store has been closed, when applying
     * a record throws a {@link StateStoreRuntimeException}, or when reading the next record
     * fails. Both failure cases are logged rather than silently dropped.
     *
     * @param reader journal reader to tail
     */
    private void replayLoop(final AsyncLogReader reader) {
        synchronized (this) {
            if (null != this.closeFuture) {
                reader.asyncClose();
                return;
            }
        }
        reader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>(){

            @Override
            public void onSuccess(LogRecordWithDLSN record) {
                if (log.isDebugEnabled()) {
                    log.debug("Received command record {} @ {} to replay at mvcc store {}", new Object[]{record, record.getDlsn(), AbstractStateStoreWithJournal.this.name()});
                }
                try {
                    AbstractStateStoreWithJournal.this.commandProcessor.applyCommand(record.getTransactionId(), record.getPayloadBuf(), AbstractStateStoreWithJournal.this.localStore);
                    AbstractStateStoreWithJournal.this.replayLoop(reader);
                }
                catch (StateStoreRuntimeException e) {
                    log.error("Fail to reply command record {}", (Object)record, (Object)e);
                    // The loop ends here, so release the reader instead of leaving it open.
                    reader.asyncClose();
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                // The loop ends here: report the failure and release the reader.
                log.warn("Fail to read next command record to replay at mvcc store {}", (Object)AbstractStateStoreWithJournal.this.name(), (Object)throwable);
                reader.asyncClose();
            }
        });
    }

    /**
     * Closes the store asynchronously; repeated calls return the future of the first call.
     *
     * <p>Sequence: cancel the checkpoint task, then on the write I/O scheduler close the journal
     * writer and log manager, then (on the read I/O scheduler when there is one) flush and close
     * the local store, shut down the schedulers this store owns and complete the returned future.
     *
     * @return a future completed once the close sequence has finished
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> closed = FutureUtils.createFuture();
        synchronized (this) {
            if (closeFuture != null) {
                return closeFuture;
            }
            closeFuture = closed;
        }
        if (checkpointTask != null && !checkpointTask.cancel(true)) {
            log.warn("Fail to cancel checkpoint task of state store {}", name());
        }

        // Last step, run on the read scheduler: close the local store and owned schedulers.
        final Runnable finishOnReadScheduler = () -> {
            closeLocalStore();
            if (ownReadScheduler) {
                readIOScheduler.shutdown();
            }
            if (ownWriteScheduler) {
                writeIOScheduler.shutdown();
            }
            FutureUtils.complete(closed, null);
        };
        // Runs once the journal close sequence has completed, whatever its outcome.
        final Runnable afterJournalClosed = () -> {
            if (readIOScheduler != null) {
                readIOScheduler.submit(finishOnReadScheduler);
                return;
            }
            closeLocalStore();
            FutureUtils.complete(closed, null);
        };
        // First step, run on the write scheduler: close the journal writer and log manager.
        final Runnable closeJournal = () -> {
            log.info("closing async state store {}", name);
            FutureUtils.ensure(
                Utils.closeSequence(readIOScheduler, true, new AsyncCloseable[]{getWriter(), logManager})
                    .thenRun(() -> log.info("Successfully close the log stream of state store {}", name)),
                afterJournalClosed);
        };
        writeIOScheduler.submit(closeJournal);
        return closed;
    }

    /**
     * Flushes and then closes the local store, if there is one. A flush failure is logged and
     * does not prevent the close.
     */
    private void closeLocalStore() {
        if (localStore != null) {
            try {
                localStore.flush();
            } catch (StateStoreException e) {
                log.warn("Fail to flush local state store {}", name(), e);
            }
            localStore.close();
        }
    }

    /**
     * Runs {@code callable} on {@code scheduler} and exposes its outcome as a future.
     *
     * <p>The returned future is always completed once the task has run: with the callable's
     * value, or exceptionally with whatever it threw. This includes {@link Error}s, which would
     * otherwise leave the future incomplete forever.
     *
     * @param scheduler scheduler the task is submitted to
     * @param callable work to run
     * @param <T> result type of the callable
     * @return a future holding the callable's result, or a future failed with a
     *         {@link StateStoreClosedException} if the store has already been closed
     */
    private <T> CompletableFuture<T> executeIO(ScheduledExecutorService scheduler, Callable<T> callable) {
        synchronized (this) {
            if (null != this.closeFuture) {
                return FutureUtils.exception(new StateStoreClosedException(this.name()));
            }
        }
        final CompletableFuture<T> future = FutureUtils.createFuture();
        scheduler.submit(() -> {
            try {
                future.complete(callable.call());
            }
            catch (Throwable t) {
                // Nobody inspects the Future returned by submit(), so this is the only place
                // where a failure can be reported to the caller.
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    /**
     * Runs {@code callable} on the write I/O scheduler.
     *
     * @param callable work to run
     * @param <T> result type of the callable
     * @return a future holding the callable's outcome
     */
    protected <T> CompletableFuture<T> executeWriteIO(Callable<T> callable) {
        return executeIO(writeIOScheduler, callable);
    }

    /**
     * Runs {@code callable} on the read I/O scheduler.
     *
     * @param callable work to run
     * @param <T> result type of the callable
     * @return a future holding the callable's outcome
     */
    protected <T> CompletableFuture<T> executeReadIO(Callable<T> callable) {
        return executeIO(readIOScheduler, callable);
    }

    /**
     * Appends a command to the journal under the next revision number and releases the buffer
     * once the write has completed.
     *
     * @param cmdBuf serialized command; released by this method after the write completes
     * @return a future holding the DLSN at which the record was written
     */
    protected synchronized CompletableFuture<DLSN> writeCommandBuf(ByteBuf cmdBuf) {
        final long txId = ++nextRevision;
        final LogRecord journalRecord = new LogRecord(txId, cmdBuf.nioBuffer());
        return FutureUtils.ensure(writer.write(journalRecord), () -> ReferenceCountUtil.safeRelease(cmdBuf));
    }

    /**
     * Appends a command to the journal under the next revision number and releases the buffer
     * once the write has completed.
     *
     * @param cmdBuf serialized command; released by this method after the write completes
     * @return a future holding the transaction id assigned to the record
     */
    protected synchronized CompletableFuture<Long> writeCommandBufReturnTxId(ByteBuf cmdBuf) {
        final long txId = ++nextRevision;
        final LogRecord journalRecord = new LogRecord(txId, cmdBuf.nioBuffer());
        return FutureUtils.ensure(writer.write(journalRecord).thenApply(dlsn -> txId), () -> ReferenceCountUtil.safeRelease(cmdBuf));
    }

    /**
     * Creates the serialized catch-up marker command that is written to the journal when the
     * journal writer is opened.
     *
     * @return the buffer to write; it is released by {@code writeCommandBuf} after the write
     */
    protected abstract ByteBuf newCatchupMarker();

    /**
     * Creates the processor that applies journal records to the local store. It is called once
     * the local store has been initialized.
     *
     * @return the command processor to use during replay
     */
    protected abstract CommandProcessor<LocalStateStoreT> newCommandProcessor();

    /**
     * Returns the local store backing this state store.
     *
     * @return the local store obtained from the constructor's supplier
     */
    public LocalStateStoreT getLocalStore() {
        return localStore;
    }
}

