package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.lineage.DataLineageEntity;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.predicate.Predicate;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/FlinkLineageITCase.class */
public class FlinkLineageITCase extends CatalogITCaseBase {
    private static final String THROWING_META = "throwing-meta";
    private static final Map<String, Map<String, TableLineageEntity>> jobSourceTableLineages = new HashMap();
    private static final Map<String, Map<String, TableLineageEntity>> jobSinkTableLineages = new HashMap();

    /* loaded from: input_file:org/apache/paimon/flink/FlinkLineageITCase$TestingMemoryLineageMeta.class */
    private static class TestingMemoryLineageMeta implements LineageMeta {
        private TestingMemoryLineageMeta() {
        }

        public void saveSourceTableLineage(TableLineageEntity tableLineageEntity) {
            ((Map) FlinkLineageITCase.jobSourceTableLineages.computeIfAbsent(tableLineageEntity.getJob(), str -> {
                return new HashMap();
            })).put(FlinkLineageITCase.getTableLineageKey(tableLineageEntity), tableLineageEntity);
        }

        public void deleteSourceTableLineage(String str) {
            FlinkLineageITCase.jobSourceTableLineages.remove(str);
        }

        public Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate) {
            return FlinkLineageITCase.jobSourceTableLineages.values().stream().flatMap(map -> {
                return map.values().stream();
            }).iterator();
        }

        public void saveSinkTableLineage(TableLineageEntity tableLineageEntity) {
            Assertions.assertThat(tableLineageEntity.getJob()).isEqualTo("insert_t_job");
            Assertions.assertThat(tableLineageEntity.getTable()).isEqualTo("T");
            Assertions.assertThat(tableLineageEntity.getDatabase()).isEqualTo(FlinkTestBase.CURRENT_DATABASE);
            ((Map) FlinkLineageITCase.jobSinkTableLineages.computeIfAbsent(tableLineageEntity.getJob(), str -> {
                return new HashMap();
            })).put(FlinkLineageITCase.getTableLineageKey(tableLineageEntity), tableLineageEntity);
        }

        public Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate) {
            return FlinkLineageITCase.jobSinkTableLineages.values().stream().flatMap(map -> {
                return map.values().stream();
            }).iterator();
        }

        public void deleteSinkTableLineage(String str) {
            FlinkLineageITCase.jobSinkTableLineages.remove(str);
        }

        public void saveSourceDataLineage(DataLineageEntity dataLineageEntity) {
            Assertions.assertThat(dataLineageEntity.getJob()).isEqualTo("select_t_job");
            Assertions.assertThat(dataLineageEntity.getTable()).isEqualTo("T");
            Assertions.assertThat(dataLineageEntity.getDatabase()).isEqualTo(FlinkTestBase.CURRENT_DATABASE);
            throw new UnsupportedOperationException("Method saveSinkTableLineage is not supported");
        }

        public Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate) {
            throw new UnsupportedOperationException();
        }

        public void saveSinkDataLineage(DataLineageEntity dataLineageEntity) {
            throw new UnsupportedOperationException();
        }

        public Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate) {
            throw new UnsupportedOperationException();
        }

        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/FlinkLineageITCase$TestingMemoryLineageMetaFactory.class */
    public static class TestingMemoryLineageMetaFactory implements LineageMetaFactory {
        private static final long serialVersionUID = 1;

        public String identifier() {
            return FlinkLineageITCase.THROWING_META;
        }

        public LineageMeta create(LineageMetaFactory.LineageMetaContext lineageMetaContext) {
            return new TestingMemoryLineageMeta();
        }
    }

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)");
    }

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected Map<String, String> catalogOptions() {
        return Collections.singletonMap(CatalogOptions.LINEAGE_META.key(), THROWING_META);
    }

    @Test
    public void testTableLineage() throws Exception {
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
        }).hasCauseExactlyInstanceOf(ValidationException.class).hasRootCauseMessage("Cannot get pipeline name for lineage meta.");
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("SELECT * FROM T").collect().close();
        }).hasCauseExactlyInstanceOf(ValidationException.class).hasRootCauseMessage("Cannot get pipeline name for lineage meta.");
        this.tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "insert_t_job");
        this.tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
        Assertions.assertThat(jobSinkTableLineages).isNotEmpty();
        Assertions.assertThat(jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job").getTable()).isEqualTo("T");
        ArrayList arrayList = new ArrayList();
        CloseableIterator collect = this.tEnv.executeSql("SELECT * FROM sys.sink_table_lineage").collect();
        Throwable th = null;
        while (collect.hasNext()) {
            try {
                try {
                    arrayList.add(collect.next());
                } finally {
                }
            } finally {
            }
        }
        if (collect != null) {
            if (0 != 0) {
                try {
                    collect.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            } else {
                collect.close();
            }
        }
        Assertions.assertThat(arrayList.size()).isEqualTo(1);
        Row row = (Row) arrayList.get(0);
        Assertions.assertThat(row.getField("database_name")).isEqualTo(FlinkTestBase.CURRENT_DATABASE);
        Assertions.assertThat(row.getField("table_name")).isEqualTo("T");
        Assertions.assertThat(row.getField("job_name")).isEqualTo("insert_t_job");
        this.tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "select_t_job");
        this.tEnv.executeSql("SELECT * FROM T").collect().close();
        Assertions.assertThat(jobSourceTableLineages).isNotEmpty();
        Assertions.assertThat(jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job").getTable()).isEqualTo("T");
        ArrayList arrayList2 = new ArrayList();
        collect = this.tEnv.executeSql("SELECT * FROM sys.source_table_lineage").collect();
        Throwable th3 = null;
        while (collect.hasNext()) {
            try {
                try {
                    arrayList2.add(collect.next());
                } finally {
                }
            } finally {
            }
        }
        if (collect != null) {
            if (0 != 0) {
                try {
                    collect.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            } else {
                collect.close();
            }
        }
        Assertions.assertThat(arrayList2.size()).isEqualTo(1);
        Row row2 = (Row) arrayList2.get(0);
        Assertions.assertThat(row2.getField("database_name")).isEqualTo(FlinkTestBase.CURRENT_DATABASE);
        Assertions.assertThat(row2.getField("table_name")).isEqualTo("T");
        Assertions.assertThat(row2.getField("job_name")).isEqualTo("select_t_job");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getTableLineageKey(TableLineageEntity tableLineageEntity) {
        return String.format("%s.%s.%s", tableLineageEntity.getDatabase(), tableLineageEntity.getTable(), tableLineageEntity.getJob());
    }
}
