package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
import org.assertj.core.api.Assertions;
import org.junit.Assert;

/* loaded from: input_file:org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.class */
/**
 * Shared scenarios for tests that write a change-log stream of {@link Row}s through
 * {@link FlinkSink} and then compare the content of each committed snapshot with an expected
 * set of {@link Record}s.
 *
 * <p>This class never assigns {@link #tableLoader}, {@link #table}, {@link #env},
 * {@link #format}, {@link #partitioned} or {@link #writeDistributionMode}; they have to be set
 * (presumably by a subclass fixture) before any scenario method is invoked.
 *
 * <p>In every scenario the outer list of input rows holds one inner list per checkpoint, and the
 * outer list of expected records holds one inner list per expected snapshot, in the same order.
 */
public class TestFlinkIcebergSinkV2Base {
    /** Not referenced in this class; presumably the table format version used by subclasses. */
    protected static final int FORMAT_V2 = 2;

    /** Position of the {@code id} field inside a test {@link Row}. */
    protected static final int ROW_ID_POS = 0;

    /** Position of the {@code data} field inside a test {@link Row}. */
    protected static final int ROW_DATA_POS = 1;

    /** Flink type information built from the field types of {@link SimpleDataUtil#FLINK_SCHEMA}. */
    protected static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());

    /** Maps the short change-log markers accepted by {@link #row(String, int, String)} to row kinds. */
    protected static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
            "+I", RowKind.INSERT,
            "-D", RowKind.DELETE,
            "-U", RowKind.UPDATE_BEFORE,
            "+U", RowKind.UPDATE_AFTER);

    /** Value handed to {@code writeParallelism(...)} when the sink is built. */
    protected int parallelism = 1;

    protected TableLoader tableLoader;
    protected Table table;
    protected StreamExecutionEnvironment env;
    /** Not read in this class. */
    protected FileFormat format;
    protected boolean partitioned;
    protected String writeDistributionMode;

    /**
     * Creates a two-field change-log row.
     *
     * @param rowKind one of the keys of {@link #ROW_KIND_MAP} ({@code "+I"}, {@code "-D"},
     *     {@code "-U"}, {@code "+U"})
     * @param id value stored at {@link #ROW_ID_POS}
     * @param data value stored at {@link #ROW_DATA_POS}
     * @return a row of the requested kind holding {@code (id, data)}
     * @throws IllegalArgumentException if {@code rowKind} is not a key of {@link #ROW_KIND_MAP}
     */
    public Row row(String rowKind, int id, String data) {
        RowKind kind = ROW_KIND_MAP.get(rowKind);
        if (kind == null) {
            throw new IllegalArgumentException("Unknown row kind: " + rowKind);
        }
        return Row.ofKind(kind, id, data);
    }

    /**
     * Creates an expected record through {@link SimpleDataUtil#createRecord}.
     *
     * @param id the id value
     * @param data the data value
     * @return the created record
     */
    public Record record(int id, String data) {
        return SimpleDataUtil.createRecord(id, data);
    }

    /**
     * Upsert scenario with equality fields {@code (id, data)}.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testUpsertOnIdDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
                ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
                ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
                ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
                ImmutableList.of(record(1, "bbb"), record(2, "ccc")));

        testChangeLogs(
                ImmutableList.of("id", "data"),
                row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
                true,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
    }

    /**
     * Change-log (non-upsert) scenario with equality fields listed as {@code (data, id)}.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testChangeLogOnIdDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(
                        row("+I", 1, "aaa"),
                        row("-D", 1, "aaa"),
                        row("+I", 2, "bbb"),
                        row("+I", 1, "bbb"),
                        row("+I", 2, "aaa")),
                ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
                ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
                ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
                ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));

        testChangeLogs(
                ImmutableList.of("data", "id"),
                row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
                false,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
    }

    /**
     * Change-log (non-upsert) scenario in which every event carries the same {@code (1, "aaa")}
     * values; equality fields are {@code (id, data)}.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testChangeLogOnSameKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
                ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
                ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
                ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(1, "aaa")),
                ImmutableList.of(record(1, "aaa")),
                ImmutableList.of(record(1, "aaa")),
                ImmutableList.of(record(1, "aaa"), record(1, "aaa")));

        testChangeLogs(
                ImmutableList.of("id", "data"),
                row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
                false,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
    }

    /**
     * Change-log (non-upsert) scenario with {@code data} as the only equality field.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testChangeLogOnDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(
                        row("+I", 1, "aaa"),
                        row("-D", 1, "aaa"),
                        row("+I", 2, "bbb"),
                        row("+I", 1, "bbb"),
                        row("+I", 2, "aaa")),
                ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
                ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
                ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
                ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));

        testChangeLogs(
                ImmutableList.of("data"),
                row -> row.getField(ROW_DATA_POS),
                false,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
    }

    /**
     * Upsert scenario with {@code data} as the only equality field.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testUpsertOnDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
                ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
                ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
                ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
                ImmutableList.of(record(6, "aaa"), record(7, "bbb")));

        testChangeLogs(
                ImmutableList.of("data"),
                row -> row.getField(ROW_DATA_POS),
                true,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
    }

    /**
     * Change-log (non-upsert) scenario with {@code id} as the only equality field.
     *
     * <p>When {@link #partitioned} is set and {@link #writeDistributionMode} is {@code "hash"},
     * the run is expected to fail with an {@link IllegalStateException} whose message complains
     * that a partition field is missing from the equality fields; otherwise the snapshots are
     * verified as usual.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testChangeLogOnIdKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(
                        row("+I", 1, "aaa"),
                        row("-D", 1, "aaa"),
                        row("+I", 1, "bbb"),
                        row("+I", 2, "aaa"),
                        row("-D", 2, "aaa"),
                        row("+I", 2, "bbb")),
                ImmutableList.of(
                        row("-U", 2, "bbb"),
                        row("+U", 2, "ccc"),
                        row("-D", 2, "ccc"),
                        row("+I", 2, "ddd")),
                ImmutableList.of(
                        row("-D", 1, "bbb"),
                        row("+I", 1, "ccc"),
                        row("-D", 1, "ccc"),
                        row("+I", 1, "ddd")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
                ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
                ImmutableList.of(record(1, "ddd"), record(2, "ddd")));

        List<String> equalityFields = ImmutableList.of("id");
        KeySelector<Row, Object> idSelector = row -> row.getField(ROW_ID_POS);

        // Constant-first comparison so an unset distribution mode is treated as "not hash"
        // instead of failing with a NullPointerException before the scenario even starts.
        if (partitioned && "hash".equals(writeDistributionMode)) {
            Assertions.assertThatThrownBy(
                            () -> testChangeLogs(equalityFields, idSelector, false, elementsPerCheckpoint, expectedRecords, branch))
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessageStartingWith("In 'hash' distribution mode with equality fields set, partition field")
                    .hasMessageContaining("should be included in equality fields:");
        } else {
            testChangeLogs(equalityFields, idSelector, false, elementsPerCheckpoint, expectedRecords, branch);
        }
    }

    /**
     * Upsert scenario with {@code id} as the only equality field.
     *
     * <p>When {@link #partitioned} is set the run is expected to fail with an
     * {@link IllegalStateException} whose message contains
     * {@code "should be included in equality fields"}; otherwise the snapshots are verified as
     * usual.
     *
     * @param branch branch name passed to the sink's {@code toBranch(...)}
     * @throws Exception if the job fails or an assertion does not hold
     */
    public void testUpsertOnIdKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
                ImmutableList.of(row("+I", 1, "ccc")),
                ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));

        List<List<Record>> expectedRecords = ImmutableList.of(
                ImmutableList.of(record(1, "bbb")),
                ImmutableList.of(record(1, "ccc")),
                ImmutableList.of(record(1, "eee")));

        List<String> equalityFields = ImmutableList.of("id");
        KeySelector<Row, Object> idSelector = row -> row.getField(ROW_ID_POS);

        if (partitioned) {
            Assertions.assertThatThrownBy(
                            () -> testChangeLogs(equalityFields, idSelector, true, elementsPerCheckpoint, expectedRecords, branch))
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessageContaining("should be included in equality fields");
        } else {
            testChangeLogs(equalityFields, idSelector, true, elementsPerCheckpoint, expectedRecords, branch);
        }
    }

    /**
     * Runs one change-log job against the table and verifies every resulting snapshot.
     *
     * <p>The rows are emitted by a {@link BoundedTestSource}, written with {@link FlinkSink} using
     * the given equality fields, upsert flag and branch, and the job is executed. Afterwards the
     * number of valid snapshots must equal {@code expectedRecordsPerCheckpoint.size()}, and the
     * rows read from the i-th valid snapshot must equal the i-th expected record list.
     *
     * @param equalityFieldColumns column names passed to {@code equalityFieldColumns(...)}
     * @param keySelector accepted for interface compatibility; this method does not use it
     * @param insertAsUpsert value passed to {@code upsert(...)}
     * @param elementsPerCheckpoint input rows, one inner list per checkpoint
     * @param expectedRecordsPerCheckpoint expected table content, one inner list per snapshot
     * @param branch branch name passed to {@code toBranch(...)}
     * @throws Exception if the job fails or reading a snapshot fails
     */
    public void testChangeLogs(
            List<String> equalityFieldColumns,
            KeySelector<Row, Object> keySelector,
            boolean insertAsUpsert,
            List<List<Row>> elementsPerCheckpoint,
            List<List<Record>> expectedRecordsPerCheckpoint,
            String branch)
            throws Exception {
        FlinkSink.forRow(
                        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO),
                        SimpleDataUtil.FLINK_SCHEMA)
                .tableLoader(tableLoader)
                .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
                .writeParallelism(parallelism)
                .equalityFieldColumns(equalityFieldColumns)
                .upsert(insertAsUpsert)
                .toBranch(branch)
                .append();

        // Execute the program.
        env.execute("Test Iceberg Change-Log DataStream.");

        table.refresh();
        List<Snapshot> snapshots = findValidSnapshots();
        int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
        Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());

        for (int i = 0; i < expectedSnapshotNum; i++) {
            List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
            Assert.assertEquals(
                    "Should have the expected records for the checkpoint#" + i,
                    expectedRowSet(expectedRecords.toArray(new Record[0])),
                    actualRowSet(snapshots.get(i).snapshotId(), "*"));
        }
    }

    /**
     * Collects, in table order, the snapshots that own at least one of their manifests, i.e. for
     * which some manifest returned by {@code allManifests} carries the snapshot's own id.
     */
    private List<Snapshot> findValidSnapshots() {
        List<Snapshot> validSnapshots = Lists.newArrayList();
        for (Snapshot snapshot : table.snapshots()) {
            boolean ownsManifest = snapshot.allManifests(table.io()).stream()
                    .anyMatch(manifest -> snapshot.snapshotId() == manifest.snapshotId());
            if (ownsManifest) {
                validSnapshots.add(snapshot);
            }
        }
        return validSnapshots;
    }

    /** Builds the expected row set for the given records via {@link SimpleDataUtil#expectedRowSet}. */
    private StructLikeSet expectedRowSet(Record... records) {
        return SimpleDataUtil.expectedRowSet(table, records);
    }

    /**
     * Reads the selected columns of one snapshot into a {@link StructLikeSet}.
     *
     * @param snapshotId id of the snapshot to read
     * @param columns column names passed to {@code select(...)}
     * @return all rows produced by the read
     * @throws IOException if closing the underlying reader fails
     */
    private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
        table.refresh();
        StructLikeSet rows = StructLikeSet.create(table.schema().asStruct());
        // try-with-resources guarantees the reader is closed on both the normal and failure paths.
        try (CloseableIterable<Record> reader =
                IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) {
            reader.forEach(rows::add);
        }
        return rows;
    }
}
