package org.apache.iceberg.flink.sink;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
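/* Parameterized tests of the Iceberg Flink stream writer, run per file format with and without partitioning (decompiled from org/apache/iceberg/flink/sink/TestIcebergStreamWriter.class). */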
public class TestIcebergStreamWriter {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private Table table;
    private final FileFormat format;
    private final boolean partitioned;

    /**
     * Supplies the parameter matrix for the {@link Parameterized} runner: three file
     * formats ("avro", "orc", "parquet"), each with a partitioned and an unpartitioned
     * table.
     *
     * @return one {@code {format, partitioned}} pair per test configuration
     */
    @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
    public static Object[][] parameters() {
        // Must be an Object[][] initializer: an Object[] is not assignable to the
        // declared Object[][] return type.
        return new Object[][] {
            {"avro", true},
            {"avro", false},
            {"orc", true},
            {"orc", false},
            {"parquet", true},
            {"parquet", false}
        };
    }

    /**
     * Creates one test instance for a single parameter combination.
     *
     * @param str name of the file format, resolved through {@link FileFormat#fromString(String)}
     * @param z whether the table under test is partitioned
     */
    public TestIcebergStreamWriter(String str, boolean z) {
        this.partitioned = z;
        this.format = FileFormat.fromString(str);
    }

    /**
     * Creates a fresh table in a new temporary folder before each test, using the
     * parameterized file format as the table's default write format.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Before
    public void before() throws IOException {
        String tableLocation = this.tempFolder.newFolder().getAbsolutePath();
        ImmutableMap<String, String> tableProperties =
                ImmutableMap.of("write.format.default", this.format.name());
        this.table = SimpleDataUtil.createTable(tableLocation, tableProperties, this.partitioned);
    }

    /**
     * Writes rows across two checkpoints, checks the number of data files emitted
     * after each one, commits the emitted files and verifies the table content.
     *
     * @throws Exception if the harness or the table operations fail
     */
    @Test
    public void testWritingTable() throws Exception {
        long checkpointId = 1L;
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = createIcebergStreamWriter()) {
            // The first checkpoint.
            testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1L);
            testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 1L);
            testHarness.processElement(SimpleDataUtil.createRowData(3, "hello"), 1L);

            testHarness.prepareSnapshotPreBarrier(checkpointId);
            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(expectedDataFiles, result.dataFiles().length);

            checkpointId = checkpointId + 1;

            // The second checkpoint.
            testHarness.processElement(SimpleDataUtil.createRowData(4, "foo"), 1L);
            testHarness.processElement(SimpleDataUtil.createRowData(5, "bar"), 2L);

            testHarness.prepareSnapshotPreBarrier(checkpointId);
            // The expectation doubles here; presumably extractOutputValues() also returns
            // the results already emitted by the first checkpoint -- TODO confirm.
            expectedDataFiles = this.partitioned ? 4L : 2L;
            result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(expectedDataFiles, result.dataFiles().length);

            // Commit the emitted data files to the table.
            AppendFiles appendFiles = this.table.newAppend();
            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
            appendFiles.commit();

            // The table must now contain every record written above.
            SimpleDataUtil.assertTableRecords(
                    this.table,
                    Lists.newArrayList(
                            SimpleDataUtil.createRecord(1, "hello"),
                            SimpleDataUtil.createRecord(2, "world"),
                            SimpleDataUtil.createRecord(3, "hello"),
                            SimpleDataUtil.createRecord(4, "foo"),
                            SimpleDataUtil.createRecord(5, "bar")));
        }
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [long, java.lang.AutoCloseable, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    @Test
    public void testSnapshotTwice() throws Exception {
        ?? createIcebergStreamWriter = createIcebergStreamWriter();
        Throwable th = null;
        try {
            try {
                createIcebergStreamWriter.processElement(SimpleDataUtil.createRowData(1, "hello"), 1L);
                createIcebergStreamWriter.processElement(SimpleDataUtil.createRowData(2, "world"), 1 + 1);
                long j = createIcebergStreamWriter + 1;
                createIcebergStreamWriter.prepareSnapshotPreBarrier(1L);
                long j2 = this.partitioned ? 2L : 1L;
                WriteResult build = WriteResult.builder().addAll(createIcebergStreamWriter.extractOutputValues()).build();
                Assert.assertEquals(0L, build.deleteFiles().length);
                Assert.assertEquals(j2, build.dataFiles().length);
                for (int i = 0; i < 5; i++) {
                    j++;
                    createIcebergStreamWriter.prepareSnapshotPreBarrier((long) createIcebergStreamWriter);
                    WriteResult build2 = WriteResult.builder().addAll(createIcebergStreamWriter.extractOutputValues()).build();
                    Assert.assertEquals(0L, build2.deleteFiles().length);
                    Assert.assertEquals(j2, build2.dataFiles().length);
                }
                if (createIcebergStreamWriter != 0) {
                    $closeResource(null, createIcebergStreamWriter);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createIcebergStreamWriter != 0) {
                $closeResource(th, createIcebergStreamWriter);
            }
            throw th3;
        }
    }

    /**
     * Checks what is left on disk when a writer is closed without any checkpoint:
     * nothing when no row was written, and exactly one data file when a single row was
     * written. In both cases the harness emits no output.
     *
     * @throws Exception if the harness or the file system scan fails
     */
    @Test
    public void testTableWithoutSnapshot() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = createIcebergStreamWriter()) {
            Assert.assertEquals(0L, testHarness.extractOutputValues().size());
        }
        // The writer was closed without receiving any row: no data file on disk.
        Assert.assertEquals(0L, scanDataFiles().size());

        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = createIcebergStreamWriter()) {
            testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1L);
            // No checkpoint was triggered, so nothing has been emitted downstream.
            Assert.assertEquals(0L, testHarness.extractOutputValues().size());
        }
        // Closing the writer after one row leaves exactly one data file behind.
        Assert.assertEquals(1L, scanDataFiles().size());
    }

    /**
     * Recursively lists the files under the table's {@code data} directory whose name
     * ends with the lower-cased name of the format under test.
     *
     * @return the matching file paths as strings; an empty set when the directory does
     *     not exist
     * @throws IOException if the file system cannot be accessed
     */
    private Set<String> scanDataFiles() throws IOException {
        Path dataDir = new Path(this.table.location(), "data");
        FileSystem fs = FileSystem.get(new Configuration());
        if (!fs.exists(dataDir)) {
            return ImmutableSet.of();
        }

        Set<String> paths = Sets.newHashSet();
        RemoteIterator<LocatedFileStatus> statuses = fs.listFiles(dataDir, true);
        while (statuses.hasNext()) {
            LocatedFileStatus status = statuses.next();
            if (status.isFile()) {
                Path filePath = status.getPath();
                // Keep only files carrying the extension of the current format.
                if (filePath.getName().endsWith("." + this.format.toString().toLowerCase())) {
                    paths.add(filePath.toString());
                }
            }
        }
        return paths;
    }

    /**
     * Verifies that ending a bounded input emits the written data files, and that
     * ending the input a second time does not change the reported data-file count.
     *
     * @throws Exception if the harness fails
     */
    @Test
    public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = createIcebergStreamWriter()) {
            testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1L);
            testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2L);

            Assertions.assertThat(testHarness.getOneInputOperator()).isInstanceOf(BoundedOneInput.class);
            // endInput() belongs to BoundedOneInput, so the operator has to be cast.
            ((BoundedOneInput) testHarness.getOneInputOperator()).endInput();

            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(expectedDataFiles, result.dataFiles().length);

            ((BoundedOneInput) testHarness.getOneInputOperator()).endInput();

            // Ending the input again must not add data files to the output.
            result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
        }
    }

    /**
     * Verifies that a checkpoint triggered after the end of input does not change the
     * data-file count that was already reported when the input ended.
     *
     * @throws Exception if the harness fails
     */
    @Test
    public void testBoundedStreamTriggeredEndInputBeforeTriggeringCheckpoint() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = createIcebergStreamWriter()) {
            testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1L);
            testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2L);

            testHarness.endInput();

            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(expectedDataFiles, result.dataFiles().length);

            testHarness.prepareSnapshotPreBarrier(1L);

            // The checkpoint after endInput() must leave the reported count unchanged.
            result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
        }
    }

    /**
     * Sets a very small {@code write.target-file-size-bytes}, writes 8000 rows and
     * checks that the writer produces 8 data files of 1000 records each, then commits
     * them and verifies the table content.
     *
     * @throws Exception if the harness or the table operations fail
     */
    @Test
    public void testTableWithTargetFileSize() throws Exception {
        // Lower the target file size in the table properties.
        this.table.updateProperties().set("write.target-file-size-bytes", "4").commit();

        List<RowData> rows = Lists.newArrayListWithCapacity(8000);
        List<Record> records = Lists.newArrayListWithCapacity(8000);
        for (int i = 0; i < 2000; i++) {
            for (String data : new String[] {"a", "b", "c", "d"}) {
                rows.add(SimpleDataUtil.createRowData(i, data));
                records.add(SimpleDataUtil.createRecord(i, data));
            }
        }

        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = createIcebergStreamWriter()) {
            for (RowData row : rows) {
                testHarness.processElement(row, 1L);
            }

            // Snapshot the operator.
            testHarness.prepareSnapshotPreBarrier(1L);
            WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            Assert.assertEquals(8L, result.dataFiles().length);

            // Every data file is expected to hold the same number of records.
            for (DataFile dataFile : result.dataFiles()) {
                Assert.assertEquals(1000L, dataFile.recordCount());
            }

            // Commit the emitted data files to the table.
            AppendFiles appendFiles = this.table.newAppend();
            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
            appendFiles.commit();
        }

        // Verify the table content once the writer has been closed.
        SimpleDataUtil.assertTableRecords(this.table, records);
    }

    /**
     * Writes Flink TINYINT / SMALLINT / INT values into a table whose three columns are
     * all Iceberg integers and verifies that the stored records hold the same numeric
     * values.
     *
     * @throws Exception if the harness or the table operations fail
     */
    @Test
    public void testPromotedFlinkDataType() throws Exception {
        Schema icebergSchema =
                new Schema(
                        Types.NestedField.required(1, "tinyint", Types.IntegerType.get()),
                        Types.NestedField.required(2, "smallint", Types.IntegerType.get()),
                        Types.NestedField.optional(3, "int", Types.IntegerType.get()));
        TableSchema flinkSchema =
                TableSchema.builder()
                        .field("tinyint", DataTypes.TINYINT().notNull())
                        .field("smallint", DataTypes.SMALLINT().notNull())
                        .field("int", DataTypes.INT().nullable())
                        .build();

        PartitionSpec spec;
        if (this.partitioned) {
            spec =
                    PartitionSpec.builderFor(icebergSchema)
                            .identity("smallint")
                            .identity("tinyint")
                            .identity("int")
                            .build();
        } else {
            spec = PartitionSpec.unpartitioned();
        }

        String location = this.tempFolder.newFolder().getAbsolutePath();
        Table icebergTable =
                new HadoopTables()
                        .create(
                                icebergSchema,
                                spec,
                                ImmutableMap.of("write.format.default", this.format.name()),
                                location);

        List<RowData> rows =
                Lists.newArrayList(
                        GenericRowData.of((byte) 1, Short.MIN_VALUE, 101),
                        GenericRowData.of((byte) 2, (short) 0, 102),
                        GenericRowData.of((byte) 3, Short.MAX_VALUE, 103));

        Record template = GenericRecord.create(icebergSchema);
        List<Record> expected =
                Lists.newArrayList(
                        template.copy(ImmutableMap.of("tinyint", 1, "smallint", -32768, "int", 101)),
                        template.copy(ImmutableMap.of("tinyint", 2, "smallint", 0, "int", 102)),
                        template.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103)));

        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
                createIcebergStreamWriter(icebergTable, flinkSchema)) {
            for (RowData row : rows) {
                testHarness.processElement(row, 1L);
            }
            testHarness.prepareSnapshotPreBarrier(1L);

            WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
            Assert.assertEquals(0L, result.deleteFiles().length);
            // Each row has distinct partition values, hence one file per row when partitioned.
            Assert.assertEquals(this.partitioned ? 3L : 1L, result.dataFiles().length);

            // Commit the emitted data files to the table.
            AppendFiles appendFiles = icebergTable.newAppend();
            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
            appendFiles.commit();
        }

        SimpleDataUtil.assertTableRecords(location, expected);
    }

    /**
     * Builds an opened writer harness for the table created in {@link #before()},
     * using {@code SimpleDataUtil.FLINK_SCHEMA} as the Flink schema.
     *
     * @return the harness returned by the two-argument overload
     * @throws Exception if the harness cannot be set up or opened
     */
    private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, WriteResult> harness =
                createIcebergStreamWriter(this.table, SimpleDataUtil.FLINK_SCHEMA);
        return harness;
    }

    /**
     * Wraps a stream writer for the given table in a test harness, then sets the
     * harness up and opens it.
     *
     * @param table the Iceberg table the writer targets
     * @param tableSchema the Flink schema converted to the writer's row type
     * @return a harness on which {@code setup()} and {@code open()} have been called
     * @throws Exception if the harness cannot be created, set up or opened
     */
    private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(Table table, TableSchema tableSchema) throws Exception {
        FlinkWriteConf writeConf =
                new FlinkWriteConf(table, Maps.newHashMap(), new org.apache.flink.configuration.Configuration());
        OneInputStreamOperatorTestHarness<RowData, WriteResult> harness =
                new OneInputStreamOperatorTestHarness<>(
                        FlinkSink.createStreamWriter(
                                table,
                                writeConf,
                                FlinkSink.toFlinkRowType(table.schema(), tableSchema),
                                (List) null),
                        1,
                        1,
                        0);
        harness.setup();
        harness.open();
        return harness;
    }

    /**
     * Closes a resource the way a try-with-resources statement would.
     *
     * <p>With no pending failure, any exception from {@code close()} propagates to the
     * caller. With a pending failure, an exception from {@code close()} is attached to
     * it as a suppressed exception, so the caller can rethrow the original failure.
     *
     * @param th the failure already in flight, or {@code null} if there is none
     * @param autoCloseable the resource to close
     * @throws Exception if closing fails while no other failure is pending
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            // AutoCloseable.close() is declared to throw Exception, so this method must declare it too.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
