package com.twitter.distributedlog.messaging;

import com.google.common.base.Charsets;
import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
import java.io.File;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.impl.Iq80DBFactory;

/* loaded from: input_file:com/twitter/distributedlog/messaging/ReaderWithOffsets.class */
public class ReaderWithOffsets {
    private static final String HELP = "ReaderWithOffsets <uri> <string> <reader-id> <offset-store-file>";

    public static void main(String[] strArr) throws Exception {
        if (4 != strArr.length) {
            System.out.println(HELP);
            return;
        }
        String str = strArr[0];
        String str2 = strArr[1];
        final String str3 = strArr[2];
        String str4 = strArr[3];
        DistributedLogNamespace build = DistributedLogNamespaceBuilder.newBuilder().conf(new DistributedLogConfiguration()).uri(URI.create(str)).build();
        System.out.println("Opening log stream " + str2);
        DistributedLogManager openLog = build.openLog(str2);
        Options options = new Options();
        options.createIfMissing(true);
        final DB open = Iq80DBFactory.factory.open(new File(str4), options);
        final AtomicReference atomicReference = new AtomicReference(null);
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { // from class: com.twitter.distributedlog.messaging.ReaderWithOffsets.1
            @Override // java.lang.Runnable
            public void run() {
                if (null != atomicReference.get()) {
                    open.put(str3.getBytes(Charsets.UTF_8), ((DLSN) atomicReference.get()).serializeBytes());
                    System.out.println("Updated reader " + str3 + " offset to " + atomicReference.get());
                }
            }
        }, 10L, 10L, TimeUnit.SECONDS);
        try {
            byte[] bArr = open.get(str3.getBytes(Charsets.UTF_8));
            readLoop(openLog, null == bArr ? DLSN.InitialDLSN : DLSN.deserializeBytes(bArr), atomicReference);
            open.close();
            openLog.close();
            build.close();
        } catch (Throwable th) {
            open.close();
            openLog.close();
            build.close();
            throw th;
        }
    }

    private static void readLoop(final DistributedLogManager distributedLogManager, DLSN dlsn, final AtomicReference<DLSN> atomicReference) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        System.out.println("Wait for records starting from " + dlsn);
        final AsyncLogReader asyncLogReader = (AsyncLogReader) FutureUtils.result(distributedLogManager.openAsyncLogReader(dlsn));
        asyncLogReader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { // from class: com.twitter.distributedlog.messaging.ReaderWithOffsets.2
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on reading records from stream " + distributedLogManager.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                System.out.println("Received record " + logRecordWithDLSN.getDlsn());
                System.out.println("\"\"\"");
                System.out.println(new String(logRecordWithDLSN.getPayload(), Charsets.UTF_8));
                System.out.println("\"\"\"");
                atomicReference.set(logRecordWithDLSN.getDlsn());
                asyncLogReader.readNext().addEventListener(this);
            }
        });
        countDownLatch.await();
        FutureUtils.result(asyncLogReader.asyncClose(), Duration.apply(5L, TimeUnit.SECONDS));
    }
}
