package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.class */
public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final int NUM_BROKERS = 1;
    private static final int NUM_SERVERS = 2;
    private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
    private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
    private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
    private static final String PRIMARY_KEY_COL = "clientId";
    private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME";

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBrokers(getNumBrokers());
        startServers(2);
        startKafka();
        Schema createSchema = createSchema();
        addSchema(createSchema);
        List<File> unpackAvroData = unpackAvroData(this._tempDir);
        pushAvroIntoKafka(unpackAvroData);
        TableConfig createUpsertTableConfig = createUpsertTableConfig(unpackAvroData.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
        addTableConfig(createUpsertTableConfig);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(unpackAvroData, createUpsertTableConfig, createSchema, 0, this._segmentDir, this._tarDir);
        uploadSegments(getTableName(), TableType.REALTIME, this._tarDir);
        waitForAllDocsLoaded(600000L);
    }

    @AfterClass
    public void tearDown() throws IOException {
        dropRealtimeTable(getTableName());
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected String getSchemaFileName() {
        return "upsert_table_test.schema";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getSchemaName() {
        return "upsertSchema";
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected String getAvroTarFileName() {
        return "upsert_test.tar.gz";
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected boolean useLlc() {
        return true;
    }

    protected int getNumBrokers() {
        return NUM_BROKERS;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public long getCountStarResult() {
        return 3L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    protected void startController() throws Exception {
        Map defaultControllerConfiguration = getDefaultControllerConfiguration();
        defaultControllerConfiguration.put("controller.realtime.segment.validation.frequencyInSeconds", Integer.valueOf(NUM_BROKERS));
        defaultControllerConfiguration.put("controller.segment.level.validation.intervalInSeconds", Integer.valueOf(NUM_BROKERS));
        defaultControllerConfiguration.put("controller.realtime.segment.validation.initialDelayInSeconds", Integer.valueOf(NUM_BROKERS));
        startController(defaultControllerConfiguration);
    }

    @Test
    public void testSegmentAssignment() throws Exception {
        IdealState tableIdealState = HelixHelper.getTableIdealState(this._helixManager, TABLE_NAME_WITH_TYPE);
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
        verifyTableIdealStates(tableIdealState);
        Thread.sleep(3000L);
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
        verifyTableIdealStates(tableIdealState);
    }

    private void verifyTableIdealStates(IdealState idealState) {
        Set<String> partitionSet = idealState.getPartitionSet();
        Assert.assertEquals(partitionSet.size(), 5);
        HashMap hashMap = new HashMap();
        hashMap.put(UPLOADED_SEGMENT_1, 0);
        hashMap.put(UPLOADED_SEGMENT_2, Integer.valueOf(NUM_BROKERS));
        hashMap.put(UPLOADED_SEGMENT_3, Integer.valueOf(NUM_BROKERS));
        HashMap hashMap2 = new HashMap();
        for (String str : partitionSet) {
            Integer valueOf = LLCSegmentName.isLowLevelConsumerSegmentName(str) ? Integer.valueOf(new LLCSegmentName(str).getPartitionGroupId()) : (Integer) hashMap.get(str);
            Assert.assertNotNull(valueOf);
            Set instanceSet = idealState.getInstanceSet(str);
            Assert.assertEquals(NUM_BROKERS, instanceSet.size());
            if (hashMap2.containsKey(valueOf)) {
                Assert.assertEquals(instanceSet, (Set) hashMap2.get(valueOf));
            } else {
                hashMap2.put(valueOf, instanceSet);
            }
        }
    }

    private void uploadSegments(String str, TableType tableType, File file) throws Exception {
        File[] listFiles = file.listFiles();
        Assert.assertNotNull(listFiles);
        int length = listFiles.length;
        Assert.assertTrue(length > 0);
        URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI("localhost", this._controllerPort);
        FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
        try {
            if (length == NUM_BROKERS) {
                File file2 = listFiles[0];
                Assert.assertEquals(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, file2.getName(), file2, str, tableType).getStatusCode(), 200);
            } else {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(length);
                ArrayList arrayList = new ArrayList(length);
                int length2 = listFiles.length;
                for (int i = 0; i < length2; i += NUM_BROKERS) {
                    File file3 = listFiles[i];
                    arrayList.add(newFixedThreadPool.submit(() -> {
                        return Integer.valueOf(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, file3.getName(), file3, str, tableType).getStatusCode());
                    }));
                }
                newFixedThreadPool.shutdown();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(((Integer) ((Future) it.next()).get()).intValue(), 200);
                }
            }
            fileUploadDownloadClient.close();
        } catch (Throwable th) {
            try {
                fileUploadDownloadClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
