package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.io.iceberg.RecordWriterManager;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.class */
public class RecordWriterManagerTest {

    @Rule
    public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");

    @Rule
    public TestName testName = new TestName();
    private WindowedValue<IcebergDestination> windowedDestination;
    private HadoopCatalog catalog;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static final Schema BEAM_SCHEMA = Schema.builder().addInt32Field("id").addStringField("name").addBooleanField("bool").build();
    private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
    private static final PartitionSpec PARTITION_SPEC = PartitionSpec.builderFor(ICEBERG_SCHEMA).truncate("name", 3).identity("bool").build();

    @Before
    public void setUp() {
        this.windowedDestination = getWindowedDestination("table_" + this.testName.getMethodName(), PARTITION_SPEC);
        this.catalog = new HadoopCatalog(new Configuration(), this.warehouse.location);
    }

    private WindowedValue<IcebergDestination> getWindowedDestination(String str, PartitionSpec partitionSpec) {
        TableIdentifier of = TableIdentifier.of(new String[]{"default", str});
        this.warehouse.createTable(of, ICEBERG_SCHEMA, partitionSpec);
        return WindowedValue.of(IcebergDestination.builder().setFileFormat(FileFormat.PARQUET).setTableIdentifier(of).build(), GlobalWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
    }

    @Test
    public void testCreateNewWriterForEachDestination() throws IOException {
        RecordWriterManager recordWriterManager = new RecordWriterManager(this.catalog, "test_file_name", 1000L, 3);
        Assert.assertEquals(0L, recordWriterManager.openWriters);
        WindowedValue<IcebergDestination> windowedDestination = getWindowedDestination("dest1", null);
        WindowedValue<IcebergDestination> windowedDestination2 = getWindowedDestination("dest2", null);
        WindowedValue<IcebergDestination> windowedDestination3 = getWindowedDestination("dest3", PARTITION_SPEC);
        WindowedValue<IcebergDestination> windowedDestination4 = getWindowedDestination("dest4", null);
        Assert.assertTrue(recordWriterManager.write(windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build()));
        Assert.assertEquals(1L, recordWriterManager.openWriters);
        Assert.assertTrue(recordWriterManager.write(windowedDestination2, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build()));
        Assert.assertEquals(2L, recordWriterManager.openWriters);
        Assert.assertTrue(recordWriterManager.write(windowedDestination3, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build()));
        Assert.assertEquals(3L, recordWriterManager.openWriters);
        Assert.assertFalse(recordWriterManager.write(windowedDestination4, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build()));
        Assert.assertEquals(3L, recordWriterManager.openWriters);
        Assert.assertFalse(recordWriterManager.write(windowedDestination3, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", false}).build()));
        Assert.assertEquals(3L, recordWriterManager.openWriters);
        recordWriterManager.close();
        Assert.assertEquals(0L, recordWriterManager.openWriters);
        Assert.assertEquals(3L, recordWriterManager.getManifestFiles().keySet().size());
        MatcherAssert.assertThat(recordWriterManager.getManifestFiles().keySet(), Matchers.containsInAnyOrder(new WindowedValue[]{windowedDestination, windowedDestination2, windowedDestination3}));
    }

    @Test
    public void testCreateNewWriterForEachPartition() throws IOException {
        RecordWriterManager recordWriterManager = new RecordWriterManager(this.catalog, "test_file_name", 1000L, 3);
        Assert.assertEquals(0L, recordWriterManager.openWriters);
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build()));
        Assert.assertEquals(1L, recordWriterManager.openWriters);
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{2, "bbb", false}).build()));
        Assert.assertEquals(2L, recordWriterManager.openWriters);
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{3, "bbbaaa", false}).build()));
        Assert.assertEquals(2L, recordWriterManager.openWriters);
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{4, "bbb123", true}).build()));
        Assert.assertEquals(3L, recordWriterManager.openWriters);
        Assert.assertFalse(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{5, "aaa123", false}).build()));
        Assert.assertEquals(3L, recordWriterManager.openWriters);
        recordWriterManager.close();
        Assert.assertEquals(0L, recordWriterManager.openWriters);
        Assert.assertEquals(1L, recordWriterManager.getManifestFiles().size());
        ManifestFile manifestFile = (ManifestFile) Iterables.getOnlyElement((Iterable) recordWriterManager.getManifestFiles().get(this.windowedDestination));
        Assert.assertEquals(3L, manifestFile.addedFilesCount().intValue());
        Assert.assertEquals(4L, manifestFile.addedRowsCount().intValue());
    }

    @Test
    public void testRespectMaxFileSize() throws IOException {
        RecordWriterManager recordWriterManager = new RecordWriterManager(this.catalog, "test_file_name", 100L, 2);
        Assert.assertEquals(0L, recordWriterManager.openWriters);
        PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA);
        Row build = Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build();
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, build));
        Assert.assertEquals(1L, recordWriterManager.openWriters);
        partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, build));
        Map map = ((RecordWriterManager.DestinationState) recordWriterManager.destinations.get(this.windowedDestination)).writerCounts;
        Assert.assertEquals(1L, ((Integer) map.get(partitionKey)).intValue());
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{2, "aaa" + RandomStringUtils.randomAlphanumeric(1000), true}).build()));
        Assert.assertEquals(1L, recordWriterManager.openWriters);
        Assert.assertEquals(1L, ((Integer) map.get(partitionKey)).intValue());
        Assert.assertTrue(recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{2, "aaabb", true}).build()));
        Assert.assertEquals(2L, ((Integer) map.get(partitionKey)).intValue());
        Assert.assertEquals(1L, recordWriterManager.openWriters);
        recordWriterManager.close();
        Assert.assertEquals(0L, recordWriterManager.openWriters);
    }

    @Test
    public void testRequireClosingBeforeFetchingManifestFiles() {
        RecordWriterManager recordWriterManager = new RecordWriterManager(this.catalog, "test_file_name", 100L, 2);
        recordWriterManager.write(this.windowedDestination, Row.withSchema(BEAM_SCHEMA).addValues(new Object[]{1, "aaa", true}).build());
        Assert.assertEquals(1L, recordWriterManager.openWriters);
        Objects.requireNonNull(recordWriterManager);
        Assert.assertThrows(IllegalStateException.class, recordWriterManager::getManifestFiles);
    }
}
