package org.apache.spark.shuffle.reader;

import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Aggregator;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.TempShuffleReadMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.util.CompletionIterator;
import org.apache.spark.util.CompletionIterator$;
import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.collection.ExternalSorter;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Product2;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/spark/shuffle/reader/RssShuffleReader.class */
/**
 * {@link ShuffleReader} implementation that fetches shuffle data through a Uniffle shuffle read
 * client instead of Spark's built-in block fetcher.
 *
 * <p>{@link #read()} builds the read client for {@code startPartition}, then optionally applies
 * the dependency's aggregator and key ordering, and finally wraps the result so that it is
 * interruptible and (when enabled by configuration) reports fetch failures.
 *
 * <p>Note: only {@code startPartition} is passed to the read client and used for the server
 * lookup; {@code endPartition} is used solely in the log message produced by this class.
 *
 * @param <K> key type of the shuffled records
 * @param <C> combined value type of the shuffled records
 */
public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
    private static final Logger LOG = LoggerFactory.getLogger(RssShuffleReader.class);

    /** True when more than one shuffle server is assigned to the start partition. */
    private final boolean expectedTaskIdsBitmapFilterEnable;
    private final String appId;
    private final int shuffleId;
    private final int startPartition;
    private final int endPartition;
    private final TaskContext context;
    private final ShuffleDependency<K, C, ?> shuffleDependency;
    private final Serializer serializer;
    /** {@code <taskAttemptId>_<attemptNumber>}; used only for logging. */
    private final String taskId;
    private final String basePath;
    private final int partitionNumPerRange;
    private final int partitionNum;
    private final Roaring64NavigableMap blockIdBitmap;
    private final Roaring64NavigableMap taskIdBitmap;
    private final List<ShuffleServerInfo> shuffleServerInfoList;
    private final Configuration hadoopConf;
    private final RssConf rssConf;

    /**
     * {@link ShuffleReadMetrics} adapter that forwards the increments it overrides to a
     * task-local {@link TempShuffleReadMetrics} instance.
     */
    static class ReadMetrics extends ShuffleReadMetrics {
        private final TempShuffleReadMetrics tempShuffleReadMetrics;

        ReadMetrics(TempShuffleReadMetrics tempShuffleReadMetrics) {
            this.tempShuffleReadMetrics = tempShuffleReadMetrics;
        }

        @Override
        public void incRemoteBytesRead(long v) {
            this.tempShuffleReadMetrics.incRemoteBytesRead(v);
        }

        @Override
        public void incFetchWaitTime(long v) {
            this.tempShuffleReadMetrics.incFetchWaitTime(v);
        }

        @Override
        public void incRecordsRead(long v) {
            this.tempShuffleReadMetrics.incRecordsRead(v);
        }
    }

    /**
     * Creates a reader for the given partition range of a shuffle.
     *
     * @param startPartition first partition to read; also the key used to look up shuffle servers
     * @param endPartition end of the partition range (only used for logging in this class)
     * @param context Spark task context of the reading task
     * @param rssShuffleHandle handle providing the app id, shuffle dependency and server assignment
     * @param basePath base path handed to the shuffle read client request
     * @param hadoopConf Hadoop configuration handed to the shuffle read client request
     * @param partitionNumPerRange value handed to the shuffle read client request
     * @param partitionNum value handed to the shuffle read client request
     * @param blockIdBitmap block id bitmap handed to the shuffle read client request
     * @param taskIdBitmap task id bitmap handed to the shuffle read client request
     * @param rssConf Uniffle client configuration
     * @throws NullPointerException if no shuffle servers are registered for {@code startPartition}
     */
    public RssShuffleReader(int startPartition, int endPartition, TaskContext context, RssShuffleHandle<K, C, ?> rssShuffleHandle, String basePath, Configuration hadoopConf, int partitionNumPerRange, int partitionNum, Roaring64NavigableMap blockIdBitmap, Roaring64NavigableMap taskIdBitmap, RssConf rssConf) {
        this.appId = rssShuffleHandle.getAppId();
        this.startPartition = startPartition;
        this.endPartition = endPartition;
        this.context = context;
        this.shuffleDependency = rssShuffleHandle.getDependency();
        this.shuffleId = this.shuffleDependency.shuffleId();
        this.serializer = rssShuffleHandle.getDependency().serializer();
        this.taskId = context.taskAttemptId() + "_" + context.attemptNumber();
        this.basePath = basePath;
        this.partitionNumPerRange = partitionNumPerRange;
        this.partitionNum = partitionNum;
        this.blockIdBitmap = blockIdBitmap;
        this.taskIdBitmap = taskIdBitmap;
        this.hadoopConf = hadoopConf;
        // Fail with a descriptive message instead of an anonymous NPE on the size() call below.
        this.shuffleServerInfoList = Objects.requireNonNull(
                rssShuffleHandle.getPartitionToServers().get(Integer.valueOf(startPartition)),
                "No shuffle servers assigned for partition " + startPartition
                        + " (appId=" + this.appId + ", shuffleId=" + this.shuffleId + ")");
        this.rssConf = rssConf;
        this.expectedTaskIdsBitmapFilterEnable = this.shuffleServerInfoList.size() > 1;
    }

    /**
     * Reads the shuffle data for this reader's partition.
     *
     * <p>The returned iterator is, in order: the raw data iterator wrapped in a
     * {@link CompletionIterator}; aggregated if the dependency defines an aggregator; sorted if
     * the dependency defines a key ordering; wrapped in an {@link InterruptibleIterator} if it is
     * not one already; and wrapped in a fetch-failure reporting iterator when stage resubmission
     * is enabled and a shuffle manager gRPC port is configured.
     *
     * @return iterator over the key/combiner pairs of the partition
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"}) // Scala collection/Spark APIs are used through raw types here.
    public Iterator<Product2<K, C>> read() {
        LOG.info("Shuffle read started:{}", getReadInfo());

        final CompletionIterator completionIterator = createCompletionIterator();
        // Make sure the completion callback also runs when the task ends without draining the iterator.
        this.context.addTaskCompletionListener(taskContext -> {
            completionIterator.completion();
        });

        Iterator result = aggregate(completionIterator);
        if (this.shuffleDependency.keyOrdering().isDefined()) {
            result = sortByKey(result);
        }
        if (!(result instanceof InterruptibleIterator)) {
            result = new InterruptibleIterator(this.context, result);
        }
        if (this.rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
                && this.rssConf.getInteger(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT, 0) > 0) {
            result = wrapWithFetchFailureReporting(result);
        }
        return result;
    }

    /** Builds the shuffle read client and wraps its data iterator with the completion callback. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private CompletionIterator createCompletionIterator() {
        CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
                this.appId, this.shuffleId, this.startPartition, this.basePath,
                this.partitionNumPerRange, this.partitionNum, this.blockIdBitmap, this.taskIdBitmap,
                this.shuffleServerInfoList, this.hadoopConf, this.expectedTaskIdsBitmapFilterEnable,
                this.rssConf);
        final RssShuffleDataIterator dataIterator = new RssShuffleDataIterator(
                this.shuffleDependency.serializer(),
                ShuffleClientFactory.getInstance().createShuffleReadClient(request),
                new ReadMetrics(this.context.taskMetrics().createTempShuffleReadMetrics()),
                this.rssConf);
        return CompletionIterator$.MODULE$.apply(dataIterator, new AbstractFunction0<BoxedUnit>() {
            @Override
            public BoxedUnit apply() {
                // Merge the temporary read metrics into the task metrics, then clean up the data iterator.
                RssShuffleReader.this.context.taskMetrics().mergeShuffleReadMetrics();
                return dataIterator.cleanup();
            }
        });
    }

    /** Applies the dependency's aggregator, if any; otherwise returns {@code records} unchanged. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Iterator aggregate(Iterator records) {
        if (!this.shuffleDependency.aggregator().isDefined()) {
            return records;
        }
        Aggregator aggregator = (Aggregator) this.shuffleDependency.aggregator().get();
        // With map-side combine the incoming values are already combiners; otherwise they are raw values.
        return this.shuffleDependency.mapSideCombine()
                ? aggregator.combineCombinersByKey(records, this.context)
                : aggregator.combineValuesByKey(records, this.context);
    }

    /** Sorts {@code records} with an {@link ExternalSorter} using the dependency's key ordering. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Iterator sortByKey(Iterator records) {
        final ExternalSorter sorter = new ExternalSorter(
                this.context, Option.empty(), Option.empty(), this.shuffleDependency.keyOrdering(), this.serializer);
        LOG.info("Inserting aggregated records to sorter");
        long startMillis = System.currentTimeMillis();
        sorter.insertAll(records);
        LOG.info("Inserted aggregated records to sorter: millis:{}", System.currentTimeMillis() - startMillis);
        this.context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled());
        this.context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled());
        this.context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes());
        // The sorter is stopped both at task completion and when the sorted iterator completes;
        // NOTE(review): this relies on ExternalSorter.stop() tolerating repeated calls - confirm.
        this.context.addTaskCompletionListener(new TaskCompletionListener() {
            @Override
            public void onTaskCompletion(TaskContext taskContext) {
                sorter.stop();
            }
        });
        return CompletionIterator$.MODULE$.apply(sorter.iterator(), new AbstractFunction0<BoxedUnit>() {
            @Override
            public BoxedUnit apply() {
                sorter.stop();
                return BoxedUnit.UNIT;
            }
        });
    }

    /** Wraps {@code records} in a {@code RssFetchFailedIterator} configured from this reader's state. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Iterator wrapWithFetchFailureReporting(Iterator records) {
        return RssFetchFailedIterator.newBuilder()
                .appId(this.appId)
                .shuffleId(this.shuffleId)
                .partitionId(this.startPartition)
                .stageAttemptId(this.context.stageAttemptNumber())
                .reportServerHost(this.rssConf.getString(Constants.DRIVER_HOST, ""))
                .port(((Integer) this.rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT)).intValue())
                .build(records);
    }

    /** Returns a one-line description of what this reader reads, for logging. */
    private String getReadInfo() {
        return "appId=" + this.appId + ", shuffleId=" + this.shuffleId + ",taskId=" + this.taskId + ", partitions: [" + this.startPartition + ", " + this.endPartition + ")";
    }

    /** Returns the Hadoop configuration this reader was constructed with. */
    public Configuration getHadoopConf() {
        return this.hadoopConf;
    }
}
