package org.apache.accumulo.test.functional;

import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/test/functional/ScanIdIT.class */
public class ScanIdIT extends AccumuloClusterHarness {
    private static final int NUM_DATA_ROWS = 100;
    private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
    private static final Random random = new Random();
    private static final int NUM_SCANNERS = 8;
    private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
    private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
    private static final Map<Integer, Value> resultsByWorker = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/accumulo/test/functional/ScanIdIT$ScannerThread.class */
    private static class ScannerThread implements Runnable {
        private final Connector connector;
        private Scanner scanner = null;
        private final int workerIndex;
        private final String tablename;
        private final CountDownLatch latch;

        public ScannerThread(Connector connector, int i, String str, CountDownLatch countDownLatch) {
            this.connector = connector;
            this.workerIndex = i;
            this.tablename = str;
            this.latch = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.latch.countDown();
            try {
                this.latch.await();
                ScanIdIT.log.debug("Creating scanner in worker thread {}", Integer.valueOf(this.workerIndex));
                try {
                    this.scanner = this.connector.createScanner(this.tablename, new Authorizations());
                    this.scanner.setReadaheadThreshold(Long.MAX_VALUE);
                    this.scanner.setBatchSize(1);
                    this.scanner.setRange(new Range(new Text(Integer.toString(this.workerIndex)), new Text("9")));
                    this.scanner.fetchColumnFamily(new Text("fam1"));
                    for (Map.Entry entry : this.scanner) {
                        if (!ScanIdIT.testInProgress.get()) {
                            this.scanner.clearScanIterators();
                            this.scanner.close();
                            return;
                        }
                        ScanIdIT.log.debug("worker {}, row {}", Integer.valueOf(this.workerIndex), ((Key) entry.getKey()).getRow().toString());
                        if (entry.getValue() != null) {
                            Value value = (Value) ScanIdIT.resultsByWorker.put(Integer.valueOf(this.workerIndex), (Value) entry.getValue());
                            if (value != null) {
                                ScanIdIT.log.trace("worker {} values {}", Integer.valueOf(this.workerIndex), String.format("%1$s < %2$s", value, entry.getValue()));
                                Assert.assertTrue(value.compareTo(entry.getValue()) > 0);
                            }
                        } else {
                            ScanIdIT.log.info("Scanner returned null");
                            Assert.fail("Scanner returned unexpected null value");
                        }
                    }
                    ScanIdIT.log.debug("Scanner ran out of data. (info only, not an error) ");
                } catch (TableNotFoundException e) {
                    throw new IllegalStateException("Initialization failure. Could not create scanner", e);
                }
            } catch (InterruptedException e2) {
                ScanIdIT.log.error("Thread interrupted with id {}", Integer.valueOf(this.workerIndex));
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // org.apache.accumulo.harness.AccumuloITBase
    protected int defaultTimeoutSeconds() {
        return 60;
    }

    @Test
    public void testScanId() throws Exception {
        String str = getUniqueNames(1)[0];
        Connector connector = getConnector();
        connector.tableOperations().create(str);
        addSplits(connector, str);
        log.info("Splits added");
        generateSampleData(connector, str);
        log.info("Generated data for {}", str);
        attachSlowIterator(connector, str);
        CountDownLatch countDownLatch = new CountDownLatch(NUM_SCANNERS);
        for (int i = 0; i < NUM_SCANNERS; i++) {
            pool.submit(new ScannerThread(connector, i, str, countDownLatch));
        }
        while (testInProgress.get()) {
            if (resultsByWorker.size() < NUM_SCANNERS) {
                log.trace("Results reported {}", Integer.valueOf(resultsByWorker.size()));
                UtilWaitThread.sleepUninterruptibly(750L, TimeUnit.MILLISECONDS);
            } else {
                testInProgress.set(false);
                log.debug("Final result count {}", Integer.valueOf(resultsByWorker.size()));
                UtilWaitThread.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            }
        }
        HashSet hashSet = new HashSet();
        List<String> tabletServers = connector.instanceOperations().getTabletServers();
        log.debug("tablet servers {}", tabletServers.toString());
        for (String str2 : tabletServers) {
            List<ActiveScan> list = null;
            for (int i2 = 0; i2 < 10; i2++) {
                try {
                    list = connector.instanceOperations().getActiveScans(str2);
                    break;
                } catch (AccumuloException e) {
                    if (!(e.getCause() instanceof TableNotFoundException)) {
                        throw e;
                    }
                    log.debug("Got TableNotFoundException, will retry");
                    Thread.sleep(200L);
                }
            }
            Assert.assertNotNull("Repeatedly got exception trying to active scans", list);
            log.debug("TServer {} has {} active scans", str2, Integer.valueOf(list.size()));
            for (ActiveScan activeScan : list) {
                log.debug("Tserver {} scan id {}", str2, Long.valueOf(activeScan.getScanid()));
                hashSet.add(Long.valueOf(activeScan.getScanid()));
            }
        }
        Assert.assertTrue("Expected at least 8 scanIds, but saw " + hashSet.size(), NUM_SCANNERS <= hashSet.size());
    }

    private void addSplits(Connector connector, String str) {
        try {
            connector.tableOperations().addSplits(str, createSplits());
            connector.tableOperations().offline(str, true);
            UtilWaitThread.sleepUninterruptibly(2L, TimeUnit.SECONDS);
            connector.tableOperations().online(str, true);
            Iterator it = connector.tableOperations().listSplits(str).iterator();
            while (it.hasNext()) {
                log.trace("Split {}", (Text) it.next());
            }
        } catch (AccumuloException e) {
            throw new IllegalStateException("Initialization failed. Could not add splits to " + str, e);
        } catch (AccumuloSecurityException e2) {
            throw new IllegalStateException("Initialization failed. Could not add splits to " + str, e2);
        } catch (TableNotFoundException e3) {
            throw new IllegalStateException("Initialization failed. Could not add splits to " + str, e3);
        }
    }

    private SortedSet<Text> createSplits() {
        TreeSet treeSet = new TreeSet();
        for (int i = 0; i < 10; i++) {
            treeSet.add(new Text(Integer.toString(i)));
        }
        return treeSet;
    }

    private void generateSampleData(Connector connector, String str) {
        try {
            BatchWriter createBatchWriter = connector.createBatchWriter(str, new BatchWriterConfig());
            ColumnVisibility columnVisibility = new ColumnVisibility("public");
            for (int i = 0; i < 100; i++) {
                Text text = new Text(String.format("%d", Integer.valueOf((random.nextInt(10) * 100) + i)));
                Mutation mutation = new Mutation(text);
                mutation.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(StandardCharsets.UTF_8)));
                mutation.put(new Text("fam1"), new Text("positive"), columnVisibility, new Value(Integer.toString(100 - i).getBytes(StandardCharsets.UTF_8)));
                mutation.put(new Text("fam1"), new Text("negative"), columnVisibility, new Value(Integer.toString(i - 100).getBytes(StandardCharsets.UTF_8)));
                log.trace("Added row {}", text);
                createBatchWriter.addMutation(mutation);
            }
            createBatchWriter.close();
        } catch (MutationsRejectedException e) {
            throw new IllegalStateException("Initialization failed. Could not create test data", e);
        } catch (TableNotFoundException e2) {
            throw new IllegalStateException("Initialization failed. Could not create test data", e2);
        }
    }

    private void attachSlowIterator(Connector connector, String str) {
        try {
            IteratorSetting iteratorSetting = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
            iteratorSetting.addOption("sleepTime", "200");
            iteratorSetting.addOption("seekSleepTime", "200");
            connector.tableOperations().attachIterator(str, iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.scan));
        } catch (AccumuloException e) {
            throw new IllegalStateException("Initialization failed. Could not attach slow iterator", e);
        } catch (TableNotFoundException e2) {
            throw new IllegalStateException("Initialization failed. Could not attach slow iterator", e2);
        } catch (AccumuloSecurityException e3) {
            throw new IllegalStateException("Initialization failed. Could not attach slow iterator", e3);
        }
    }
}
