package org.apache.kudu.client;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kudu.Schema;
import org.apache.kudu.WireProtocol;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.tserver.Tserver;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/kudu/client/TestAsyncKuduSession.class */
public class TestAsyncKuduSession extends BaseKuduTest {
    private static final String TABLE_NAME = TestAsyncKuduSession.class.getName() + "-" + System.currentTimeMillis();
    private static Schema schema = getBasicSchema();
    private static KuduTable table;

    /**
     * One-time fixture setup: runs the base class setup first, then creates the
     * table (named {@code TABLE_NAME}, using the basic schema and the basic
     * create-table options) that the tests in this class write to.
     *
     * @throws Exception if the base setup or the table creation fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        // The base fixture has to be ready before any table can be created.
        BaseKuduTest.setUpBeforeClass();

        table = createTable(
                TABLE_NAME,
                schema,
                getBasicCreateTableOptions());
    }

    /**
     * Applies a single insert in AUTO_FLUSH_BACKGROUND mode while a tablet server
     * error is being injected into batches, and checks that:
     * <ul>
     *   <li>joining the operation does not throw, but yields a response carrying a
     *       row error whose message contains the injected error message;</li>
     *   <li>the session afterwards reports exactly one pending error.</li>
     * </ul>
     * The error injection is always switched off again, whatever the outcome.
     *
     * @throws Exception if session setup or the cleanup fails
     */
    @Test(timeout = 100000)
    public void testBackgroundErrors() throws Exception {
        try {
            AsyncKuduSession session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
            session.setFlushInterval(10);
            Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 0);
            try {
                OperationResponse response = session.apply(createInsert(1)).join(50000L);
                Assert.assertTrue(response.hasRowError());
                Assert.assertTrue(response.getRowError().getErrorStatus().getMessage()
                        .contains(getTabletServerErrorMessage()));
            } catch (Exception e) {
                // Same failure type and message as Assert.fail("Should not throw"),
                // but the unexpected exception is kept as the cause so that the
                // test report shows what actually went wrong.
                throw new AssertionError("Should not throw", e);
            }
            Assert.assertEquals(1L, session.countPendingErrors());
        } finally {
            // Single cleanup point: stop injecting errors for the tests that follow.
            Batch.injectTabletServerErrorAndLatency((Tserver.TabletServerErrorPB) null, 0);
        }
    }

    /**
     * Checks that a failing batch does not leave the session with operations
     * stuck in it.
     *
     * <p>The session flushes in the background every 100 ms while every batch is
     * answered with an injected tablet server error after 200 ms of injected
     * latency. A first insert is applied, the test sleeps 120 ms (longer than the
     * flush interval, shorter than the injected latency), then a second insert is
     * applied. Both operations must complete with a row error carrying the
     * injected message, and the session must report no pending operations.
     *
     * @throws Exception if session setup, the sleep or the cleanup fails
     */
    @Test(timeout = 100000)
    public void testBatchErrorCauseSessionStuck() throws Exception {
        try {
            AsyncKuduSession session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
            session.setFlushInterval(100);
            Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 200);

            Deferred<OperationResponse> first = session.apply(createInsert(1));
            // NOTE(review): presumably this makes the second insert arrive while the
            // first batch is still in flight - confirm against the session internals.
            Thread.sleep(120L);
            Deferred<OperationResponse> second = session.apply(createInsert(2));

            assertCompletesWithInjectedRowError(first);
            assertCompletesWithInjectedRowError(second);

            Assert.assertFalse(session.hasPendingOperations());
        } finally {
            // Single cleanup point: stop injecting errors for the tests that follow.
            Batch.injectTabletServerErrorAndLatency((Tserver.TabletServerErrorPB) null, 0);
        }
    }

    /**
     * Waits up to 50 seconds for {@code pending} and asserts that its response
     * carries a row error whose message contains the injected error message.
     * Any exception thrown while waiting or inspecting the response fails the
     * test with "Should not throw", keeping the exception as the cause.
     */
    private void assertCompletesWithInjectedRowError(Deferred<OperationResponse> pending) {
        try {
            OperationResponse response = pending.join(50000L);
            Assert.assertTrue(response.hasRowError());
            Assert.assertTrue(response.getRowError().getErrorStatus().getMessage()
                    .contains(getTabletServerErrorMessage()));
        } catch (Exception e) {
            throw new AssertionError("Should not throw", e);
        }
    }

    /**
     * Checks that writing to a table that has been deleted completes with a
     * "not found" row error instead of leaving the operation hanging.
     *
     * <p>Steps: insert one row, look up the tablet and tablet client that served
     * it, delete the table, poll the tablet server until it no longer lists that
     * tablet, then apply another insert and expect a NotFound row error. The
     * shared table is re-created exactly once afterwards, whatever the outcome,
     * so that the other tests in this class still have a table to work with.
     *
     * @throws Exception if any step, or the re-creation of the table, fails
     */
    @Test(timeout = 100000)
    public void testGetTableLocationsErrorCauseSessionStuck() throws Exception {
        AsyncKuduSession session = client.newSession();

        // Write a row first so there is a located tablet to look up below.
        Insert firstInsert = createInsert(1);
        session.apply(firstInsert).join(50000L);
        RemoteTablet tablet =
                client.getTableLocationEntry(table.getTableId(), firstInsert.partitionKey()).getTablet();
        String tabletId = tablet.getTabletId();
        TabletClient tabletClient = client.getTabletClient(tablet.getLeaderUUID());

        try {
            client.deleteTable(TABLE_NAME).join();

            // Poll every 100 ms until the tablet server stops listing the tablet.
            while (true) {
                ListTabletsRequest request = new ListTabletsRequest();
                tabletClient.sendRpc(request);
                ListTabletsResponse listing = request.getDeferred().join();
                if (!listing.getTabletsList().contains(tabletId)) {
                    break;
                }
                Thread.sleep(100L);
            }

            OperationResponse response = session.apply(createInsert(1)).join(50000L);
            Assert.assertTrue(response.hasRowError());
            Assert.assertTrue(response.getRowError().getErrorStatus().isNotFound());
        } finally {
            // Re-create the shared table in one place only. Previously a failure of
            // the success-path re-creation triggered a second attempt whose own
            // failure could hide the original one.
            table = createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
        }
    }

    /**
     * Checks that writes time out cleanly when no tablet server is reachable.
     *
     * <p>With all tablet servers killed and a 1 ms session timeout, an insert
     * applied in the session's initial flush mode must complete with a timed-out
     * row error. The same must hold for an insert buffered in MANUAL_FLUSH mode
     * and sent by an explicit flush, which must return exactly one response.
     * The tablet servers are restarted afterwards, whatever the outcome.
     *
     * @throws Exception if killing or restarting the servers fails, or a join throws
     */
    @Test
    public void testInsertIntoUnavailableTablet() throws Exception {
        killTabletServers();
        try {
            AsyncKuduSession session = client.newSession();
            session.setTimeoutMillis(1L);

            OperationResponse response = session.apply(createInsert(1)).join();
            Assert.assertTrue(response.hasRowError());
            Assert.assertTrue(response.getRowError().getErrorStatus().isTimedOut());

            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.apply(createInsert(1));
            List<OperationResponse> responses = session.flush().join();
            Assert.assertEquals(1L, responses.size());

            OperationResponse flushed = responses.get(0);
            // Check for the row error before dereferencing it, so that a response
            // without one fails as an assertion rather than inside getRowError().
            Assert.assertTrue(flushed.hasRowError());
            Assert.assertTrue(flushed.getRowError().getErrorStatus().isTimedOut());
        } finally {
            // Single cleanup point: bring the servers back for the remaining tests.
            restartTabletServers();
        }
    }

    /**
     * Writes to a single-replica table before and after a restart of all tablet
     * servers and checks that the number of tablet clients known to the Kudu
     * client is the same after the second write as it was before the restart.
     *
     * <p>The tablet servers are restarted and the temporary table is deleted
     * exactly once at the end, whatever the outcome.
     *
     * @throws Exception if table creation, a write, the restart or the cleanup fails
     */
    @Test(timeout = 100000)
    public void testRestartBetweenWrites() throws Exception {
        final String tableName = "non-replicated";
        KuduTable nonReplicatedTable =
                createTable(tableName, basicSchema, getBasicCreateTableOptions().setNumReplicas(1));
        try {
            AsyncKuduSession session = client.newSession();
            session.setTimeoutMillis(30000L);
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

            // First write, before any restart.
            session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
            int clientsBefore = client.getTabletClients().size();

            killTabletServers();
            restartTabletServers();

            // Second write, after every tablet server has been restarted.
            session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
            Assert.assertEquals(clientsBefore, client.getTabletClients().size());
        } finally {
            // Single cleanup point. Previously a failure of the success-path
            // cleanup caused the whole cleanup to be attempted a second time.
            restartTabletServers();
            client.deleteTable(tableName).join();
        }
    }

    /**
     * End-to-end walk through the session's flush modes, buffer limits and
     * throttling, all on one session against the shared test table.
     *
     * <p>The phases build on each other (row keys 0..170 are used in order and the
     * row-count assertions depend on everything written or deleted earlier), and
     * several of them depend on timing (flush intervals, a short sleep, an upper
     * bound on a join), so the statement order matters.
     *
     * @throws Exception if any operation, scan or join fails
     */
    @Test(timeout = 100000)
    public void test() throws Exception {
        AsyncKuduSession newSession = client.newSession();
        // NOTE(review): a low watermark of 1.0 presumably disables watermark-based
        // throttling until it is lowered near the end of this test - confirm.
        newSession.setMutationBufferLowWatermark(1.0f);

        // Phase 1: background mode with a 51000 ms flush interval (longer than the
        // 50000 ms join below); an explicit flush must make row 0 visible.
        newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        newSession.setFlushInterval(51000);
        newSession.apply(createInsert(0));
        newSession.flush().join(50000L);
        Assert.assertTrue(exists(0));

        // Phase 2: AUTO_FLUSH_SYNC; each of rows 1..9 is applied and joined, after
        // which keys [0, 10) must hold 10 rows. The 1000 ms flush interval set here
        // is still in effect for the background phase further down.
        newSession.setFlushInterval(1000);
        newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
        for (int i = 1; i < 10; i++) {
            newSession.apply(createInsert(i)).join(50000L);
        }
        Assert.assertEquals(10L, countInRange(0, 10));

        // Phase 3: MANUAL_FLUSH with a buffer space of 10; applied rows must stay
        // invisible until flush() is called.
        newSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        newSession.setMutationBufferSpace(10);
        newSession.apply(createInsert(10));
        try {
            newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
        } catch (IllegalArgumentException e) {
            // Tolerated. NOTE(review): presumably thrown because an operation is
            // still buffered, leaving the session in MANUAL_FLUSH - confirm.
        }
        Assert.assertFalse(exists(10));
        for (int i2 = 11; i2 < 20; i2++) {
            newSession.apply(createInsert(i2));
        }
        Assert.assertEquals(0L, countInRange(10, 20));
        try {
            newSession.apply(createInsert(20));
        } catch (KuduException e2) {
            // Tolerated. NOTE(review): presumably thrown because the buffer already
            // holds 10 operations - confirm.
        }
        // Still nothing visible in [10, 20) before the flush...
        Assert.assertEquals(0L, countInRange(10, 20));
        newSession.flush().join(50000L);
        // ...and all 10 rows visible after it.
        Assert.assertEquals(10L, countInRange(10, 20));
        // A second flush with nothing applied since the previous one.
        newSession.flush().join(50000L);

        // Phase 4: AUTO_FLUSH_BACKGROUND. Row 20 must not be visible 50 ms after
        // being applied. Rows 21..29 follow; `apply` ends up holding the deferred of
        // row 29, `apply2` the deferred of row 30.
        newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        Deferred apply = newSession.apply(createInsert(20));
        Thread.sleep(50L);
        Assert.assertFalse(exists(20));
        for (int i3 = 21; i3 < 30; i3++) {
            apply = newSession.apply(createInsert(i3));
        }
        Deferred apply2 = newSession.apply(createInsert(30));
        long currentTimeMillis = System.currentTimeMillis();
        apply.join(50000L);
        // Row 29's deferred must complete in under 950 ms, i.e. sooner than the
        // 1000 ms flush interval. NOTE(review): presumably because applying row 30
        // to the full buffer forced an immediate flush - confirm.
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis < 950);
        // At this point exactly 10 rows are expected in [20, 31); the 11th only
        // after row 30's deferred has completed.
        Assert.assertEquals(10L, countInRange(20, 31));
        apply2.join();
        Assert.assertEquals(11L, countInRange(20, 31));

        // Phase 5: AUTO_FLUSH_SYNC update and delete of row 30. The update sets
        // columns 2 and 3 and must not change the row count (31); the delete must
        // reduce it to 30.
        newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
        Update createUpdate = createUpdate(30);
        PartialRow row = createUpdate.getRow();
        row.addInt(2, 999);
        row.addString(3, "updated data");
        Deferred apply3 = newSession.apply(createUpdate);
        apply3.addErrback(defaultErrorCB);
        apply3.join(50000L);
        Assert.assertEquals(31L, countInRange(0, 31));
        Deferred apply4 = newSession.apply(createDelete(30));
        apply4.addErrback(defaultErrorCB);
        apply4.join(50000L);
        Assert.assertEquals(30L, countInRange(0, 31));

        // Phase 6: MANUAL_FLUSH with a buffer space of 35. Deletes of rows 0..19 are
        // buffered (count unchanged), then flushed; the test waits on the deferred
        // of the last delete applied before counting again.
        newSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        newSession.setMutationBufferSpace(35);
        for (int i4 = 0; i4 < 20; i4++) {
            apply2 = newSession.apply(createDelete(i4));
        }
        Assert.assertEquals(30L, countInRange(0, 31));
        newSession.flush();
        apply2.join(50000L);
        Assert.assertEquals(10L, countInRange(0, 31));

        // Inserts of rows 30..39 and deletes of rows 20..29 buffered together: 10
        // rows before the flush (the old 20..29) and 10 after it (the new 30..39).
        for (int i5 = 30; i5 < 40; i5++) {
            newSession.apply(createInsert(i5));
        }
        for (int i6 = 20; i6 < 30; i6++) {
            apply2 = newSession.apply(createDelete(i6));
        }
        Assert.assertEquals(10L, countInRange(0, 40));
        newSession.flush();
        apply2.join(50000L);
        Assert.assertEquals(10L, countInRange(0, 40));

        // Phase 7: rows 40..49 inserted with column 3 set to null; all 10 must be
        // counted as null by the scan.
        newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
        for (int i7 = 40; i7 < 50; i7++) {
            newSession.apply(createInsertWithNull(i7)).join(50000L);
        }
        Assert.assertEquals(10L, countNullColumns(40, 50));

        // Phase 8: background mode with a buffer space of 10. Applying rows 50..70
        // must raise PleaseThrottleException exactly once, at row 70. The handler
        // waits on the exception's deferred, re-applies the rejected operation and
        // flushes.
        newSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        newSession.setMutationBufferSpace(10);
        boolean z = false;
        for (int i8 = 50; i8 < 71; i8++) {
            try {
                newSession.apply(createInsert(i8));
            } catch (PleaseThrottleException e3) {
                z = true;
                Assert.assertEquals(70L, i8);
                e3.getDeferred().join(50000L);
                newSession.apply(e3.getFailedRpc());
                newSession.flush().join(50000L);
            }
        }
        Assert.assertTrue("Expected PleaseThrottleException", z);
        Assert.assertEquals(21L, countInRange(50, 71));

        // Phase 9: rows 71..90 applied without any throttle handling, then flushed.
        // They are counted twice: with the default projection and with an empty
        // projected-column list.
        for (int i9 = 71; i9 < 91; i9++) {
            newSession.apply(createInsert(i9));
        }
        newSession.flush().join(50000L);
        Assert.assertEquals(20L, countInRange(71, 91));
        Assert.assertEquals(20L, countRowsInScan(getScanner(71, 91, Collections.emptyList())));

        // Phase 10: shut down the first tablet client, then write rows 91..100 with
        // a buffer space of 1, re-applying any throttled operation once. All 10 rows
        // must still arrive.
        ((TabletClient) client.getTabletClients().get(0)).shutdown().join(50000L);
        newSession.setMutationBufferSpace(1);
        for (int i10 = 91; i10 < 101; i10++) {
            try {
                newSession.apply(createInsert(i10));
            } catch (PleaseThrottleException e4) {
                e4.getDeferred().join(50000L);
                newSession.apply(e4.getFailedRpc());
            }
        }
        newSession.flush().join(50000L);
        Assert.assertEquals(10L, countInRange(91, 101));

        // Phase 11: empty the client's tablet cache for the table, then write rows
        // 101..150, retrying each insert until it is accepted without throttling.
        client.emptyTabletsCacheForTable(table.getTableId());
        for (int i11 = 101; i11 < 151; i11++) {
            Insert createInsert = createInsert(i11);
            while (true) {
                try {
                    newSession.apply(createInsert);
                    break;
                } catch (PleaseThrottleException e5) {
                    e5.getDeferred().join(50000L);
                }
            }
        }
        newSession.flush().join(50000L);
        Assert.assertEquals(50L, countInRange(101, 151));

        // Phase 12: low watermark of 0.1 with a buffer space of 10 and a fixed
        // random seed. The test expects the throttle exception at row 167 and its
        // message to mention "watermark". NOTE(review): the expected index 167
        // presumably follows from the seed 12345 - confirm before changing either.
        newSession.setMutationBufferLowWatermark(0.1f);
        newSession.setMutationBufferSpace(10);
        newSession.setRandomSeed(12345L);
        boolean z2 = false;
        for (int i12 = 151; i12 < 171; i12++) {
            try {
                newSession.apply(createInsert(i12));
            } catch (PleaseThrottleException e6) {
                Assert.assertEquals(167L, i12);
                z2 = true;
                Assert.assertTrue(e6.getMessage().contains("watermark"));
                e6.getDeferred().join(50000L);
                newSession.apply(e6.getFailedRpc());
            }
        }
        newSession.flush().join(50000L);
        Assert.assertEquals(20L, countInRange(151, 171));
        Assert.assertTrue(z2);
    }

    /**
     * Builds an insert for key {@code i} against the shared test table by
     * delegating to {@code createBasicSchemaInsert}.
     *
     * @param i key passed on to {@code createBasicSchemaInsert}
     * @return the insert operation, not yet applied to any session
     */
    private Insert createInsert(int i) {
        final Insert insert = createBasicSchemaInsert(table, i);
        return insert;
    }

    /**
     * Builds an insert against the shared test table whose column 3 is null.
     *
     * <p>Column 0 receives {@code i}; columns 1, 2 and 4 receive the fixed values
     * 2, 3 and {@code false}.
     *
     * @param i value for column 0
     * @return the insert operation, not yet applied to any session
     */
    private Insert createInsertWithNull(int i) {
        final Insert insert = table.newInsert();
        final PartialRow values = insert.getRow();

        values.addInt(0, i);
        values.addInt(1, 2);
        values.addInt(2, 3);
        values.setNull(3);          // the column under test
        values.addBoolean(4, false);

        return insert;
    }

    /**
     * Builds an update against the shared test table with only column 0 set to
     * {@code i}; callers add the columns to change through {@code getRow()}.
     *
     * @param i value for column 0
     * @return the update operation, not yet applied to any session
     */
    private Update createUpdate(int i) {
        final Update update = table.newUpdate();
        update.getRow().addInt(0, i);
        return update;
    }

    /**
     * Builds a delete against the shared test table with column 0 set to
     * {@code i}.
     *
     * @param i value for column 0
     * @return the delete operation, not yet applied to any session
     */
    private Delete createDelete(int i) {
        final Delete delete = table.newDelete();
        delete.getRow().addInt(0, i);
        return delete;
    }

    /**
     * Scans the key range {@code [i, i + 1)} of the shared test table and reports
     * whether a row whose column 0 equals {@code i} was returned.
     *
     * <p>Scanning stops as soon as such a row has been seen; the scanner is then
     * closed.
     *
     * @param i key to look for
     * @return {@code true} if a matching row was found
     * @throws Exception if fetching rows or closing the scanner fails or times out
     */
    public static boolean exists(final int i) throws Exception {
        final AtomicBoolean found = new AtomicBoolean(false);
        AsyncKuduScanner scanner = getScanner(i, i + 1);

        // Flags `found` when a fetched batch of rows contains the key.
        Callback<Object, RowResultIterator> lookForKey = new Callback<Object, RowResultIterator>() {
            @Override
            public Object call(RowResultIterator rows) throws Exception {
                if (rows != null) {
                    for (RowResult candidate : rows) {
                        if (candidate.getInt(0) == i) {
                            found.set(true);
                            break;
                        }
                    }
                }
                return null;
            }
        };

        // Keep fetching batches until the key shows up or the scan is exhausted.
        while (!found.get() && scanner.hasMoreRows()) {
            Deferred<RowResultIterator> batch = scanner.nextRows();
            batch.addCallbacks(lookForKey, defaultErrorCB);
            batch.join(50000L);
        }

        scanner.close().join(50000L);
        return found.get();
    }

    /**
     * Scans the key range {@code [i, i2)} of the shared test table and counts the
     * returned rows whose column 3 is null.
     *
     * @param i  lower bound of the key range
     * @param i2 exclusive upper bound of the key range
     * @return number of scanned rows with a null column 3
     * @throws Exception if fetching rows or closing the scanner fails or times out
     */
    public static int countNullColumns(int i, int i2) throws Exception {
        final AtomicInteger nullCount = new AtomicInteger();
        AsyncKuduScanner scanner = getScanner(i, i2);

        // Adds the number of null-column rows in each fetched batch to the tally.
        Callback<Object, RowResultIterator> tallyNulls = new Callback<Object, RowResultIterator>() {
            @Override
            public Object call(RowResultIterator rows) throws Exception {
                if (rows != null) {
                    for (RowResult candidate : rows) {
                        if (candidate.isNull(3)) {
                            nullCount.incrementAndGet();
                        }
                    }
                }
                return null;
            }
        };

        // Drain the scanner completely, one batch at a time.
        while (scanner.hasMoreRows()) {
            Deferred<RowResultIterator> batch = scanner.nextRows();
            batch.addCallbacks(tallyNulls, defaultErrorCB);
            batch.join(50000L);
        }

        scanner.close().join(50000L);
        return nullCount.get();
    }

    /**
     * Counts the rows of the shared test table whose key lies in {@code [i, i2)},
     * by handing a scanner over that range to {@code countRowsInScan}.
     *
     * @param i  lower bound of the key range
     * @param i2 exclusive upper bound of the key range
     * @return the row count reported by {@code countRowsInScan}
     * @throws Exception if the scan fails
     */
    public static int countInRange(int i, int i2) throws Exception {
        final int rowCount = countRowsInScan(getScanner(i, i2));
        return rowCount;
    }

    /**
     * Builds a scanner over the key range {@code [i, i2)} of the shared test
     * table, passing {@code null} as the list of projected column names.
     *
     * @param i  lower bound of the key range
     * @param i2 exclusive upper bound of the key range
     * @return the configured scanner
     */
    private static AsyncKuduScanner getScanner(int i, int i2) {
        final List<String> noExplicitProjection = null;
        return getScanner(i, i2, noExplicitProjection);
    }

    /**
     * Builds a scanner over the shared test table, bounded on the schema's first
     * column: {@code i} is set as the lower bound and {@code i2} as the exclusive
     * upper bound.
     *
     * @param i    lower bound for the first column
     * @param i2   exclusive upper bound for the first column
     * @param list projected column names, handed to the builder as given
     *             (may be {@code null})
     * @return the configured scanner
     */
    private static AsyncKuduScanner getScanner(int i, int i2, List<String> list) {
        final String keyColumn = schema.getColumnByIndex(0).getName();

        PartialRow lower = schema.newPartialRow();
        lower.addInt(keyColumn, i);

        PartialRow upper = schema.newPartialRow();
        upper.addInt(keyColumn, i2);

        return client.newScannerBuilder(table)
                .lowerBound(lower)
                .exclusiveUpperBound(upper)
                .setProjectedColumnNames(list)
                .build();
    }

    /**
     * Builds the tablet server error that the tests inject: code
     * {@code UNKNOWN_ERROR}, wrapping a status that also has code
     * {@code UNKNOWN_ERROR} and the message returned by
     * {@link #getTabletServerErrorMessage()}.
     *
     * @return the error protobuf message
     */
    private Tserver.TabletServerErrorPB makeTabletServerError() {
        WireProtocol.AppStatusPB status = WireProtocol.AppStatusPB.newBuilder()
                .setCode(WireProtocol.AppStatusPB.ErrorCode.UNKNOWN_ERROR)
                .setMessage(getTabletServerErrorMessage())
                .build();

        return Tserver.TabletServerErrorPB.newBuilder()
                .setCode(Tserver.TabletServerErrorPB.Code.UNKNOWN_ERROR)
                .setStatus(status)
                .build();
    }

    /**
     * Returns the message placed in the injected tablet server error; the tests
     * look for this text in the row errors they get back.
     *
     * @return the fixed error message
     */
    private String getTabletServerErrorMessage() {
        final String injectedMessage = "injected error for test";
        return injectedMessage;
    }
}
