package com.twitter.distributedlog.messaging;

import com.google.common.base.Charsets;
import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogRecord;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.distributedlog.thrift.messaging.TransformedRecord;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;

/* loaded from: input_file:com/twitter/distributedlog/messaging/StreamTransformer.class */
public class StreamTransformer {
    private static final String HELP = "StreamTransformer <uri> <src_stream> <target_stream>";
    private static final TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();

    public static void main(String[] strArr) throws Exception {
        DLSN dlsn;
        if (3 != strArr.length) {
            System.out.println(HELP);
            return;
        }
        String str = strArr[0];
        String str2 = strArr[1];
        String str3 = strArr[2];
        URI create = URI.create(str);
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.setOutputBufferSize(16384);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(5);
        DistributedLogNamespace build = DistributedLogNamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(create).build();
        System.out.println("Opening log stream " + str2);
        DistributedLogManager openLog = build.openLog(str2);
        System.out.println("Opening log stream " + str3);
        DistributedLogManager openLog2 = build.openLog(str3);
        IdenticalTransformer identicalTransformer = new IdenticalTransformer();
        try {
            LogRecordWithDLSN lastLogRecord = openLog2.getLastLogRecord();
            TransformedRecord transformedRecord = new TransformedRecord();
            try {
                transformedRecord.read(protocolFactory.getProtocol(new TIOStreamTransport(new ByteArrayInputStream(lastLogRecord.getPayload()))));
                dlsn = DLSN.deserializeBytes(transformedRecord.getSrcDlsn());
                System.out.println("Last transformed record is " + dlsn);
            } catch (TException e) {
                System.err.println("Error on reading last transformed record");
                e.printStackTrace(System.err);
                dlsn = DLSN.InitialDLSN;
            }
        } catch (LogEmptyException e2) {
            dlsn = DLSN.InitialDLSN;
        } catch (LogNotFoundException e3) {
            dlsn = DLSN.InitialDLSN;
        }
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) FutureUtils.result(openLog2.openAsyncLogWriter());
        try {
            readLoop(openLog, dlsn, asyncLogWriter, identicalTransformer);
            FutureUtils.result(asyncLogWriter.asyncClose(), Duration.apply(5L, TimeUnit.SECONDS));
            openLog2.close();
            openLog.close();
            build.close();
        } catch (Throwable th) {
            FutureUtils.result(asyncLogWriter.asyncClose(), Duration.apply(5L, TimeUnit.SECONDS));
            openLog2.close();
            openLog.close();
            build.close();
            throw th;
        }
    }

    private static void readLoop(final DistributedLogManager distributedLogManager, final DLSN dlsn, final AsyncLogWriter asyncLogWriter, final Transformer<byte[], byte[]> transformer) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        System.out.println("Wait for records starting from " + dlsn);
        final AsyncLogReader asyncLogReader = (AsyncLogReader) FutureUtils.result(distributedLogManager.openAsyncLogReader(dlsn));
        asyncLogReader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { // from class: com.twitter.distributedlog.messaging.StreamTransformer.1
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on reading records from stream " + distributedLogManager.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                if (logRecordWithDLSN.getDlsn().compareTo(dlsn) <= 0) {
                    asyncLogReader.readNext().addEventListener(this);
                    return;
                }
                System.out.println("Received record " + logRecordWithDLSN.getDlsn());
                System.out.println("\"\"\"");
                System.out.println(new String(logRecordWithDLSN.getPayload(), Charsets.UTF_8));
                System.out.println("\"\"\"");
                try {
                    StreamTransformer.transform(asyncLogWriter, logRecordWithDLSN, transformer, countDownLatch);
                } catch (Exception e) {
                    System.err.println("Encountered error on transforming record " + logRecordWithDLSN.getDlsn() + " from stream " + distributedLogManager.getStreamName());
                    e.printStackTrace(System.err);
                    countDownLatch.countDown();
                }
                asyncLogReader.readNext().addEventListener(this);
            }
        });
        countDownLatch.await();
        FutureUtils.result(asyncLogReader.asyncClose(), Duration.apply(5L, TimeUnit.SECONDS));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void transform(final AsyncLogWriter asyncLogWriter, LogRecordWithDLSN logRecordWithDLSN, Transformer<byte[], byte[]> transformer, final CountDownLatch countDownLatch) throws Exception {
        DLSN dlsn = logRecordWithDLSN.getDlsn();
        TransformedRecord transformedRecord = new TransformedRecord(ByteBuffer.wrap(transformer.transform(logRecordWithDLSN.getPayload())));
        transformedRecord.setSrcDlsn(dlsn.serializeBytes());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(4096);
        transformedRecord.write(protocolFactory.getProtocol(new TIOStreamTransport(byteArrayOutputStream)));
        asyncLogWriter.write(new LogRecord(logRecordWithDLSN.getSequenceId(), byteArrayOutputStream.toByteArray())).addEventListener(new FutureEventListener<DLSN>() { // from class: com.twitter.distributedlog.messaging.StreamTransformer.2
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on writing records to stream " + asyncLogWriter.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(DLSN dlsn2) {
                System.out.println("Write transformed record " + dlsn2);
            }
        });
    }
}
