package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.class */
public class TieredResultPartitionFactory {
    private final TieredStorageConfiguration tieredStorageConfiguration;
    private final TieredStorageNettyServiceImpl tieredStorageNettyService;
    private final TieredStorageResourceRegistry tieredStorageResourceRegistry;

    public TieredResultPartitionFactory(TieredStorageConfiguration tieredStorageConfiguration, TieredStorageNettyServiceImpl tieredStorageNettyServiceImpl, TieredStorageResourceRegistry tieredStorageResourceRegistry) {
        this.tieredStorageConfiguration = tieredStorageConfiguration;
        this.tieredStorageNettyService = tieredStorageNettyServiceImpl;
        this.tieredStorageResourceRegistry = tieredStorageResourceRegistry;
    }

    public TieredStorageConfiguration getTieredStorageConfiguration() {
        return this.tieredStorageConfiguration;
    }

    public TieredStorageNettyServiceImpl getTieredStorageNettyService() {
        return this.tieredStorageNettyService;
    }

    public TieredStorageResourceRegistry getTieredStorageResourceRegistry() {
        return this.tieredStorageResourceRegistry;
    }

    public TieredResultPartition createTieredResultPartition(String str, int i, ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, int i2, int i3, Boolean bool, ResultPartitionManager resultPartitionManager, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> supplierWithException, FileChannelManager fileChannelManager, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService scheduledExecutorService) {
        TieredStorageMemoryManagerImpl tieredStorageMemoryManagerImpl = new TieredStorageMemoryManagerImpl(this.tieredStorageConfiguration.getNumBuffersTriggerFlushRatio(), true);
        BufferAccumulator createBufferAccumulator = createBufferAccumulator(i2, this.tieredStorageConfiguration.getAccumulatorExclusiveBuffers(), tieredStorageMemoryManagerImpl);
        Tuple2<List<TierProducerAgent>, List<TieredStorageMemorySpec>> createTierProducerAgentsAndMemorySpecs = createTierProducerAgentsAndMemorySpecs(i2, bool.booleanValue(), TieredStorageIdMappingUtils.convertId(resultPartitionID), tieredStorageMemoryManagerImpl, createBufferAccumulator, resultPartitionType == ResultPartitionType.HYBRID_SELECTIVE, fileChannelManager, batchShuffleReadBufferPool, scheduledExecutorService);
        return new TieredResultPartition(str, i, resultPartitionID, resultPartitionType, i2, i3, resultPartitionManager, bufferCompressor, supplierWithException, new TieredStorageProducerClient(i2, bool.booleanValue(), createBufferAccumulator, bufferCompressor, createTierProducerAgentsAndMemorySpecs.f0), this.tieredStorageResourceRegistry, this.tieredStorageNettyService, createTierProducerAgentsAndMemorySpecs.f1, tieredStorageMemoryManagerImpl);
    }

    private BufferAccumulator createBufferAccumulator(int i, int i2, TieredStorageMemoryManager tieredStorageMemoryManager) {
        int tieredStorageBufferSize = this.tieredStorageConfiguration.getTieredStorageBufferSize();
        return i + 1 > i2 ? new SortBufferAccumulator(i, i2, tieredStorageBufferSize, tieredStorageMemoryManager) : new HashBufferAccumulator(i, tieredStorageBufferSize, tieredStorageMemoryManager);
    }

    private Tuple2<List<TierProducerAgent>, List<TieredStorageMemorySpec>> createTierProducerAgentsAndMemorySpecs(int i, boolean z, TieredStoragePartitionId tieredStoragePartitionId, TieredStorageMemoryManager tieredStorageMemoryManager, BufferAccumulator bufferAccumulator, boolean z2, FileChannelManager fileChannelManager, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService scheduledExecutorService) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new TieredStorageMemorySpec(bufferAccumulator, 2 * Math.min(i + 1, this.tieredStorageConfiguration.getAccumulatorExclusiveBuffers())));
        List<Integer> eachTierExclusiveBufferNum = this.tieredStorageConfiguration.getEachTierExclusiveBufferNum();
        List<TierFactory> tierFactories = this.tieredStorageConfiguration.getTierFactories();
        for (int i2 = 0; i2 < tierFactories.size(); i2++) {
            TierFactory tierFactory = tierFactories.get(i2);
            if (z2 || tierFactory.getClass() != MemoryTierFactory.class) {
                TierProducerAgent createProducerAgent = tierFactory.createProducerAgent(i, tieredStoragePartitionId, fileChannelManager.createChannel().getPath(), z, tieredStorageMemoryManager, this.tieredStorageNettyService, this.tieredStorageResourceRegistry, batchShuffleReadBufferPool, scheduledExecutorService, Math.max(2 * batchShuffleReadBufferPool.getNumBuffersPerRequest(), i), this.tieredStorageConfiguration.getDiskIOSchedulerBufferRequestTimeout(), this.tieredStorageConfiguration.getDiskIOSchedulerMaxBuffersReadAhead());
                arrayList.add(createProducerAgent);
                arrayList2.add(new TieredStorageMemorySpec(createProducerAgent, eachTierExclusiveBufferNum.get(i2).intValue()));
            }
        }
        return Tuple2.of(arrayList, arrayList2);
    }
}
