package org.apache.pinot.integration.tests;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Integration test that uploads, refreshes (re-uploads under the same name) and deletes
 * offline segments, then checks that the {@code COUNT(*)} result reported by the cluster
 * converges to the expected number of rows.
 *
 * <p>Each test method runs once per entry of {@link #configProvider()}; the table named by
 * that entry is created before the method and dropped after it.
 */
public class UploadRefreshDeleteIntegrationTest extends BaseClusterIntegrationTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(UploadRefreshDeleteIntegrationTest.class);

    /** Maximum number of polls performed by {@link #verifyNRows(int, int)}. */
    private static final int MAX_VERIFY_RETRIES = 10;
    /** Sleep before the first poll of {@link #verifyNRows(int, int)}; doubled on every retry. */
    private static final long INITIAL_VERIFY_SLEEP_MS = 100L;

    // Parameters of the randomized upload/delete test.
    private static final int THREAD_COUNT = 1;
    private static final int SEGMENT_COUNT = 5;
    private static final int MIN_ROWS_PER_SEGMENT = 500;
    private static final int MAX_ROWS_PER_SEGMENT = 1000;
    private static final int OPERATIONS_PER_ITERATION = 10;
    private static final int ITERATION_COUNT = 5;
    private static final double UPLOAD_PROBABILITY = 0.8d;
    private static final long ROW_COUNT_TIMEOUT_MS = 60000L;
    private static final long ROW_COUNT_POLL_INTERVAL_MS = 5000L;

    /** Name of the table used by the currently running test method; set in {@link #setupMethod}. */
    private String _tableName;

    /**
     * Returns the table the current test method operates on.
     *
     * @return the table name taken from the data provider row of the current test method
     */
    @Override
    @Nonnull
    public String getTableName() {
        return _tableName;
    }

    /** Starts ZooKeeper, controller, broker and server, in that order. */
    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();
        startBroker();
        startServer();
    }

    /**
     * Empties the working directories and creates the offline table for the upcoming test.
     *
     * @param params the data provider row of the upcoming test method: table name at index 0
     *               and {@link SegmentVersion} at index 1; no table is created when it is
     *               {@code null} or empty
     */
    @BeforeMethod
    public void setupMethod(Object[] params) throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
        if (params == null || params.length == 0) {
            return;
        }
        _tableName = (String) params[0];
        addOfflineTable(_tableName, (SegmentVersion) params[1]);
    }

    /** Drops the table of the test method that just finished, if one was set. */
    @AfterMethod
    public void teardownMethod() throws Exception {
        if (_tableName != null) {
            dropOfflineTable(_tableName);
        }
    }

    /**
     * Writes an Avro file with random integers, builds a segment from it and uploads it.
     *
     * @param segmentName name of the segment; must have the form {@code <prefix>_<int>}, the
     *                    integer after the first underscore is passed to the segment builder
     * @param rowCount    number of rows to generate
     * @throws Exception if generating, building or uploading the segment fails
     */
    public void generateAndUploadRandomSegment(String segmentName, int rowCount) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        Schema schema = new Schema.Parser().parse(
                new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
        GenericData.Record record = new GenericData.Record(schema);
        File avroFile = new File(_tempDir, segmentName + ".avro");

        // try-with-resources so the writer is closed (and the file handle released) even when
        // creating the file or appending a record throws.
        try (DataFileWriter<GenericData.Record> fileWriter =
                new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(schema))) {
            fileWriter.create(schema, avroFile);
            for (int row = 0; row < rowCount; row++) {
                // The same record instance is reused; only field 0 changes between rows.
                record.put(0, random.nextInt());
                fileWriter.append(record);
            }
        }

        int segmentIndex = Integer.parseInt(segmentName.split("_")[1]);
        File segmentTarDir = new File(_tarDir, segmentName);
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{segmentTarDir});

        ListeningExecutorService executor = MoreExecutors.newDirectExecutorService();
        try {
            ClusterIntegrationTestUtils.buildSegmentsFromAvro(Collections.singletonList(avroFile), segmentIndex,
                    new File(_segmentDir, segmentName), segmentTarDir, _tableName, executor);
        } finally {
            executor.shutdown();
        }
        executor.awaitTermination(1L, TimeUnit.MINUTES);

        uploadSegments(segmentTarDir);
        FileUtils.forceDelete(avroFile);
        FileUtils.forceDelete(segmentTarDir);
    }

    /**
     * Supplies the (table name, segment version) pairs every test method is run with.
     *
     * @return one row per table configuration
     */
    @DataProvider(name = "configProvider")
    public Object[][] configProvider() {
        return new Object[][]{
            new Object[]{"mytable", SegmentVersion.v1},
            new Object[]{"yourtable", SegmentVersion.v3}
        };
    }

    /**
     * Uploads a segment, replaces it with a larger one of the same name, then adds a second
     * segment, verifying the total row count after each step.
     */
    @Test(dataProvider = "configProvider")
    public void testRefresh(String tableName, SegmentVersion version) throws Exception {
        final String refreshedSegment = "segmentToBeRefreshed_6";
        final int initialRows = 69;
        final int refreshedRows = 198;
        final int newSegmentRows = 102;

        generateAndUploadRandomSegment(refreshedSegment, initialRows);
        verifyNRows(0, initialRows);

        LOGGER.info("Segment {} loaded with {} rows, refreshing with {}", refreshedSegment, initialRows,
                refreshedRows);
        generateAndUploadRandomSegment(refreshedSegment, refreshedRows);
        verifyNRows(initialRows, refreshedRows);

        generateAndUploadRandomSegment("newSegment_9", newSegmentRows);
        verifyNRows(refreshedRows, refreshedRows + newSegmentRows);
    }

    /**
     * Polls the row count until it moves from {@code currentNrows} to {@code finalNrows}.
     *
     * <p>Polls at most {@link #MAX_VERIFY_RETRIES} times, sleeping before every poll and doubling
     * the sleep after each poll that still shows the old count or fails. Any count other than
     * the two expected values fails the test immediately.
     *
     * @param currentNrows row count expected before the pending change becomes visible
     * @param finalNrows   row count expected once the pending change is visible
     */
    private void verifyNRows(int currentNrows, int finalNrows) throws Exception {
        long sleepMs = INITIAL_VERIFY_SLEEP_MS;
        for (int attempt = 0; attempt < MAX_VERIFY_RETRIES; attempt++) {
            Thread.sleep(sleepMs);
            long observedRows;
            try {
                observedRows = getCurrentCountStarResult();
            } catch (Exception e) {
                // A failed query is treated like "not updated yet" and retried.
                LOGGER.debug("Caught exception while querying row count, retrying", e);
                observedRows = -1;
            }
            if (observedRows == finalNrows) {
                return;
            }
            if (observedRows != currentNrows && observedRows != -1) {
                Assert.fail("Found unexpected number of rows " + observedRows);
            }
            sleepMs *= 2;
        }
        Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
    }

    /**
     * Randomly uploads (with probability {@link #UPLOAD_PROBABILITY}) or deletes segments for
     * several iterations and, after each iteration, waits for the row count to match the sum of
     * the rows of the segments that should currently exist.
     */
    @Test(enabled = false, dataProvider = "configProvider")
    public void testUploadRefreshDelete(String tableName, SegmentVersion version) throws Exception {
        final String[] segmentNames = new String[SEGMENT_COUNT];
        // Expected number of rows per segment; 0 means the segment is absent.
        final int[] segmentRowCounts = new int[SEGMENT_COUNT];
        for (int i = 0; i < SEGMENT_COUNT; i++) {
            segmentNames[i] = "segment_" + i;
            segmentRowCounts[i] = 0;
        }

        for (int iteration = 0; iteration < ITERATION_COUNT; iteration++) {
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            List<Future<?>> pendingOperations = new ArrayList<>(OPERATIONS_PER_ITERATION);

            for (int op = 0; op < OPERATIONS_PER_ITERATION; op++) {
                pendingOperations.add(executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            ThreadLocalRandom random = ThreadLocalRandom.current();
                            int segmentIndex = random.nextInt(SEGMENT_COUNT);
                            String segmentName = segmentNames[segmentIndex];

                            if (random.nextDouble() < UPLOAD_PROBABILITY) {
                                LOGGER.info("Will upload segment {}", segmentName);
                                // Lock on the array's String instance so operations on the same
                                // segment are serialized.
                                synchronized (segmentName) {
                                    int segmentRowCount =
                                            random.nextInt(MIN_ROWS_PER_SEGMENT, MAX_ROWS_PER_SEGMENT);
                                    LOGGER.info("Generating and uploading segment {} with {} rows", segmentName,
                                            segmentRowCount);
                                    generateAndUploadRandomSegment(segmentName, segmentRowCount);
                                    LOGGER.info("Uploaded segment {} with {} rows", segmentName, segmentRowCount);
                                    segmentRowCounts[segmentIndex] = segmentRowCount;
                                }
                            } else {
                                LOGGER.info("Will delete segment {}", segmentName);
                                synchronized (segmentName) {
                                    LOGGER.info("Deleting segment {}", segmentName);
                                    // Delete from the table the segments are uploaded to.
                                    String reply = ControllerTest.sendDeleteRequest(
                                            _controllerRequestURLBuilder.forSegmentDelete(_tableName, segmentName));
                                    LOGGER.info("Deletion returned {}", reply);
                                    LOGGER.info("Deleted segment {}", segmentName);
                                    segmentRowCounts[segmentIndex] = 0;
                                }
                            }
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }));
            }

            executor.shutdown();
            Assert.assertTrue(executor.awaitTermination(5L, TimeUnit.MINUTES),
                    "Upload/delete operations did not finish within five minutes");
            // Surface any failure raised inside a task; otherwise it would stay hidden in the Future.
            for (Future<?> operation : pendingOperations) {
                operation.get();
            }

            int expectedRowCount = 0;
            for (int segmentRowCount : segmentRowCounts) {
                expectedRowCount += segmentRowCount;
            }

            LOGGER.info("Awaiting for the row count to match {}", expectedRowCount);
            int actualRowCount = (int) getCurrentCountStarResult();
            long deadline = System.currentTimeMillis() + ROW_COUNT_TIMEOUT_MS;
            while (System.currentTimeMillis() < deadline && actualRowCount != expectedRowCount) {
                LOGGER.info("Row count is {}, expected {}, awaiting for row count to match", actualRowCount,
                        expectedRowCount);
                Thread.sleep(ROW_COUNT_POLL_INTERVAL_MS);
                try {
                    actualRowCount = (int) getCurrentCountStarResult();
                } catch (Exception e) {
                    LOGGER.warn("Caught exception while sending query to Pinot, retrying", e);
                }
            }
            Assert.assertEquals(actualRowCount, expectedRowCount,
                    "Expected and actual row counts don't match after waiting one minute");
        }
    }

    /** Stops the cluster components in reverse start order and removes the temp directory. */
    @AfterClass
    public void tearDown() {
        stopServer();
        stopBroker();
        stopController();
        stopZk();
        FileUtils.deleteQuietly(_tempDir);
    }
}
