package com.linkedin.parseq.batching;

import com.linkedin.parseq.Context;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.batching.BatchImpl;
import com.linkedin.parseq.internal.ContextImpl;
import com.linkedin.parseq.internal.PlanContext;
import com.linkedin.parseq.promise.CountDownPromiseListener;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import com.linkedin.parseq.trace.Relationship;
import com.linkedin.parseq.trace.ShallowTraceBuilder;
import com.linkedin.parseq.trace.TraceBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/parseq/batching/BatchingStrategy.class */
/**
 * Base class for strategies that collect individually requested keys into batches and
 * execute each batch in one go.
 *
 * <p>Keys are first assigned to a group by {@link #classify(Object)}. Pending batches are
 * tracked per plan id and, within a plan, per group. A batch is executed either as soon as
 * it becomes full (see {@link #batchable(String, Object)}) or when
 * {@link #handleBatch(PlanContext)} flushes whatever is still pending for a plan.
 *
 * @param <G> type of the group a key is classified into
 * @param <K> type of the key
 * @param <T> type of the value produced for a key
 */
public abstract class BatchingStrategy<G, K, T> {
    /** Value returned by the default implementation of {@link #maxBatchSizeForGroup(Object)}. */
    public static final int DEFAULT_MAX_BATCH_SIZE = 1024;
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchingStrategy.class);
    /** Value returned by the default implementation of {@link #keySize(Object, Object)}. */
    private static final int DEFAULT_KEY_SIZE = 1;
    /** Pending (not yet executed) batches, keyed by plan id. */
    private final ConcurrentMap<Long, GroupBatchBuilder> _batches = new ConcurrentHashMap<>();
    private final BatchSizeMetric _batchSizeMetric = new BatchSizeMetric();
    private final BatchAggregationTimeMetric _batchAggregationTimeMetric = new BatchAggregationTimeMetric();

    /**
     * Holds one in-progress batch builder per group.
     *
     * <p>NOTE(review): backed by a plain {@link HashMap}; presumably all calls for one plan
     * happen on a single thread at a time - confirm against the engine's threading model.
     */
    private class GroupBatchBuilder {
        private final Map<G, BatchImpl.BatchBuilder<K, T>> _batchesByGroup = new HashMap<>();

        /**
         * Adds a key to the pending batch of its group.
         *
         * @param group group the key was classified into
         * @param key key to add
         * @param traceBuilder trace builder of the task that requested the key
         * @param promise promise to be associated with the key inside the batch
         * @return a batch that is ready to be executed, or {@code null} if the key was added
         *         to a pending batch that is not full yet
         */
        Batch<K, T> add(G group, K key, ShallowTraceBuilder traceBuilder, BatchImpl.BatchPromise<T> promise) {
            final int size = keySize(group, key);
            final BatchImpl.BatchBuilder<K, T> pending =
                _batchesByGroup.computeIfAbsent(group, unused -> newBuilder(group));
            if (pending.add(key, traceBuilder, promise, size)) {
                if (!pending.isFull()) {
                    return null;
                }
                // A full builder must not stay in the map; hand it out for execution.
                _batchesByGroup.remove(group);
                return pending.build();
            }
            // The pending builder refused the key (presumably it would exceed the maximum
            // batch size), so the key goes into a fresh builder instead.
            final BatchImpl.BatchBuilder<K, T> fresh = newBuilder(group);
            fresh.add(key, traceBuilder, promise, size);
            if (!fresh.isFull() && pending.batchSize() > fresh.batchSize()) {
                // Keep the smaller, non-full builder pending and execute the larger one.
                _batchesByGroup.put(group, fresh);
                return pending.build();
            }
            // The fresh builder is full or at least as large as the pending one: execute it
            // and leave the pending builder in the map.
            return fresh.build();
        }

        /** Returns the live map of builders that have not been handed out for execution. */
        Map<G, BatchImpl.BatchBuilder<K, T>> batches() {
            return _batchesByGroup;
        }

        /** Creates an empty builder sized by {@link BatchingStrategy#maxBatchSizeForGroup(Object)}. */
        private BatchImpl.BatchBuilder<K, T> newBuilder(G group) {
            return new BatchImpl.BatchBuilder<K, T>(maxBatchSizeForGroup(group), _batchAggregationTimeMetric);
        }
    }

    /**
     * Creates a task that obtains the value for a single key through this strategy.
     *
     * <p>When run, the task registers the key with the pending batch of its plan and group.
     * If that produces a batch that is ready, the batch task is started immediately from the
     * task's context.
     *
     * @param desc name of the returned task
     * @param key key to obtain a value for
     * @return task whose task type is set to {@code "batched"}
     */
    public Task<T> batchable(String desc, K key) {
        final Task<T> batchableTask = Task.async(desc, ctx -> {
            final BatchImpl.BatchPromise<T> result = new BatchImpl.BatchPromise<>();
            final GroupBatchBuilder builder =
                _batches.computeIfAbsent(ctx.getPlanId(), planId -> new GroupBatchBuilder());
            final G group = classify(key);
            final Batch<K, T> readyBatch = builder.add(group, key, ctx.getShallowTraceBuilder(), result);
            if (readyBatch != null) {
                try {
                    ctx.run(taskForBatch(group, readyBatch, true));
                } catch (Throwable t) {
                    // Whatever went wrong, make sure no promise of the batch is left pending.
                    readyBatch.failAll(t);
                }
            }
            return result;
        });
        batchableTask.getShallowTraceBuilder().setTaskType("batched");
        return batchableTask;
    }

    /**
     * Same as {@link #batchable(String, Object)} with a task name derived from the key.
     *
     * @param key key to obtain a value for; must not be {@code null} because
     *        {@code key.toString()} is used to build the task name
     * @return task that obtains the value for the key
     */
    public Task<T> batchable(K key) {
        return batchable("batchableTaskForKey: " + key.toString(), key);
    }

    /**
     * Builds the task that executes a batch. Records the batch size metric and, when debug
     * logging is enabled, logs the group and keys at the time this method is called.
     *
     * @param group group of the batch
     * @param batch batch to execute
     * @param hasParent when {@code true} every batched task is linked to the batch task as
     *        {@code POTENTIAL_CHILD_OF}; when {@code false} the first one is linked as
     *        {@code CHILD_OF} and the remaining ones as {@code POTENTIAL_CHILD_OF}
     * @return task executing the batch
     */
    private Task<?> taskForBatch(G group, Batch<K, T> batch, boolean hasParent) {
        _batchSizeMetric.record(batch.batchSize());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(debugInfo(group, batch));
        }
        return Task.async(getBatchName(group, batch), ctx -> {
            final SettablePromise<T> result = Promises.settable();
            // NOTE(review): presumably settles `result` once batch.keySize() entry promises
            // have completed - confirm against CountDownPromiseListener.
            final CountDownPromiseListener<T> countDownListener =
                new CountDownPromiseListener<>(batch.keySize(), result, null);
            boolean assignedParent = false;
            final TraceBuilder traceBuilder = ctx.getTraceBuilder();
            for (BatchImpl.BatchEntry<T> entry : batch.values()) {
                for (ShallowTraceBuilder entryTrace : entry.getShallowTraceBuilders()) {
                    if (!assignedParent && !hasParent) {
                        traceBuilder.addRelationship(Relationship.CHILD_OF, ctx.getShallowTraceBuilder(), entryTrace);
                        assignedParent = true;
                    } else {
                        traceBuilder.addRelationship(
                            Relationship.POTENTIAL_CHILD_OF, ctx.getShallowTraceBuilder(), entryTrace);
                    }
                }
                final BatchImpl.BatchPromise<T> promise = entry.getPromise();
                promise.getInternal().addListener(countDownListener);
                // Each entry promise is triggered only after the batch-level promise settles.
                result.addListener(settled -> promise.trigger());
            }
            try {
                executeBatchWithContext(group, batch, ctx);
            } catch (Throwable t) {
                batch.failAll(t);
            }
            ctx.getShallowTraceBuilder().setSystemHidden(true);
            return result;
        });
    }

    /**
     * Runs a batch as a task forked from the given plan context. Any failure while creating
     * or starting the task fails all promises of the batch.
     */
    private void runBatch(PlanContext planContext, G group, Batch<K, T> batch) {
        try {
            final Task<?> batchTask = taskForBatch(group, batch, false);
            new ContextImpl(planContext.fork(batchTask), batchTask).runTask();
        } catch (Throwable t) {
            batch.failAll(t);
        }
    }

    /**
     * Removes everything still pending for the given plan and runs each remaining batch.
     * Does nothing when the plan has no pending batches.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private; it
     * is presumably meant to be called only by the batching infrastructure - confirm.
     *
     * @param planContext context of the plan whose pending batches are to be executed
     */
    public void handleBatch(PlanContext planContext) {
        final GroupBatchBuilder pending = _batches.remove(planContext.getId());
        if (pending != null) {
            pending.batches().forEach((group, builder) -> runBatch(planContext, group, builder.build()));
        }
    }

    /** Renders the group and the keys of a batch, one key per line, for debug logging. */
    private String debugInfo(G group, Batch<K, T> batch) {
        final StringBuilder sb = new StringBuilder("\n");
        sb.append("group: ").append(group).append("\n").append("batch keys: \n");
        batch.keys().forEach(key -> sb.append("    ").append(key).append("\n"));
        return sb.toString();
    }

    /** Returns the metric into which the size of every executed batch is recorded. */
    public BatchSizeMetric getBatchSizeMetric() {
        return _batchSizeMetric;
    }

    /** Returns the aggregation time metric handed to every batch builder of this strategy. */
    public BatchAggregationTimeMetric getBatchAggregationTimeMetric() {
        return _batchAggregationTimeMetric;
    }

    /**
     * Executes the batch. If this method throws, all promises of the batch are failed with
     * the thrown exception.
     *
     * @param group group of the batch
     * @param batch batch to execute
     */
    public abstract void executeBatch(G group, Batch<K, T> batch);

    /**
     * Executes the batch with access to the context of the batch task. The default
     * implementation ignores the context and delegates to {@link #executeBatch(Object, Batch)}.
     *
     * @param group group of the batch
     * @param batch batch to execute
     * @param ctx context of the task executing the batch
     */
    protected void executeBatchWithContext(G group, Batch<K, T> batch, Context ctx) {
        executeBatch(group, batch);
    }

    /**
     * Assigns a key to a group; keys of the same group within one plan share pending batches.
     *
     * @param key key to classify
     * @return group of the key; used as a {@link HashMap} key
     */
    public abstract G classify(K key);

    /**
     * Returns the maximum batch size passed to batch builders created for the group.
     * The default implementation returns {@link #DEFAULT_MAX_BATCH_SIZE}.
     *
     * @param group group the batch is created for
     * @return maximum batch size for the group
     */
    public int maxBatchSizeForGroup(G group) {
        return DEFAULT_MAX_BATCH_SIZE;
    }

    /**
     * Returns the size a key contributes when it is added to a batch builder.
     * The default implementation returns 1 for every key.
     *
     * @param group group of the key
     * @param key key being added
     * @return size of the key
     */
    public int keySize(G group, K key) {
        return DEFAULT_KEY_SIZE;
    }

    /**
     * Returns the name of the task that executes the batch.
     *
     * @param group group of the batch
     * @param batch batch to be executed
     * @return task name containing the batch's key count and batch size
     */
    public String getBatchName(G group, Batch<K, T> batch) {
        return "batch(keys: " + batch.keySize() + ", size: " + batch.batchSize() + ")";
    }
}
