package io.dingodb.exec.operator;

import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.config.DingoConfiguration;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.common.util.Optional;
import io.dingodb.common.util.RangeUtils;
import io.dingodb.common.util.Utils;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.operator.data.Context;
import io.dingodb.exec.operator.params.DistributionSourceParam;
import io.dingodb.meta.MetaService;
import io.dingodb.partition.PartitionService;
import io.dingodb.store.api.transaction.exception.LockWaitException;
import io.dingodb.store.api.transaction.exception.RegionSplitException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/exec/operator/NewCalcDistributionOperator.class */
public class NewCalcDistributionOperator extends SourceOperator {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) NewCalcDistributionOperator.class);
    public static final NewCalcDistributionOperator INSTANCE = new NewCalcDistributionOperator();

    private NewCalcDistributionOperator() {
    }

    private static NavigableSet<RangeDistribution> getRangeDistributions(DistributionSourceParam distributionSourceParam) {
        PartitionService ps = distributionSourceParam.getPs();
        NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> rangeDistribution = distributionSourceParam.getRangeDistribution();
        LogUtils.trace(log, "start = {}, end = {}, PartitionService = {}, RangeDistribution = {}", Arrays.toString(distributionSourceParam.getStartKey()), Arrays.toString(distributionSourceParam.getEndKey()), ps.getClass().getCanonicalName(), rangeDistribution.entrySet().stream().map(entry -> {
            return ((ByteArrayUtils.ComparableByteArray) entry.getKey()).encodeToString() + ": " + entry.getValue();
        }).collect(Collectors.joining(StringUtils.LF)));
        if ((distributionSourceParam.getFilter() == null && !distributionSourceParam.isNotBetween()) || (!distributionSourceParam.isLogicalNot() && !distributionSourceParam.isNotBetween())) {
            return ps.calcPartitionRange(distributionSourceParam.getStartKey(), distributionSourceParam.getEndKey(), distributionSourceParam.isWithStart(), distributionSourceParam.isWithEnd(), rangeDistribution);
        }
        TreeSet treeSet = new TreeSet(RangeUtils.rangeComparator(1));
        treeSet.addAll(ps.calcPartitionRange(null, distributionSourceParam.getStartKey(), true, !distributionSourceParam.isWithStart(), distributionSourceParam.getRangeDistribution()));
        treeSet.addAll(ps.calcPartitionRange(distributionSourceParam.getEndKey(), null, !distributionSourceParam.isWithEnd(), true, distributionSourceParam.getRangeDistribution()));
        return treeSet;
    }

    @Override // io.dingodb.exec.operator.SourceOperator
    public boolean push(Context context, Vertex vertex) {
        DistributionSourceParam distributionSourceParam = (DistributionSourceParam) vertex.getParam();
        NavigableSet<RangeDistribution> rangeDistributions = getRangeDistributions(distributionSourceParam);
        if (log.isTraceEnabled() && rangeDistributions.isEmpty()) {
            LogUtils.trace(log, "No data distribution from ({}) to ({})", Arrays.toString(distributionSourceParam.getStartKey()), Arrays.toString(distributionSourceParam.getEndKey()));
        }
        if (!Utils.parallel(distributionSourceParam.getKeepOrder()) || rangeDistributions.size() == 1) {
            for (RangeDistribution rangeDistribution : rangeDistributions) {
                if (log.isTraceEnabled()) {
                    LogUtils.trace(log, "Push distribution: {}", rangeDistribution);
                }
                context.setDistribution(rangeDistribution);
                if (!vertex.getSoleEdge().transformToNext(context, null)) {
                    return false;
                }
            }
            return false;
        }
        try {
            int concurrencyLevel = distributionSourceParam.getConcurrencyLevel();
            HashSet hashSet = new HashSet(concurrencyLevel);
            Iterator<RangeDistribution> it2 = rangeDistributions.iterator();
            while (it2.hasNext()) {
                hashSet.add(push(context, vertex, distributionSourceParam, it2.next()));
                if (hashSet.size() >= concurrencyLevel) {
                    CompletableFuture.allOf((CompletableFuture[]) hashSet.toArray(new CompletableFuture[0])).join();
                    hashSet.clear();
                }
            }
            if (!hashSet.isEmpty()) {
                CompletableFuture.allOf((CompletableFuture[]) hashSet.toArray(new CompletableFuture[0])).join();
            }
            return false;
        } catch (CompletionException e) {
            if (e.getCause() instanceof LockWaitException) {
                throw new LockWaitException("Lock wait");
            }
            throw e;
        }
    }

    private static CompletableFuture<Boolean> push(Context context, Vertex vertex, DistributionSourceParam distributionSourceParam, RangeDistribution rangeDistribution) {
        Supplier supplier = () -> {
            if (log.isTraceEnabled()) {
                LogUtils.trace(log, "Push distribution: {}", rangeDistribution);
            }
            Context copy = context.copy();
            copy.setDistribution(rangeDistribution);
            return Boolean.valueOf(vertex.getSoleEdge().transformToNext(copy, null));
        };
        Integer num = (Integer) Optional.mapOrGet(DingoConfiguration.instance().find("retry", Integer.TYPE), num2 -> {
            return num2;
        }, () -> {
            return 30;
        });
        return CompletableFuture.supplyAsync(supplier, Executors.executor("operator-" + vertex.getTask().getJobId() + "-" + vertex.getTask().getId() + "-" + vertex.getId() + "-" + rangeDistribution.getId())).exceptionally(th -> {
            if (th != null) {
                if (!(th.getCause() instanceof RegionSplitException)) {
                    if (!(th.getCause() instanceof LockWaitException)) {
                        throw new RuntimeException(th);
                    }
                    LogUtils.error(log, "jobId:" + vertex.getTask().getJobId() + ", taskId:" + vertex.getTask().getId() + ", vertexId:" + vertex.getId() + ", error:", th);
                    throw new LockWaitException("Lock wait");
                }
                int intValue = distributionSourceParam.getSplitRetry().containsKey(rangeDistribution.getId()) ? distributionSourceParam.getSplitRetry().get(rangeDistribution.getId()).intValue() + 1 : 1;
                if (intValue > 10) {
                    MetaService.root().invalidateDistribution(distributionSourceParam.getTd().getTableId());
                }
                if (intValue > num.intValue()) {
                    LogUtils.error(log, th.getMessage(), th);
                    throw new RuntimeException("The number of split retries exceeds the maximum limit");
                }
                distributionSourceParam.getSplitRetry().put(rangeDistribution.getId(), Integer.valueOf(intValue));
                NavigableSet<RangeDistribution> rangeDistributions = getRangeDistributions(distributionSourceParam.copy(MetaService.root().getRangeDistribution(distributionSourceParam.getTd().getTableId()), rangeDistribution.getStartKey(), rangeDistribution.getEndKey(), rangeDistribution.isWithStart(), rangeDistribution.isWithEnd()));
                HashSet hashSet = new HashSet(distributionSourceParam.getConcurrencyLevel());
                Iterator<RangeDistribution> it2 = rangeDistributions.iterator();
                while (it2.hasNext()) {
                    hashSet.add(push(context, vertex, distributionSourceParam, it2.next()));
                }
                if (!hashSet.isEmpty()) {
                    CompletableFuture.allOf((CompletableFuture[]) hashSet.toArray(new CompletableFuture[0])).join();
                }
            }
            return true;
        });
    }
}
