package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault;
import org.apache.pinot.plugin.segmentwriter.filebased.FileBasedSegmentWriter;
import org.apache.pinot.spi.auth.AuthContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test that builds segments with {@link FileBasedSegmentWriter} and uploads them with
 * {@link SegmentUploaderDefault}: first one segment per Avro file, then all segments together from
 * the tar output directory after the table's segments have been dropped.
 */
public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegrationTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentWriterUploaderIntegrationTest.class);

    /** Number of Avro files ingested by the test; each file is flushed into its own segment. */
    private static final int NUM_SEGMENTS = 3;
    /** Polling interval and overall timeout used when waiting for the cluster to reach a state. */
    private static final long CHECK_INTERVAL_MS = 100L;
    private static final long TIMEOUT_MS = 120000L;

    private Schema _schema;
    private String _tableNameWithType;
    private List<File> _avroFiles;

    /**
     * Prepares the working directories, starts the cluster components, registers the schema and
     * resolves the Avro input files used by the test.
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
        startZk();
        startController();
        startBroker();
        startServer();
        _schema = createSchema();
        addSchema(_schema);
        _tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(getTableName());
        _avroFiles = getAllAvroFiles();
    }

    /**
     * Supplies a batch ingestion config whose single batch config map points the segment output at
     * the tar directory and the push target at the test controller.
     *
     * @return ingestion config with only the batch section populated
     */
    @Override
    @Nullable
    public IngestionConfig getIngestionConfig() {
        Map<String, String> batchConfigMap = new HashMap<>();
        batchConfigMap.put("outputDirURI", _tarDir.getAbsolutePath());
        batchConfigMap.put("overwriteOutput", "false");
        batchConfigMap.put("push.controllerUri", _controllerBaseApiUrl);
        BatchIngestionConfig batchIngestionConfig =
                new BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY");
        // The typed nulls keep the constructor overload resolution explicit.
        return new IngestionConfig(batchIngestionConfig, (StreamIngestionConfig) null, (FilterConfig) null,
                (List) null, (ComplexTypeConfig) null);
    }

    /**
     * Writes one segment per Avro file and uploads each right after flushing, verifying segment
     * count, per-segment doc count and the queried total after every upload. Then drops all
     * segments and re-uploads everything from the tar directory in one call.
     */
    @Test
    public void testFileBasedSegmentWriterAndDefaultUploader() throws Exception {
        Assert.assertTrue(_avroFiles.size() >= NUM_SEGMENTS,
                "Expected at least " + NUM_SEGMENTS + " Avro files but found " + _avroFiles.size());

        TableConfig offlineTableConfig = createOfflineTableConfig();
        addTableConfig(offlineTableConfig);

        SegmentUploaderDefault segmentUploader = new SegmentUploaderDefault();
        long totalDocs = 0;
        // try-with-resources guarantees the writer is closed even when an assertion or upload fails.
        try (FileBasedSegmentWriter segmentWriter = new FileBasedSegmentWriter()) {
            segmentWriter.init(offlineTableConfig, _schema);
            segmentUploader.init(offlineTableConfig);

            GenericRow reuse = new GenericRow();
            for (int i = 0; i < NUM_SEGMENTS; i++) {
                long numDocsInSegment = collectRecords(_avroFiles.get(i), segmentWriter, reuse);
                totalDocs += numDocsInSegment;

                // Flush the collected rows into a segment and upload it straight away.
                segmentUploader.uploadSegment(segmentWriter.flush(), (AuthContext) null);

                Assert.assertEquals(getNumSegments(), i + 1);
                Assert.assertEquals(getNumDocsInLatestSegment(), numDocsInSegment);
                checkTotalDocsInQuery(totalDocs);
            }
        }

        dropAllSegments(_tableNameWithType, TableType.OFFLINE);
        checkNumSegments(0);

        // Upload every previously written segment in one go from the output directory.
        segmentUploader.uploadSegmentsFromDir(_tarDir.toURI(), (AuthContext) null);
        Assert.assertEquals(getNumSegments(), NUM_SEGMENTS);
        checkTotalDocsInQuery(totalDocs);

        dropOfflineTable(_tableNameWithType);
    }

    /**
     * Reads every record of the given Avro file and hands it to the segment writer.
     *
     * @param avroFile Avro file to read
     * @param segmentWriter writer that collects the rows
     * @param reuse row instance passed to the reader for every record
     * @return number of records collected from the file
     */
    private static long collectRecords(File avroFile, FileBasedSegmentWriter segmentWriter, GenericRow reuse)
            throws Exception {
        long numDocs = 0;
        // The reader holds the file open, so close it as soon as the file has been consumed.
        try (AvroRecordReader recordReader = new AvroRecordReader()) {
            recordReader.init(avroFile, (Set<String>) null, (RecordReaderConfig) null);
            while (recordReader.hasNext()) {
                recordReader.next(reuse);
                segmentWriter.collect(reuse);
                numDocs++;
            }
        }
        return numDocs;
    }

    /**
     * Fetches the segment list of the offline table from the controller.
     *
     * @return the JSON node stored under "OFFLINE" in the first element of the response
     */
    private JsonNode getOfflineSegmentList() throws IOException {
        return JsonUtils.stringToJsonNode(sendGetRequest(
                _controllerRequestURLBuilder.forSegmentListAPIWithTableType(_tableNameWithType,
                        TableType.OFFLINE.toString()))).get(0).get("OFFLINE");
    }

    /** Returns the number of entries in the offline table's segment list. */
    private int getNumSegments() throws IOException {
        return getOfflineSegmentList().size();
    }

    /** Runs a count(*) query against the table and returns the single result value. */
    private long getTotalDocsFromQuery() throws Exception {
        String query = String.format("select count(*) from %s", _tableNameWithType);
        return postSqlQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong();
    }

    /**
     * Returns "segment.total.docs" from the metadata of the last entry in the segment list.
     * NOTE(review): this treats the last listed segment as the latest one - confirm that the
     * controller returns segments in creation order.
     */
    private int getNumDocsInLatestSegment() throws IOException {
        JsonNode segments = getOfflineSegmentList();
        String lastSegmentName = segments.get(segments.size() - 1).asText();
        return JsonUtils.stringToJsonNode(sendGetRequest(
                _controllerRequestURLBuilder.forSegmentMetadata(_tableNameWithType, lastSegmentName)))
                .get("segment.total.docs").asInt();
    }

    /**
     * Polls the count query until it returns the expected total or the timeout elapses.
     *
     * @param expectedTotalDocs total number of documents the query should report
     */
    private void checkTotalDocsInQuery(final long expectedTotalDocs) {
        TestUtils.waitForCondition(new Function<Void, Boolean>() {
            @Nullable
            @Override
            public Boolean apply(@Nullable Void unused) {
                try {
                    return getTotalDocsFromQuery() == expectedTotalDocs;
                } catch (Exception e) {
                    LOGGER.error("Caught exception when getting totalDocs from query: {}", e.getMessage());
                    // Presumably treated as "not yet satisfied" by waitForCondition - verify in TestUtils.
                    return null;
                }
            }
        }, CHECK_INTERVAL_MS, TIMEOUT_MS, "Failed to load " + expectedTotalDocs + " documents", true);
    }

    /**
     * Polls the controller until the segment list has the expected size or the timeout elapses.
     *
     * @param expectedNumSegments number of segments the table should have
     */
    private void checkNumSegments(final int expectedNumSegments) {
        TestUtils.waitForCondition(new Function<Void, Boolean>() {
            @Nullable
            @Override
            public Boolean apply(@Nullable Void unused) {
                try {
                    return getNumSegments() == expectedNumSegments;
                } catch (Exception e) {
                    LOGGER.error("Caught exception when getting num segments: {}", e.getMessage());
                    // Presumably treated as "not yet satisfied" by waitForCondition - verify in TestUtils.
                    return null;
                }
            }
        }, CHECK_INTERVAL_MS, TIMEOUT_MS, "Failed to reach " + expectedNumSegments + " segments", true);
    }

    /** Stops the cluster components and removes the temporary directory. */
    @AfterClass
    public void tearDown() throws Exception {
        try {
            stopServer();
            stopBroker();
            stopController();
            stopZk();
        } finally {
            // Clean up the temp dir even if a component fails to stop.
            FileUtils.deleteDirectory(_tempDir);
        }
    }
}
