package org.apache.distributedlog.mapreduce;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAccessor;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/* loaded from: input_file:org/apache/distributedlog/mapreduce/DistributedLogInputFormat.class */
public class DistributedLogInputFormat extends InputFormat<DLSN, LogRecordWithDLSN> implements Configurable {
    private static final String DL_URI = "distributedlog.uri";
    private static final String DL_STREAM = "distributedlog.stream";
    protected Configuration conf;
    protected DistributedLogConfiguration dlConf;
    protected URI dlUri;
    protected DistributedLogNamespace namespace;
    protected String streamName;
    protected DistributedLogManager dlm;

    public void setConf(Configuration configuration) {
        this.conf = configuration;
        this.dlConf = new DistributedLogConfiguration();
        this.dlUri = URI.create(configuration.get(DL_URI, ""));
        this.streamName = configuration.get(DL_STREAM, "");
        try {
            this.namespace = DistributedLogNamespaceBuilder.newBuilder().conf(this.dlConf).uri(this.dlUri).build();
            this.dlm = this.namespace.openLog(this.streamName);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public Configuration getConf() {
        return this.conf;
    }

    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        List<LogSegmentMetadata> logSegments = this.dlm.getLogSegments();
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(logSegments.size());
        LedgerManager ledgerManager = BookKeeperAccessor.getLedgerManager(this.namespace.getNamespaceDriver().getReaderBKC().get());
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicReference atomicReference = new AtomicReference(null);
        for (LogSegmentMetadata logSegmentMetadata : logSegments) {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ledgerManager.readLedgerMetadata(logSegmentMetadata.getLogSegmentId(), new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() { // from class: org.apache.distributedlog.mapreduce.DistributedLogInputFormat.1
                public void operationComplete(int i, LedgerMetadata ledgerMetadata) {
                    atomicReference.set(ledgerMetadata);
                    atomicInteger.set(i);
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            if (0 != atomicInteger.get()) {
                throw new IOException("Faild to get log segment metadata for " + logSegmentMetadata + " : " + BKException.getMessage(atomicInteger.get()));
            }
            newArrayListWithCapacity.add(new LogSegmentSplit(logSegmentMetadata, (LedgerMetadata) atomicReference.get()));
        }
        return newArrayListWithCapacity;
    }

    public RecordReader<DLSN, LogRecordWithDLSN> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new LogSegmentReader(this.streamName, this.dlConf, this.namespace.getNamespaceDriver().getReaderBKC().get(), (LogSegmentSplit) inputSplit);
    }
}
