package org.apache.fluo.integration.impl;

import java.util.Iterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.api.observer.StringObserver;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration test exercising weak notifications when the transaction that processes a
 * notification overlaps with another transaction that sets the same notification again.
 *
 * <p>Each scenario starts a "processing" transaction for row {@code "1"} / column
 * {@code stat:changed}, lets a second transaction re-set the weak notification before the
 * processing transaction finishes, and then asserts (via {@link #countNotifications()}) that the
 * notification is still present until it has been processed by a transaction started afterwards.
 */
public class WeakNotificationOverlapIT extends ITBaseImpl {
    private static final Column STAT_TOTAL = new Column("stat", "total");
    private static final Column STAT_PROCESSED = new Column("stat", "processed");
    private static final Column STAT_CHANGED = new Column("stat", "changed");

    /**
     * Observer that folds a row's {@code stat:total} into the {@code "all"} row.
     *
     * <p>It reads the row's current total, records it as the row's {@code stat:processed} value,
     * and increments {@code all/stat:total} by the difference between the current total and the
     * previously processed value. Rows without a total are ignored.
     */
    private static final StringObserver TOTAL_OBSERVER = (tx, row, col) -> {
        String totalText = tx.gets(row, STAT_TOTAL);
        if (totalText == null) {
            // Nothing to aggregate for this row.
            return;
        }
        int total = Integer.parseInt(totalText);
        int alreadyProcessed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
        tx.set(row, STAT_PROCESSED, Integer.toString(total));
        // Only the not-yet-processed delta is added to the global total.
        TestUtil.increment(tx, "all", STAT_TOTAL, total - alreadyProcessed);
    };

    /** Registers {@link #TOTAL_OBSERVER} as a weak observer of the {@code stat:changed} column. */
    public static class WeakNtfyObserverProvider implements ObserverProvider {
        @Override
        public void provide(ObserverProvider.Registry registry, ObserverProvider.Context context) {
            registry.forColumn(STAT_CHANGED, Observer.NotificationType.WEAK)
                    .useStrObserver(TOTAL_OBSERVER);
        }
    }

    @Override
    protected Class<? extends ObserverProvider> getObserverProviderClass() {
        return WeakNtfyObserverProvider.class;
    }

    /**
     * A processing transaction that started before a later transaction re-set the notification
     * must not remove that later notification; only a transaction started afterwards does.
     * The scenario is run twice, the second time after deleting the row's total and processed
     * columns.
     */
    @Test
    public void testOverlap() throws Exception {
        // --- Round 1 -------------------------------------------------------------------------
        TestTransaction firstWrite = new TestTransaction(env);
        TestUtil.increment(firstWrite, "1", STAT_TOTAL, 1);
        firstWrite.setWeakNotification("1", STAT_CHANGED);
        firstWrite.done();

        // Start the processing transaction BEFORE the second write commits.
        TestTransaction overlappingProcessor = new TestTransaction(env, "1", STAT_CHANGED);

        TestTransaction secondWrite = new TestTransaction(env);
        TestUtil.increment(secondWrite, "1", STAT_TOTAL, 1);
        secondWrite.setWeakNotification("1", STAT_CHANGED);
        secondWrite.done();

        Assert.assertEquals(1L, countNotifications());

        TOTAL_OBSERVER.process(overlappingProcessor, Bytes.of("1"), STAT_CHANGED);
        overlappingProcessor.done();

        // The overlapping processor only saw the first increment.
        TestTransaction firstCheck = new TestTransaction(env);
        Assert.assertEquals("1", firstCheck.gets("all", STAT_TOTAL));
        firstCheck.done();

        // The notification set by the second write must still be there.
        Assert.assertEquals(1L, countNotifications());

        TestTransaction laterProcessor = new TestTransaction(env, "1", STAT_CHANGED);
        TOTAL_OBSERVER.process(laterProcessor, Bytes.of("1"), STAT_CHANGED);
        laterProcessor.done();

        Assert.assertEquals(0L, countNotifications());

        TestTransaction secondCheck = new TestTransaction(env);
        Assert.assertEquals("2", secondCheck.gets("all", STAT_TOTAL));
        secondCheck.done();

        // --- Round 2: same overlap, starting from a row whose stats were deleted ----------------
        TestTransaction reset = new TestTransaction(env);
        reset.delete("1", STAT_TOTAL);
        reset.delete("1", STAT_PROCESSED);
        reset.setWeakNotification("1", STAT_CHANGED);
        reset.done();

        Assert.assertEquals(1L, countNotifications());

        TestTransaction overlappingProcessor2 = new TestTransaction(env, "1", STAT_CHANGED);

        TestTransaction thirdWrite = new TestTransaction(env);
        TestUtil.increment(thirdWrite, "1", STAT_TOTAL, 1);
        thirdWrite.setWeakNotification("1", STAT_CHANGED);
        thirdWrite.done();

        Assert.assertEquals(1L, countNotifications());

        // This processor's snapshot has no total for row "1", so the observer returns early.
        TOTAL_OBSERVER.process(overlappingProcessor2, Bytes.of("1"), STAT_CHANGED);
        overlappingProcessor2.done();

        Assert.assertEquals(1L, countNotifications());

        TestTransaction thirdCheck = new TestTransaction(env);
        Assert.assertEquals("2", thirdCheck.gets("all", STAT_TOTAL));
        thirdCheck.done();

        TestTransaction laterProcessor2 = new TestTransaction(env, "1", STAT_CHANGED);
        TOTAL_OBSERVER.process(laterProcessor2, Bytes.of("1"), STAT_CHANGED);
        laterProcessor2.done();

        Assert.assertEquals(0L, countNotifications());

        TestTransaction finalCheck = new TestTransaction(env);
        Assert.assertEquals("3", finalCheck.gets("all", STAT_TOTAL));
        finalCheck.done();
    }

    /**
     * Variant of {@link #testOverlap()} in which the writing transaction has already passed
     * {@code preCommit} when the processing transaction starts, and obtains its commit stamp
     * only afterwards. The commit steps are driven manually to get that interleaving.
     */
    @Test
    public void testOverlap2() throws Exception {
        TestTransaction firstWrite = new TestTransaction(env);
        TestUtil.increment(firstWrite, "1", STAT_TOTAL, 1);
        firstWrite.setWeakNotification("1", STAT_CHANGED);
        firstWrite.done();

        Assert.assertEquals(1L, countNotifications());

        TestTransaction secondWrite = new TestTransaction(env);
        TestUtil.increment(secondWrite, "1", STAT_TOTAL, 1);
        secondWrite.setWeakNotification("1", STAT_CHANGED);
        TransactionImpl.CommitData commitData = secondWrite.createCommitData();
        Assert.assertTrue(secondWrite.preCommit(commitData));

        // The processing transaction starts between preCommit and the commit stamp allocation.
        TestTransaction overlappingProcessor = new TestTransaction(env, "1", STAT_CHANGED);

        Stamp commitStamp = env.getSharedResources().getOracleClient().getStamp();
        Assert.assertTrue(secondWrite.commitPrimaryColumn(commitData, commitStamp));
        secondWrite.finishCommit(commitData, commitStamp);
        secondWrite.close();

        Assert.assertEquals(1L, countNotifications());

        TOTAL_OBSERVER.process(overlappingProcessor, Bytes.of("1"), STAT_CHANGED);
        overlappingProcessor.done();

        // The notification written by secondWrite must survive the overlapping processor.
        Assert.assertEquals(1L, countNotifications());

        // try-with-resources guarantees the snapshot is closed even when the assertion fails.
        try (Snapshot snapshot = client.newSnapshot()) {
            Assert.assertEquals("1", snapshot.gets("all", STAT_TOTAL));
        }

        TestTransaction laterProcessor = new TestTransaction(env, "1", STAT_CHANGED);
        TOTAL_OBSERVER.process(laterProcessor, Bytes.of("1"), STAT_CHANGED);
        laterProcessor.done();

        Assert.assertEquals(0L, countNotifications());

        try (Snapshot snapshot = client.newSnapshot()) {
            Assert.assertEquals("2", snapshot.gets("all", STAT_TOTAL));
        }
    }

    /**
     * Counts the entries returned by a scanner configured with
     * {@link Notification#configureScanner} over the current test table.
     *
     * @return the number of entries the notification scanner yields
     * @throws Exception if flushing or scanning fails
     */
    private int countNotifications() throws Exception {
        // Wait for the shared batch writer's async flush first; presumably notification
        // mutations may be written asynchronously and must land before the scan.
        env.getSharedResources().getBatchWriter().waitForAsyncFlush();

        Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
        try {
            Notification.configureScanner(scanner);

            int count = 0;
            Iterator<?> entries = scanner.iterator();
            while (entries.hasNext()) {
                entries.next();
                count++;
            }
            return count;
        } finally {
            // Release scanner resources regardless of how the scan ended.
            scanner.close();
        }
    }
}
