package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkCatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests that run streaming {@code SELECT} queries (via the {@code 'streaming'='true'} dynamic
 * table option) against a table and verify that rows appended to the table after the query has
 * started are delivered through the query's result iterator.
 */
public class TestStreamScanSql extends FlinkCatalogTestBase {
    private static final String TABLE = "test_table";
    private static final FileFormat FORMAT = FileFormat.PARQUET;

    // volatile so that the lazily created environment is safely published by the
    // check / lock / re-check sequence in getTableEnv().
    private volatile TableEnvironment tEnv;

    public TestStreamScanSql(String str, Namespace namespace) {
        super(str, namespace);
    }

    /**
     * Lazily creates, then returns, the streaming table environment shared by all statements of
     * one test instance.
     *
     * <p>The environment is built in streaming mode, has checkpointing enabled with an interval of
     * 400, and enables dynamic table options so that the {@code OPTIONS(...)} hints used by the
     * tests are accepted.
     *
     * @return the table environment, created on first use
     */
    @Override // org.apache.iceberg.flink.FlinkTestBase
    public TableEnvironment getTableEnv() {
        TableEnvironment result = this.tEnv;
        if (result == null) {
            synchronized (this) {
                result = this.tEnv;
                if (result == null) {
                    EnvironmentSettings settings =
                            EnvironmentSettings.newInstance().inStreamingMode().build();
                    StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(
                                    MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                    env.enableCheckpointing(400L);

                    StreamTableEnvironment streamTableEnv =
                            StreamTableEnvironment.create(env, settings);
                    streamTableEnv
                            .getConfig()
                            .getConfiguration()
                            .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);

                    result = streamTableEnv;
                    this.tEnv = result;
                }
            }
        }
        return result;
    }

    /** Creates the test database and makes the test catalog and database current. */
    @Override // org.apache.iceberg.flink.FlinkCatalogTestBase
    @Before
    public void before() {
        super.before();
        sql("CREATE DATABASE %s", this.flinkDatabase);
        sql("USE CATALOG %s", this.catalogName);
        sql("USE %s", "db");
    }

    /** Drops the test table and database before delegating to the base-class cleanup. */
    @Override // org.apache.iceberg.flink.FlinkCatalogTestBase
    @After
    public void clean() {
        sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, TABLE);
        sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        super.clean();
    }

    /**
     * Appends the given rows to {@code table} as generic records with fields {@code id},
     * {@code data} and {@code dt} (taken from row positions 0, 1 and 2).
     *
     * @param str partition value to append under, or {@code null} to append without a partition
     * @param table table to append to
     * @param rowArr rows to write; each must carry at least three fields
     * @throws IOException if the append fails
     */
    private void insertRows(String str, Table table, Row... rowArr) throws IOException {
        GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, TEMPORARY_FOLDER);
        GenericRecord template = GenericRecord.create(table.schema());

        List<Record> records = Lists.newArrayList();
        for (Row row : rowArr) {
            records.add(
                    template.copy(
                            "id", row.getField(0),
                            "data", row.getField(1),
                            "dt", row.getField(2)));
        }

        if (str != null) {
            appender.appendToTable(TestHelpers.Row.of(str, 0), records);
        } else {
            appender.appendToTable(records);
        }
    }

    /** Appends the given rows to {@code table} without specifying a partition. */
    private void insertRows(Table table, Row... rowArr) throws IOException {
        insertRows(null, table, rowArr);
    }

    /**
     * Pulls exactly {@code list.size()} rows from {@code it} and asserts that each one has three
     * fields equal, position by position, to the corresponding expected row.
     *
     * <p>The iterator is not checked for exhaustion afterwards; callers keep reading from the same
     * iterator across several calls.
     */
    private void assertRows(List<Row> list, Iterator<Row> it) {
        for (Row expected : list) {
            Assert.assertTrue("Should have more records", it.hasNext());
            Row actual = it.next();
            Assert.assertEquals("Should have expected fields", 3, actual.getArity());
            Assert.assertEquals("Should have expected id", expected.getField(0), actual.getField(0));
            Assert.assertEquals("Should have expected data", expected.getField(1), actual.getField(1));
            Assert.assertEquals("Should have expected dt", expected.getField(2), actual.getField(2));
        }
    }

    /** Rows appended to an unpartitioned table after the query starts are streamed in order. */
    @Test
    public void testUnPartitionedTable() throws Exception {
        sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE));

        TableResult result =
                exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator<Row> iterator = result.collect()) {
            Row row1 = Row.of(1, "aaa", "2021-01-01");
            insertRows(table, row1);
            assertRows(ImmutableList.of(row1), iterator);

            Row row2 = Row.of(2, "bbb", "2021-01-01");
            insertRows(table, row2);
            assertRows(ImmutableList.of(row2), iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    /** Rows appended to different partitions after the query starts are streamed in order. */
    @Test
    public void testPartitionedTable() throws Exception {
        sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY (dt)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE));

        TableResult result =
                exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator<Row> iterator = result.collect()) {
            Row row1 = Row.of(1, "aaa", "2021-01-01");
            insertRows("2021-01-01", table, row1);
            assertRows(ImmutableList.of(row1), iterator);

            Row row2 = Row.of(2, "bbb", "2021-01-02");
            insertRows("2021-01-02", table, row2);
            assertRows(ImmutableList.of(row2), iterator);

            Row row3 = Row.of(1, "aaa", "2021-01-02");
            insertRows("2021-01-02", table, row3);
            assertRows(ImmutableList.of(row3), iterator);

            Row row4 = Row.of(2, "bbb", "2021-01-01");
            insertRows("2021-01-01", table, row4);
            assertRows(ImmutableList.of(row4), iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    /** Rows written before the query starts are delivered first, followed by later appends. */
    @Test
    public void testConsumeFromBeginning() throws Exception {
        sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE));

        Row row1 = Row.of(1, "aaa", "2021-01-01");
        Row row2 = Row.of(2, "bbb", "2021-01-01");
        insertRows(table, row1, row2);

        TableResult result =
                exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator<Row> iterator = result.collect()) {
            assertRows(ImmutableList.of(row1, row2), iterator);

            Row row3 = Row.of(3, "ccc", "2021-01-01");
            insertRows(table, row3);
            assertRows(ImmutableList.of(row3), iterator);

            Row row4 = Row.of(4, "ddd", "2021-01-01");
            insertRows(table, row4);
            assertRows(ImmutableList.of(row4), iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    /**
     * With {@code 'start-snapshot-id'} set to the snapshot of the second append, only rows
     * appended after that snapshot are expected from the query.
     */
    @Test
    public void testConsumeFromStartSnapshotId() throws Exception {
        sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE));

        // Two separate appends; the snapshot of the second one is used as the start snapshot.
        Row row1 = Row.of(1, "aaa", "2021-01-01");
        Row row2 = Row.of(2, "bbb", "2021-01-01");
        insertRows(table, row1);
        insertRows(table, row2);

        long startSnapshotId = table.currentSnapshot().snapshotId();

        Row row3 = Row.of(3, "ccc", "2021-01-01");
        Row row4 = Row.of(4, "ddd", "2021-01-01");
        insertRows(table, row3, row4);

        TableResult result =
                exec(
                        "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='%d')*/",
                        TABLE,
                        startSnapshotId);
        try (CloseableIterator<Row> iterator = result.collect()) {
            assertRows(ImmutableList.of(row3, row4), iterator);

            Row row5 = Row.of(5, "eee", "2021-01-01");
            Row row6 = Row.of(6, "fff", "2021-01-01");
            insertRows(table, row5, row6);
            assertRows(ImmutableList.of(row5, row6), iterator);

            Row row7 = Row.of(7, "ggg", "2021-01-01");
            insertRows(table, row7);
            assertRows(ImmutableList.of(row7), iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }
}
