package org.apache.paimon.mergetree;

import java.nio.file.Path;
import java.util.List;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.FailingConstructInputOutputIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/mergetree/MergeTreeReadersConnectionsLeakTest.class */
/**
 * Checks that reading a primary-key table through a {@link FailingConstructInputOutputIO} leaves
 * no stream open, as reported by {@link FailingConstructInputOutputIO#noLeak()}.
 *
 * <p>The IO wrapper is configured with {@code 0.3} and {@code MergeTreeReaders.class}; presumably
 * this is a failure probability applied to streams opened from that class (confirm against
 * {@code FailingConstructInputOutputIO}).
 */
public class MergeTreeReadersConnectionsLeakTest {

    /** Number of rows written by each commit. */
    private static final int ROWS_PER_COMMIT = 40;

    /** Number of separate commits performed before reading. */
    private static final int COMMIT_ROUNDS = 5;

    @TempDir
    Path tempDir;
    // Field name (including its spelling) is kept unchanged so any same-package access still compiles.
    String dabaseName = "my_db";
    String tableName = "my_table";

    /**
     * Creates a table, writes several commits into it, reads everything back and finally asserts
     * that the failing IO reports no leak. Any exception raised along the way is tolerated; only
     * the leak check decides the outcome.
     */
    @RepeatedTest(20)
    public void testFailedStream() {
        FailingConstructInputOutputIO failingIO =
                new FailingConstructInputOutputIO(0.3, MergeTreeReaders.class);
        try {
            Catalog catalog =
                    new FileSystemCatalog(
                            failingIO, new org.apache.paimon.fs.Path(tempDir.toString()));
            createTable(catalog);
            Table table = getTable(catalog);
            writeData(table, ROWS_PER_COMMIT);
            readData(table);
        } catch (Exception ignored) {
            // Deliberately swallowed: failures are expected with the failing IO in place, and the
            // assertion below is the only thing this test verifies.
        }
        Assertions.assertThat(failingIO.noLeak()).isTrue();
    }

    /** Creates the database and the table, ignoring both if they already exist. */
    private void createTable(Catalog catalog) throws Exception {
        catalog.createDatabase(dabaseName, true);
        catalog.createTable(new Identifier(dabaseName, tableName), schema(), true);
    }

    /** Looks up the test table in the given catalog. */
    private Table getTable(Catalog catalog) throws Exception {
        return catalog.getTable(new Identifier(dabaseName, tableName));
    }

    /**
     * Builds the table schema: one INT and three STRING columns, primary key (f0, f1, f2),
     * partitioned by f0, 40 buckets keyed on f1, and the sorted-run compaction trigger set to
     * 1000.
     */
    private Schema schema() {
        return Schema.newBuilder()
                .column("f0", DataTypes.INT())
                .column("f1", DataTypes.STRING())
                .column("f2", DataTypes.STRING())
                .column("f3", DataTypes.STRING())
                .primaryKey("f0", "f1", "f2")
                .partitionKeys("f0")
                .option("bucket", "40")
                .option("bucket-key", "f1")
                .option(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key(), "1000")
                .build();
    }

    /**
     * Performs {@link #COMMIT_ROUNDS} independent commits against the table.
     *
     * @param table table to write into
     * @param rowsPerCommit number of rows written in each commit
     */
    private void writeData(Table table, int rowsPerCommit) throws Exception {
        for (int round = 0; round < COMMIT_ROUNDS; round++) {
            writeOnce(table, rowsPerCommit);
        }
    }

    /**
     * Writes {@code rowCount} rows and commits them as a single batch. All rows share the same
     * f0 and f1 values and differ only in f2.
     *
     * @param table table to write into
     * @param rowCount number of rows to write before committing
     */
    private void writeOnce(Table table, int rowCount) throws Exception {
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        // try-with-resources so that the writer and the committer are closed even when a write or
        // the commit itself throws.
        try (BatchTableWrite write = writeBuilder.newWrite()) {
            for (int row = 0; row < rowCount; row++) {
                write.write(
                        GenericRow.of(
                                0,
                                BinaryString.fromString("0"),
                                BinaryString.fromString("aaaaaaaaaaaaa" + row),
                                BinaryString.fromString("b")));
            }
            try (BatchTableCommit commit = writeBuilder.newCommit()) {
                commit.commit(write.prepareCommit());
            }
        }
    }

    /** Plans a full scan of the table and consumes every row, discarding the values. */
    private void readData(Table table) throws Exception {
        ReadBuilder readBuilder = table.newReadBuilder();
        readBuilder
                .newRead()
                .createReader(readBuilder.newScan().plan())
                .forEachRemaining(
                        row -> {
                            // Rows are only consumed; their content is irrelevant here.
                        });
    }
}
