package org.apache.fluo.integration.impl;

import com.google.common.collect.Iterables;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestUtil;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/fluo/integration/impl/CollisionIT.class */
public class CollisionIT extends ITBaseMini {
    private static final Column STAT_TOTAL = new Column("stat", "total");
    private static final Column STAT_CHANGED = new Column("stat", "changed");
    private static final Column STAT_PROCESSED = new Column("stat", "processed");

    @Rule
    public Timeout globalTimeout = Timeout.seconds(getTestTimeout());

    /* loaded from: input_file:org/apache/fluo/integration/impl/CollisionIT$CollisionObserverProvider.class */
    public static class CollisionObserverProvider implements ObserverProvider {
        public void provide(ObserverProvider.Registry registry, ObserverProvider.Context context) {
            registry.forColumn(CollisionIT.STAT_CHANGED, Observer.NotificationType.WEAK).useStrObserver((transactionBase, str, column) -> {
                int parseInt = Integer.parseInt(transactionBase.gets(str, CollisionIT.STAT_TOTAL));
                int orDefault = TestUtil.getOrDefault(transactionBase, str, CollisionIT.STAT_PROCESSED, 0);
                transactionBase.set(str, CollisionIT.STAT_PROCESSED, parseInt + "");
                TestUtil.increment(transactionBase, "all", CollisionIT.STAT_TOTAL, parseInt - orDefault);
            });
        }
    }

    /* loaded from: input_file:org/apache/fluo/integration/impl/CollisionIT$NumLoader.class */
    private static class NumLoader implements Loader {
        int num;

        NumLoader(int i) {
            this.num = i;
        }

        public void load(TransactionBase transactionBase, Loader.Context context) throws Exception {
            TestUtil.increment(transactionBase, this.num + "", CollisionIT.STAT_TOTAL, 1);
            transactionBase.setWeakNotification(this.num + "", CollisionIT.STAT_CHANGED);
        }
    }

    @Override // org.apache.fluo.integration.ITBase
    protected Class<? extends ObserverProvider> getObserverProviderClass() {
        return CollisionObserverProvider.class;
    }

    @Override // org.apache.fluo.integration.ITBaseMini
    protected void setConfig(FluoConfiguration fluoConfiguration) {
        fluoConfiguration.setLoaderQueueSize(20);
        fluoConfiguration.setLoaderThreads(20);
        fluoConfiguration.setWorkerThreads(20);
        fluoConfiguration.setProperty("fluo.impl.timestamp.update.period", "100");
    }

    @Test
    public void testLotsOfCollisions() throws Exception {
        long j;
        Random random = new Random(45734985L);
        int[] iArr = new int[1000];
        int[] iArr2 = new int[5];
        for (int i = 0; i < iArr.length; i++) {
            iArr[i] = random.nextInt(iArr2.length);
            int i2 = iArr[i];
            iArr2[i2] = iArr2[i2] + 1;
        }
        LoaderExecutor newLoaderExecutor = client.newLoaderExecutor();
        try {
            for (int i3 : iArr) {
                newLoaderExecutor.execute(new NumLoader(i3));
            }
            if (newLoaderExecutor != null) {
                newLoaderExecutor.close();
            }
            miniFluo.waitForObservers();
            Snapshot newSnapshot = client.newSnapshot();
            for (int i4 = 0; i4 < iArr2.length; i4++) {
                try {
                    Assert.assertNotNull(newSnapshot.gets(i4 + "", STAT_TOTAL));
                    Assert.assertEquals(iArr2[i4], Integer.parseInt(r0));
                    Assert.assertNotNull(newSnapshot.gets(i4 + "", STAT_PROCESSED));
                    Assert.assertEquals(iArr2[i4], Integer.parseInt(r0));
                } catch (Throwable th) {
                    if (newSnapshot != null) {
                        try {
                            newSnapshot.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            Assert.assertNotNull(newSnapshot.gets("all", STAT_TOTAL));
            Assert.assertEquals(1000L, Integer.parseInt(r0));
            long startTimestamp = newSnapshot.getStartTimestamp();
            if (newSnapshot != null) {
                newSnapshot.close();
            }
            long gcTimestamp = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
            while (true) {
                j = gcTimestamp;
                if (j >= startTimestamp) {
                    break;
                }
                UtilWaitThread.sleep(300L);
                gcTimestamp = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
            }
            aClient.tableOperations().compact(getCurTableName(), (Text) null, (Text) null, true, true);
            Scanner createScanner = aClient.createScanner(getCurTableName(), Authorizations.EMPTY);
            HashSet hashSet = new HashSet();
            Iterator it = createScanner.iterator();
            while (it.hasNext()) {
                Key key = (Key) ((Map.Entry) it.next()).getKey();
                String str = key.getRow() + ":" + key.getColumnFamily() + ":" + key.getColumnQualifier() + ":" + String.format("%x", Long.valueOf(key.getTimestamp() & (-2305843009213693952L)));
                if (hashSet.contains(str)) {
                    System.err.println("DEBUG oldestTs : " + j + " recentTS : " + startTimestamp);
                    Iterable transform = Iterables.transform(createScanner, entry -> {
                        return "DEBUG " + FluoFormatter.toString(entry);
                    });
                    PrintStream printStream = System.err;
                    Objects.requireNonNull(printStream);
                    transform.forEach(printStream::println);
                }
                Assert.assertFalse("Duplicate row col " + str, hashSet.contains(str));
                hashSet.add(str);
            }
        } catch (Throwable th3) {
            if (newLoaderExecutor != null) {
                try {
                    newLoaderExecutor.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }
}
