/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.distributedlog.BKAsyncLogReader;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test helper that reads records from a {@link DistributedLogManager} through an
 * {@link AsyncLogReader}, verifies each record, and re-opens the reader at the last
 * expected position whenever a read fails.
 *
 * <p>The instance registers itself as the completion callback of every
 * {@code readNext()} future: {@link #onSuccess} verifies the record and issues the
 * next read, {@link #onFailure} closes the reader and schedules a re-position.
 *
 * <p>NOTE(review): {@code reader}, {@code nextDLSN} and {@code delayMs} are written
 * both from tasks scheduled on {@code executorService} and from the read callbacks,
 * with no synchronization visible in this class — confirm the intended threading
 * model before relying on cross-thread visibility.
 */
public class TestReader
implements FutureEventListener<LogRecordWithDLSN> {
    static final Logger LOG = LoggerFactory.getLogger(TestReader.class);
    final String readerName;
    final DistributedLogManager dlm;
    AsyncLogReader reader;
    final DLSN startDLSN;
    DLSN nextDLSN;
    final boolean simulateErrors;
    int delayMs;
    final ScheduledExecutorService executorService;
    final CountDownLatch readyLatch;
    final CountDownLatch completionLatch;
    final CountDownLatch countLatch;
    final AtomicBoolean errorsFound;
    final AtomicInteger readCount;
    final AtomicInteger positionReaderCount;

    /**
     * Creates a reader; nothing is opened until {@link #start()} is called.
     *
     * @param name            name used in log messages
     * @param dlm             log manager the reader is opened from
     * @param startDLSN       position at which {@link #start()} opens the reader
     * @param simulateErrors  when {@code true}, {@code simulateErrors()} is invoked on each opened reader
     * @param delayMs         initial delay, in milliseconds, before a (re)position task runs
     * @param readyLatch      counted down each time a reader has been positioned and the first read issued
     * @param countLatch      counted down once per successfully verified record
     * @param completionLatch counted down when {@code countLatch} reaches zero or verification fails
     */
    public TestReader(String name, DistributedLogManager dlm, DLSN startDLSN, boolean simulateErrors, int delayMs, CountDownLatch readyLatch, CountDownLatch countLatch, CountDownLatch completionLatch) {
        this.readerName = name;
        this.dlm = dlm;
        this.startDLSN = startDLSN;
        this.simulateErrors = simulateErrors;
        this.delayMs = delayMs;
        this.readyLatch = readyLatch;
        this.countLatch = countLatch;
        this.completionLatch = completionLatch;
        this.errorsFound = new AtomicBoolean(false);
        this.readCount = new AtomicInteger(0);
        this.positionReaderCount = new AtomicInteger(0);
        this.executorService = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * @return the live counter of how many times a (re)position was requested
     */
    public AtomicInteger getNumReaderPositions() {
        return this.positionReaderCount;
    }

    /**
     * @return the live counter of successfully verified records
     */
    public AtomicInteger getNumReads() {
        return this.readCount;
    }

    /**
     * @return {@code true} once record verification has failed at least once
     */
    public boolean areErrorsFound() {
        return this.errorsFound.get();
    }

    /**
     * Advances the retry delay: a zero delay becomes 10 ms, any other delay is
     * doubled and capped at 500 ms.
     *
     * @return the updated delay in milliseconds
     */
    private int nextDelayMs() {
        this.delayMs = (0 == this.delayMs) ? 10 : Math.min(this.delayMs * 2, 500);
        return this.delayMs;
    }

    /**
     * Schedules a task, after the current {@code delayMs}, that opens a new reader at
     * {@code dlsn}, closes the previous one (if any), and issues the first read.
     * If opening or closing throws {@link IOException}, the delay is increased and the
     * positioning is scheduled again.
     *
     * @param dlsn position to open the reader at
     */
    private void positionReader(final DLSN dlsn) {
        this.positionReaderCount.incrementAndGet();
        Runnable task = () -> {
            try {
                AsyncLogReader newReader = this.dlm.getAsyncLogReader(dlsn);
                if (this.simulateErrors) {
                    ((BKAsyncLogReader) newReader).simulateErrors();
                }
                this.nextDLSN = dlsn;
                LOG.info("Positioned reader {} at {}", this.readerName, dlsn);
                if (null != this.reader) {
                    Utils.close(this.reader);
                }
                this.reader = newReader;
                this.readNext();
                this.readyLatch.countDown();
            } catch (IOException exc) {
                int nextMs = this.nextDelayMs();
                LOG.info("Encountered exception {} on opening reader {} at {}, retrying in {} ms", new Object[]{exc, this.readerName, dlsn, nextMs});
                this.positionReader(dlsn);
            }
        };
        this.executorService.schedule(task, (long) this.delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Issues the next asynchronous read and registers this instance as its callback.
     */
    private void readNext() {
        CompletableFuture<LogRecordWithDLSN> record = this.reader.readNext();
        // FutureEventListener<T> is a BiConsumer<T, Throwable>, so no raw cast is needed.
        record.whenComplete(this);
    }

    /**
     * Verifies a received record, then either finishes (when {@code countLatch} hits
     * zero) or advances {@code nextDLSN} and reads again. On a verification failure,
     * {@code errorsFound} is set and {@code completionLatch} is counted down.
     *
     * @param value the record that was read
     */
    @Override
    public void onSuccess(LogRecordWithDLSN value) {
        try {
            Assert.assertTrue(value.getDlsn().compareTo(this.nextDLSN) >= 0);
            LOG.info("Received record {} from log {} for reader {}", new Object[]{value.getDlsn(), this.dlm.getStreamName(), this.readerName});
            Assert.assertFalse(value.isControl());
            Assert.assertEquals(0L, value.getDlsn().getSlotId());
            DLMTestUtil.verifyLargeLogRecord(value);
        } catch (Exception | AssertionError exc) {
            // JUnit assertions throw AssertionError (an Error, not an Exception); it must be
            // caught here too, otherwise a failed check never sets errorsFound nor releases
            // completionLatch.
            LOG.error("Exception encountered when verifying received log record {} for reader {} :", new Object[]{value.getDlsn(), this.readerName, exc});
            this.errorsFound.set(true);
            this.completionLatch.countDown();
            return;
        }
        this.readCount.incrementAndGet();
        this.countLatch.countDown();
        if (this.countLatch.getCount() <= 0L) {
            LOG.info("Reader {} is completed", this.readerName);
            this.closeReader();
            this.completionLatch.countDown();
        } else {
            LOG.info("Reader {} : read count becomes {}, latch = {}", new Object[]{this.readerName, this.readCount.get(), this.countLatch.getCount()});
            this.nextDLSN = value.getDlsn().getNextDLSN();
            this.readNext();
        }
    }

    /**
     * Handles a failed read: closes the current reader, increases the retry delay and
     * schedules a re-position at {@code nextDLSN}.
     *
     * @param cause the failure reported by the read future
     */
    @Override
    public void onFailure(Throwable cause) {
        LOG.error("{} encountered exception on reading next record : ", this.readerName, cause);
        this.closeReader();
        this.nextDelayMs();
        this.positionReader(this.nextDLSN);
    }

    /**
     * Asynchronously closes the current reader, if there is one. A failure to close is
     * logged and otherwise ignored.
     */
    private void closeReader() {
        if (null != this.reader) {
            this.reader.asyncClose().whenComplete((value, cause) -> {
                // Only a failed close is worth a warning; a successful one has a null cause.
                if (null != cause) {
                    LOG.warn("Exception on closing reader {} : ", this.readerName, cause);
                }
            });
        }
    }

    /**
     * Starts reading by scheduling the initial positioning at {@code startDLSN}.
     */
    public void start() {
        this.positionReader(this.startDLSN);
    }

    /**
     * Closes the current reader and shuts down the scheduling executor.
     */
    public void stop() {
        this.closeReader();
        this.executorService.shutdown();
    }
}

