package org.apache.fluo.recipes.core.combine;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/fluo/recipes/core/combine/CombineQueueImpl.class */
public class CombineQueueImpl<K, V> implements CombineQueue<K, V> {
    static final Column DATA_COLUMN = new Column("data", "current");
    static final Column UPDATE_COL = new Column("u", "v");
    static final Column NEXT_COL = new Column("u", "next");
    private Bytes updatePrefix;
    private Bytes dataPrefix;
    private Column notifyColumn;
    private final String cqId;
    private final Class<K> keyType;
    private final Class<V> valType;
    private final int numBuckets;
    private final long bufferSize;
    private SimpleSerializer serializer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CombineQueueImpl(String str, SimpleConfiguration simpleConfiguration) throws Exception {
        this.cqId = str;
        this.updatePrefix = Bytes.of(str + ":u:");
        this.dataPrefix = Bytes.of(str + ":d:");
        this.notifyColumn = new Column("fluoRecipes", "cfm:" + str);
        this.keyType = (Class<K>) getClass().getClassLoader().loadClass(CqConfigurator.getKeyType(str, simpleConfiguration));
        this.valType = (Class<V>) getClass().getClassLoader().loadClass(CqConfigurator.getValueType(str, simpleConfiguration));
        this.numBuckets = CqConfigurator.getNumBucket(str, simpleConfiguration);
        this.bufferSize = CqConfigurator.getBufferSize(str, simpleConfiguration);
        this.serializer = SimpleSerializer.getInstance(simpleConfiguration);
    }

    private static byte[] encSeq(long j) {
        return new byte[]{(byte) (j >>> 56), (byte) (j >>> 48), (byte) (j >>> 40), (byte) (j >>> 32), (byte) (j >>> 24), (byte) (j >>> 16), (byte) (j >>> 8), (byte) (j >>> 0)};
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String genBucketId(int i, int i2) {
        Preconditions.checkArgument(i >= 0);
        Preconditions.checkArgument(i2 > 0);
        int numberOfLeadingZeros = 32 - Integer.numberOfLeadingZeros(i2);
        return Strings.padStart(Integer.toHexString(i), (numberOfLeadingZeros / 4) + (numberOfLeadingZeros % 4 > 0 ? 1 : 0), '0');
    }

    @Override // org.apache.fluo.recipes.core.combine.CombineQueue
    public void addAll(TransactionBase transactionBase, Map<K, V> map) {
        Preconditions.checkState(this.numBuckets > 0, "Not initialized");
        HashSet<String> hashSet = new HashSet();
        Bytes.BytesBuilder builder = Bytes.builder();
        builder.append(this.updatePrefix);
        int length = builder.getLength();
        byte[] encSeq = encSeq(transactionBase.getStartTimestamp());
        for (Map.Entry<K, V> entry : map.entrySet()) {
            byte[] serialize = this.serializer.serialize(entry.getKey());
            String genBucketId = genBucketId(Math.abs(Hashing.murmur3_32().hashBytes(serialize).asInt() % this.numBuckets), this.numBuckets);
            builder.setLength(length);
            transactionBase.set(builder.append(genBucketId).append(58).append(serialize).append(encSeq).toBytes(), UPDATE_COL, Bytes.of(this.serializer.serialize(entry.getValue())));
            hashSet.add(genBucketId);
        }
        for (String str : hashSet) {
            builder.setLength(length);
            builder.append(str).append(58);
            transactionBase.setWeakNotification(builder.toBytes(), this.notifyColumn);
        }
    }

    private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase transactionBase, Bytes.BytesBuilder bytesBuilder, Set<Bytes> set) {
        HashSet hashSet = new HashSet();
        int length = bytesBuilder.getLength();
        for (Bytes bytes : set) {
            bytesBuilder.setLength(length);
            hashSet.add(bytesBuilder.append(bytes).toBytes());
        }
        try {
            return transactionBase.get(hashSet, Collections.singleton(DATA_COLUMN));
        } catch (IllegalArgumentException e) {
            System.out.println(hashSet.size());
            throw e;
        }
    }

    private V deserVal(Bytes bytes) {
        return (V) this.serializer.deserialize(bytes.toArray(), this.valType);
    }

    private Bytes getKeyFromUpdateRow(Bytes bytes, Bytes bytes2) {
        return bytes2.subSequence(bytes.length(), bytes2.length() - 8);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v138, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r0v142, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r0v148, types: [java.util.ArrayList] */
    /* JADX WARN: Type inference failed for: r13v0, types: [org.apache.fluo.recipes.core.combine.ChangeObserver, org.apache.fluo.recipes.core.combine.ChangeObserver<K, V>] */
    void process(TransactionBase transactionBase, Bytes bytes, Column column, Combiner<K, V> combiner, ChangeObserver<K, V> changeObserver) throws Exception {
        Span prefix;
        Preconditions.checkState(bytes.startsWith(this.updatePrefix));
        Bytes bytes2 = transactionBase.get(bytes, NEXT_COL);
        if (bytes2 != null) {
            Bytes bytes3 = Bytes.builder(bytes.length() + bytes2.length()).append(bytes).append(bytes2).toBytes();
            Span prefix2 = Span.prefix(bytes);
            prefix = new Span(new RowColumn(bytes3, UPDATE_COL), false, prefix2.getEnd(), prefix2.isEndInclusive());
        } else {
            prefix = Span.prefix(bytes);
        }
        Iterator it = transactionBase.scanner().over(prefix).fetch(new Column[]{UPDATE_COL}).build().iterator();
        HashMap hashMap = new HashMap();
        long j = 0;
        Bytes bytes4 = null;
        boolean z = false;
        if (it.hasNext()) {
            Bytes bytes5 = null;
            while (it.hasNext() && j < this.bufferSize) {
                RowColumnValue rowColumnValue = (RowColumnValue) it.next();
                Bytes row = rowColumnValue.getRow();
                transactionBase.delete(row, UPDATE_COL);
                Bytes keyFromUpdateRow = getKeyFromUpdateRow(bytes, row);
                bytes5 = keyFromUpdateRow;
                V v = (List) hashMap.get(keyFromUpdateRow);
                if (v == null) {
                    v = new ArrayList();
                    hashMap.put(keyFromUpdateRow, v);
                }
                v.add(rowColumnValue.getValue());
                j = j + row.length() + r0.length();
            }
            if (it.hasNext()) {
                if (getKeyFromUpdateRow(bytes, ((RowColumnValue) it.next()).getRow()).equals(bytes5)) {
                    bytes4 = bytes5;
                    transactionBase.set(bytes, NEXT_COL, bytes4);
                } else {
                    transactionBase.set(bytes, NEXT_COL, Bytes.builder(bytes5.length() + 1).append(bytes5).append(0).toBytes());
                }
                z = true;
            } else if (bytes2 != null) {
                transactionBase.delete(bytes, NEXT_COL);
            }
        } else if (bytes2 != null) {
            transactionBase.delete(bytes, NEXT_COL);
        }
        if (bytes2 != null || z) {
            transactionBase.setWeakNotification(bytes, column);
        }
        Bytes.BytesBuilder builder = Bytes.builder();
        builder.append(this.dataPrefix);
        builder.append(bytes.subSequence(this.updatePrefix.length(), bytes.length()));
        int length = builder.getLength();
        Set<Bytes> keySet = hashMap.keySet();
        if (bytes4 != null) {
            Bytes bytes6 = bytes4;
            keySet = Sets.filter(keySet, bytes7 -> {
                return !bytes7.equals(bytes6);
            });
        }
        Map<Bytes, Map<Column, Bytes>> currentValues = getCurrentValues(transactionBase, builder, keySet);
        ArrayList arrayList = new ArrayList(hashMap.size());
        for (Map.Entry<K, V> entry : hashMap.entrySet()) {
            builder.setLength(length);
            Bytes bytes8 = builder.append((Bytes) entry.getKey()).toBytes();
            Bytes bytes9 = currentValues.getOrDefault(bytes8, Collections.emptyMap()).get(DATA_COLUMN);
            Object deserialize = this.serializer.deserialize(((Bytes) entry.getKey()).toArray(), this.keyType);
            if (bytes4 == null || !bytes4.equals(entry.getKey())) {
                Optional<V> combine = combiner.combine(new InputImpl(deserialize, this::deserVal, bytes9, (Collection) entry.getValue()));
                Bytes of = combine.isPresent() ? Bytes.of(this.serializer.serialize(combine.get())) : null;
                if (((of != null) ^ (bytes9 != null)) || (bytes9 != null && !bytes9.equals(of))) {
                    if (of == null) {
                        transactionBase.delete(bytes8, DATA_COLUMN);
                    } else {
                        transactionBase.set(bytes8, DATA_COLUMN, of);
                    }
                    arrayList.add(new ChangeImpl(deserialize, Optional.ofNullable(bytes9).map(this::deserVal), combine));
                }
            } else {
                Optional<V> combine2 = combiner.combine(new InputImpl(deserialize, this::deserVal, (Collection) entry.getValue()));
                if (combine2.isPresent()) {
                    addAll(transactionBase, Collections.singletonMap(deserialize, combine2.get()));
                }
            }
        }
        hashMap.clear();
        currentValues.clear();
        if (arrayList.size() > 0) {
            changeObserver.process(transactionBase, arrayList);
        }
    }

    @Override // org.apache.fluo.recipes.core.combine.CombineQueue
    public void registerObserver(ObserverProvider.Registry registry, Combiner<K, V> combiner, ChangeObserver<K, V> changeObserver) {
        registry.forColumn(this.notifyColumn, Observer.NotificationType.WEAK).withId("combineq-" + this.cqId).useObserver((transactionBase, bytes, column) -> {
            process(transactionBase, bytes, column, combiner, changeObserver);
        });
    }
}
