package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink {@link Sink} for {@link RowData} built around an Iceberg {@link Table}.
 *
 * <p>The sink creates an {@code IcebergSinkWriter} per writer subtask, funnels the write results
 * through a parallelism-1 pre-commit aggregator and commits through an {@code IcebergCommitter}.
 * Instances are created only through {@link Builder}; the constructor is private.
 */
@Experimental
/* loaded from: input_file:org/apache/iceberg/flink/sink/IcebergSink.class */
public class IcebergSink implements Sink<RowData>, SupportsPreWriteTopology<RowData>, SupportsCommitter<IcebergCommittable>, SupportsPreCommitTopology<WriteResult, IcebergCommittable>, SupportsPostCommitTopology<IcebergCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class);
    // Handed to the committer and to the pre-commit aggregator.
    private final TableLoader tableLoader;
    // Passed to the committer as snapshot properties.
    private final Map<String, String> snapshotProperties;
    // User supplied uid suffix; when null or empty the table name is used instead (see defaultSuffix).
    private final String uidSuffix;
    // Table properties merged with the compression settings computed by writeProperties(...).
    private final Map<String, String> writeProperties;
    private final RowType flinkRowType;
    // Supplies the table to the task writer factory and the table name to the sink writer.
    private final SerializableSupplier<Table> tableSupplier;
    // Transient: not part of the serialized sink. It is read in the constructor,
    // in distributeDataStream(...) and in Builder.append().
    private final transient FlinkWriteConf flinkWriteConf;
    private final List<Integer> equalityFieldIds;
    // The next six values are copied out of the write conf / constructor arguments in the constructor.
    private final boolean upsertMode;
    private final FileFormat dataFileFormat;
    private final long targetDataFileSize;
    private final String branch;
    private final boolean overwriteMode;
    private final int workerPoolSize;
    private final Table table;
    // NOTE(review): never assigned anything but null on the sink itself; the Builder keeps its own
    // equalityFieldColumns. It is only formatted into an error message in distributeDataStream(...).
    private final List<String> equalityFieldColumns = null;
    // Random identifier generated once per sink instance and passed to the committer.
    private final String sinkId = UUID.randomUUID().toString();

    /* loaded from: input_file:org/apache/iceberg/flink/sink/IcebergSink$Builder.class */
    /**
     * Fluent builder for {@link IcebergSink}. Obtain one through the static factories
     * {@code IcebergSink.forRowData}, {@code IcebergSink.forRow} or {@code IcebergSink.builderFor},
     * configure it, and finish with {@link #append()}.
     */
    public static class Builder {
        private TableLoader tableLoader;
        // Optional Flink schema; when null the row type is derived from the Iceberg table schema.
        private TableSchema tableSchema;
        // May be left unset; build() then loads the table through the table loader.
        private SerializableTable table;
        private String uidSuffix = "";
        // Produces the input stream for a given uid suffix; must be set before build().
        private Function<String, DataStream<RowData>> inputCreator = null;
        // Write options handed to FlinkWriteConf in build().
        private final Map<String, String> writeOptions = Maps.newHashMap();
        // Passed to the sink as its snapshot properties.
        private final Map<String, String> snapshotSummary = Maps.newHashMap();
        // Defaults to an empty Flink Configuration.
        private ReadableConfig readableConfig = new Configuration();
        private List<String> equalityFieldColumns = null;

        // Instances are created through the static factories on IcebergSink.
        private Builder() {
        }

        /**
         * Uses the given {@link RowData} stream directly as the sink input. The uid suffix passed to
         * the input creator is ignored because no extra operator is added.
         *
         * @param dataStream stream to write
         * @return this builder
         */
        private Builder forRowData(DataStream<RowData> dataStream) {
            this.inputCreator = ignoredUidSuffix -> dataStream;
            return this;
        }

        /**
         * Uses a {@link Row} stream as input by converting every row to {@link RowData} with a
         * {@link DataFormatConverters.RowConverter} built from the given schema. The same schema is
         * also registered through {@link #tableSchema(TableSchema)}.
         *
         * @param dataStream stream of external rows
         * @param tableSchema Flink schema describing the rows
         * @return this builder
         */
        private Builder forRow(DataStream<Row> dataStream, TableSchema tableSchema) {
            // getLogicalType() is declared to return LogicalType, so the cast is required.
            RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
            DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(tableSchema.getFieldDataTypes());
            return forMapperOutputType(dataStream, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType)).tableSchema(tableSchema);
        }

        /**
         * Uses an arbitrary stream as input by mapping every element to {@link RowData}.
         *
         * <p>The map operator gets the same parallelism as its input. When the input creator is
         * invoked with a non-null uid suffix, the operator is named and given a uid derived from it.
         *
         * @param dataStream source stream
         * @param mapFunction converts an element to {@link RowData}
         * @param typeInformation type information of the mapper output
         * @return this builder
         */
        private <T> Builder forMapperOutputType(DataStream<T> dataStream, MapFunction<T, RowData> mapFunction, TypeInformation<RowData> typeInformation) {
            this.inputCreator = newUidSuffix -> {
                SingleOutputStreamOperator<RowData> mapped = dataStream.map(mapFunction, typeInformation).setParallelism(dataStream.getParallelism());
                if (newUidSuffix != null) {
                    String mapperUid = String.format("Sink pre-writer mapper: %s", newUidSuffix);
                    mapped.name(mapperUid).uid(mapperUid);
                }
                return mapped;
            };
            return this;
        }

        /**
         * Sets the table for the sink. A copy produced by {@code SerializableTable.copyOf} is stored
         * rather than the given instance.
         *
         * @param table table to write to
         * @return this builder
         */
        public Builder table(Table table) {
            Table serializableCopy = SerializableTable.copyOf(table);
            this.table = (SerializableTable) serializableCopy;
            return this;
        }

        /**
         * Sets the loader for the target table. {@code build()} rejects a builder without one.
         *
         * @param tableLoader loader for the target table
         * @return this builder
         */
        public Builder tableLoader(TableLoader tableLoader) {
            this.tableLoader = tableLoader;
            return this;
        }

        /**
         * @return the table loader configured so far, or null when none was set
         */
        TableLoader tableLoader() {
            return tableLoader;
        }

        /**
         * Adds a single write option.
         *
         * @param str option key
         * @param str2 option value
         * @return this builder
         */
        public Builder set(String str, String str2) {
            writeOptions.put(str, str2);
            return this;
        }

        /**
         * Adds every entry of the given map to the write options.
         *
         * @param map option keys and values
         * @return this builder
         */
        public Builder setAll(Map<String, String> map) {
            writeOptions.putAll(map);
            return this;
        }

        /**
         * Sets the Flink schema of the incoming rows. When it is left unset, the row type is
         * derived from the Iceberg table schema in {@code build()}.
         *
         * @param tableSchema Flink schema of the input
         * @return this builder
         */
        public Builder tableSchema(TableSchema tableSchema) {
            this.tableSchema = tableSchema;
            return this;
        }

        /**
         * Stores the overwrite flag in the write options under {@code FlinkWriteOptions.OVERWRITE_MODE}.
         *
         * @param z whether overwrite mode is enabled
         * @return this builder
         */
        public Builder overwrite(boolean z) {
            String optionKey = FlinkWriteOptions.OVERWRITE_MODE.key();
            writeOptions.put(optionKey, String.valueOf(z));
            return this;
        }

        /**
         * Replaces the Flink configuration that is handed to {@code FlinkWriteConf} in {@code build()}.
         *
         * @param readableConfig Flink configuration
         * @return this builder
         */
        public Builder flinkConf(ReadableConfig readableConfig) {
            this.readableConfig = readableConfig;
            return this;
        }

        /**
         * Stores the write distribution mode in the write options. {@code RANGE} is rejected and a
         * null mode leaves the options untouched.
         *
         * @param distributionMode requested mode, may be null
         * @return this builder
         * @throws IllegalArgumentException when the mode is {@code RANGE}
         */
        public Builder distributionMode(DistributionMode distributionMode) {
            boolean isRange = DistributionMode.RANGE.equals(distributionMode);
            Preconditions.checkArgument(!isRange, "Flink does not support 'range' write distribution mode now.");
            if (distributionMode == null) {
                return this;
            }
            writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), distributionMode.modeName());
            return this;
        }

        /**
         * Stores the writer parallelism in the write options under
         * {@code FlinkWriteOptions.WRITE_PARALLELISM}.
         *
         * @param i writer parallelism
         * @return this builder
         */
        public Builder writeParallelism(int i) {
            String optionKey = FlinkWriteOptions.WRITE_PARALLELISM.key();
            writeOptions.put(optionKey, String.valueOf(i));
            return this;
        }

        /**
         * Stores the upsert flag in the write options under
         * {@code FlinkWriteOptions.WRITE_UPSERT_ENABLED}.
         *
         * @param z whether upsert mode is enabled
         * @return this builder
         */
        public Builder upsert(boolean z) {
            String optionKey = FlinkWriteOptions.WRITE_UPSERT_ENABLED.key();
            writeOptions.put(optionKey, String.valueOf(z));
            return this;
        }

        /**
         * Sets the column names that {@code build()} resolves to equality field ids.
         *
         * @param list equality column names
         * @return this builder
         */
        public Builder equalityFieldColumns(List<String> list) {
            this.equalityFieldColumns = list;
            return this;
        }

        /**
         * Sets the suffix used for operator names and uids. When it is null or empty, the table
         * name is used instead.
         *
         * @param str uid suffix
         * @return this builder
         */
        public Builder uidSuffix(String str) {
            this.uidSuffix = str;
            return this;
        }

        /**
         * Adds every entry of the given map to the snapshot properties passed to the sink.
         *
         * @param map snapshot property keys and values
         * @return this builder
         */
        public Builder snapshotProperties(Map<String, String> map) {
            snapshotSummary.putAll(map);
            return this;
        }

        /**
         * Adds a single snapshot property.
         *
         * @param str property key
         * @param str2 property value
         * @return this builder
         */
        public Builder setSnapshotProperty(String str, String str2) {
            snapshotSummary.put(str, str2);
            return this;
        }

        /**
         * Stores the target branch in the write options under {@code FlinkWriteOptions.BRANCH}.
         *
         * @param str branch name
         * @return this builder
         */
        public Builder toBranch(String str) {
            String optionKey = FlinkWriteOptions.BRANCH.key();
            writeOptions.put(optionKey, str);
            return this;
        }

        /**
         * Validates the builder state and creates the sink.
         *
         * <p>If no table was set it is loaded through the table loader and stored on the builder.
         * In upsert mode, overwrite must be off, equality fields must be present, and every
         * partition source field of a partitioned table must be among the equality fields.
         *
         * @return the configured sink
         * @throws IllegalArgumentException when no input stream was configured
         * @throws NullPointerException when no table loader was configured
         * @throws IllegalStateException when the upsert checks fail
         */
        IcebergSink build() {
            Preconditions.checkArgument(this.inputCreator != null, "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
            Preconditions.checkNotNull(tableLoader(), "Table loader shouldn't be null");

            // Kept in an effectively final local because the supplier lambda below captures it;
            // capturing the builder field would drag the builder into the serialized lambda.
            SerializableTable serializableTable = IcebergSink.checkAndGetTable(tableLoader(), this.table);
            this.table = serializableTable;

            FlinkWriteConf flinkWriteConf = new FlinkWriteConf(this.table, this.writeOptions, this.readableConfig);

            Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval();
            SerializableSupplier<Table> tableSupplier;
            if (tableRefreshInterval != null) {
                tableSupplier = new CachingTableSupplier(serializableTable, tableLoader(), tableRefreshInterval);
            } else {
                tableSupplier = () -> serializableTable;
            }

            boolean overwriteMode = flinkWriteConf.overwriteMode();
            List<Integer> equalityFieldIds = SinkUtil.checkAndGetEqualityFieldIds(this.table, this.equalityFieldColumns);

            if (flinkWriteConf.upsertMode()) {
                Preconditions.checkState(!overwriteMode, "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
                Preconditions.checkState(!equalityFieldIds.isEmpty(), "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
                if (!this.table.spec().isUnpartitioned()) {
                    for (PartitionField partitionField : this.table.spec().fields()) {
                        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()), "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", partitionField, this.equalityFieldColumns);
                    }
                }
            }

            Map<String, String> writeProperties = IcebergSink.writeProperties(this.table, flinkWriteConf.dataFileFormat(), flinkWriteConf);
            RowType flinkRowType = IcebergSink.toFlinkRowType(this.table.schema(), this.tableSchema);
            return new IcebergSink(this.tableLoader, this.table, this.snapshotSummary, this.uidSuffix, writeProperties, flinkRowType, tableSupplier, flinkWriteConf, equalityFieldIds, flinkWriteConf.branch(), overwriteMode);
        }

        /**
         * Builds the sink and attaches it to the configured input stream. The sink operator's uid
         * and name are the uid suffix, or the table name when the suffix is null or empty. A
         * configured write parallelism is applied to the sink operator.
         *
         * @return the resulting {@link DataStreamSink}
         */
        public DataStreamSink<RowData> append() {
            IcebergSink sink = build();
            String suffix = IcebergSink.defaultSuffix(this.uidSuffix, this.table.name());
            DataStream<RowData> input = this.inputCreator.apply(suffix);
            DataStreamSink<RowData> streamSink = input.sinkTo(sink).uid(suffix).name(suffix);
            if (sink.flinkWriteConf.writeParallelism() != null) {
                streamSink.setParallelism(sink.flinkWriteConf.writeParallelism().intValue());
            }
            return streamSink;
        }

        // Synthetic hook that rebuilds this class's serializable lambdas from a SerializedLambda.
        // It handles two cases: the RowConverter "toInternal" method reference used as a MapFunction,
        // and the table supplier lambda created in build().
        // NOTE(review): this is decompiler output and is not valid Java as written (`boolean z = -1`,
        // a switch over a boolean, the undefined name `r0`, lambdas returned as Object).
        // Presumably the compiler regenerates this method for the lambdas above, so it likely should
        // not be kept in hand-maintained source — confirm before compiling.
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            // First switch: pick the lambda by implementation method name (hash, then equals).
            switch (implMethodName.hashCode()) {
                case -1589109311:
                    if (implMethodName.equals("lambda$build$bc8887de$1")) {
                        z = true;
                        break;
                    }
                    break;
                case 550342232:
                    if (implMethodName.equals("toInternal")) {
                        z = false;
                        break;
                    }
                    break;
            }
            // Second switch: verify the full lambda shape before re-creating it from the captured argument.
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/data/util/DataFormatConverters$DataFormatConverter") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                        DataFormatConverters.RowConverter rowConverter = (DataFormatConverters.RowConverter) serializedLambda.getCapturedArg(0);
                        return (v1) -> {
                            return r0.toInternal(v1);
                        };
                    }
                    break;
                case true:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/iceberg/util/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/iceberg/flink/sink/IcebergSink$Builder") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/iceberg/SerializableTable;)Lorg/apache/iceberg/Table;")) {
                        SerializableTable serializableTable = (SerializableTable) serializedLambda.getCapturedArg(0);
                        return () -> {
                            return serializableTable;
                        };
                    }
                    break;
            }
            // No known lambda matched.
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Creates the sink from values prepared by {@link Builder#build()}.
     *
     * @param tableLoader loader handed to the committer and the pre-commit aggregator
     * @param table table whose name, schema and spec the sink reads
     * @param map snapshot properties
     * @param str uid suffix
     * @param map2 write properties
     * @param rowType Flink row type of the input
     * @param serializableSupplier supplier of the table for the writers
     * @param flinkWriteConf write configuration; several of its values are copied into fields
     * @param list equality field ids
     * @param str2 target branch
     * @param z overwrite mode flag
     */
    private IcebergSink(TableLoader tableLoader, Table table, Map<String, String> map, String str, Map<String, String> map2, RowType rowType, SerializableSupplier<Table> serializableSupplier, FlinkWriteConf flinkWriteConf, List<Integer> list, String str2, boolean z) {
        // Table access.
        this.tableLoader = tableLoader;
        this.table = table;
        this.tableSupplier = serializableSupplier;

        // Values passed through from the builder.
        this.snapshotProperties = map;
        this.uidSuffix = str;
        this.writeProperties = map2;
        this.flinkRowType = rowType;
        this.equalityFieldIds = list;
        this.branch = str2;
        this.overwriteMode = z;

        // The write conf field is transient, so the values read later are copied into plain fields.
        this.flinkWriteConf = flinkWriteConf;
        this.upsertMode = flinkWriteConf.upsertMode();
        this.dataFileFormat = flinkWriteConf.dataFileFormat();
        this.targetDataFileSize = flinkWriteConf.targetDataFileSize();
        this.workerPoolSize = flinkWriteConf.workerPoolSize();
    }

    /**
     * Creates the writer for one subtask: an {@code IcebergSinkWriter} backed by a
     * {@code RowDataTaskWriterFactory} and writer metrics registered under the table name.
     *
     * @param initContext writer init context providing metric group, subtask id and attempt number
     * @return the sink writer
     */
    public SinkWriter<RowData> createWriter(Sink.InitContext initContext) {
        String suppliedTableName = this.tableSupplier.get().name();
        RowDataTaskWriterFactory writerFactory = new RowDataTaskWriterFactory(this.tableSupplier, this.flinkRowType, this.targetDataFileSize, this.dataFileFormat, this.writeProperties, this.equalityFieldIds, this.upsertMode);
        IcebergStreamWriterMetrics writerMetrics = new IcebergStreamWriterMetrics(initContext.metricGroup(), this.table.name());
        return new IcebergSinkWriter(suppliedTableName, writerFactory, writerMetrics, initContext.getSubtaskId(), initContext.getAttemptNumber());
    }

    /**
     * Creates the committer, passing on the table loader, branch, snapshot properties, overwrite
     * flag, worker pool size and sink id, plus committer metrics registered under the table name.
     *
     * @param committerInitContext committer init context providing the metric group
     * @return the committer
     */
    public Committer<IcebergCommittable> createCommitter(CommitterInitContext committerInitContext) {
        IcebergFilesCommitterMetrics committerMetrics = new IcebergFilesCommitterMetrics(committerInitContext.metricGroup(), this.table.name());
        return new IcebergCommitter(this.tableLoader, this.branch, this.snapshotProperties, this.overwriteMode, this.workerPoolSize, this.sinkId, committerMetrics);
    }

    /**
     * @return a new serializer for {@code IcebergCommittable}; a fresh instance on every call
     */
    public SimpleVersionedSerializer<IcebergCommittable> getCommittableSerializer() {
        IcebergCommittableSerializer committableSerializer = new IcebergCommittableSerializer();
        return committableSerializer;
    }

    /**
     * Post-commit hook; this sink adds no operators after the committer.
     *
     * @param dataStream committed messages, unused
     */
    public void addPostCommitTopology(DataStream<CommittableMessage<IcebergCommittable>> dataStream) {
        // Intentionally left empty.
    }

    /**
     * Applies the configured write distribution to the input before it reaches the writers.
     *
     * @param dataStream sink input
     * @return the stream as returned by {@code distributeDataStream}
     */
    public DataStream<RowData> addPreWriteTopology(DataStream<RowData> dataStream) {
        DataStream<RowData> distributed = distributeDataStream(dataStream);
        return distributed;
    }

    /**
     * Inserts the pre-commit aggregator between writers and committer.
     *
     * <p>All write results are sent to a single {@code IcebergWriteAggregator} instance (global
     * partitioning, parallelism and max parallelism 1), and its output is again partitioned
     * globally. The operator name and uid are derived from the uid suffix, or from the table name
     * when the suffix is null or empty.
     *
     * @param dataStream write results emitted by the writers
     * @return stream of aggregated committables
     */
    public DataStream<CommittableMessage<IcebergCommittable>> addPreCommitTopology(DataStream<CommittableMessage<WriteResult>> dataStream) {
        TypeInformation<CommittableMessage<IcebergCommittable>> committableTypeInfo = CommittableMessageTypeInfo.of(this::getCommittableSerializer);
        String aggregatorUid = String.format("Sink pre-commit aggregator: %s", defaultSuffix(this.uidSuffix, this.table.name()));
        return dataStream
                .global()
                .transform(aggregatorUid, committableTypeInfo, new IcebergWriteAggregator(this.tableLoader))
                .uid(aggregatorUid)
                .setParallelism(1)
                .setMaxParallelism(1)
                .global();
    }

    /**
     * @return a new serializer for {@link WriteResult}; a fresh instance on every call
     */
    public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
        WriteResultSerializer writeResultSerializer = new WriteResultSerializer();
        return writeResultSerializer;
    }

    /**
     * Picks the suffix used for operator names and uids.
     *
     * @param str preferred suffix, may be null or empty
     * @param str2 fallback returned when the preferred suffix is null or empty
     * @return {@code str} when it is non-empty, otherwise {@code str2}
     */
    private static String defaultSuffix(String str, String str2) {
        if (str != null && !str.isEmpty()) {
            return str;
        }
        return str2;
    }

    /**
     * Returns a serializable copy of the given table, loading it through the loader when no
     * table was supplied.
     *
     * <p>When loading, the loader is opened if necessary and is always closed afterwards, also
     * when loading fails.
     *
     * @param tableLoader loader used only when {@code table} is null
     * @param table already loaded table, may be null
     * @return serializable copy of the table
     * @throws UncheckedIOException when loading the table or closing the loader fails with an
     *     {@link IOException}
     */
    private static SerializableTable checkAndGetTable(TableLoader tableLoader, Table table) {
        if (table != null) {
            return (SerializableTable) SerializableTable.copyOf(table);
        }
        if (!tableLoader.isOpen()) {
            tableLoader.open();
        }
        // try-with-resources closes the loader on the failure path as well.
        try (TableLoader loader = tableLoader) {
            return (SerializableTable) SerializableTable.copyOf(loader.loadTable());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
        }
    }

    /**
     * Determines the Flink row type of the sink input.
     *
     * <p>Without a Flink schema the row type is converted from the Iceberg schema. Otherwise the
     * Flink schema is converted to an Iceberg schema, its ids are reassigned against the table
     * schema, the result is validated as a write schema, and the row type of the Flink schema
     * itself is returned.
     *
     * @param schema Iceberg table schema
     * @param tableSchema Flink schema of the input, may be null
     * @return row type used to read the incoming {@link RowData}
     */
    private static RowType toFlinkRowType(Schema schema, TableSchema tableSchema) {
        if (tableSchema == null) {
            return FlinkSchemaUtil.convert(schema);
        }
        Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(tableSchema), schema);
        TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
        // getLogicalType() is declared to return LogicalType, so the cast is required.
        return (RowType) tableSchema.toRowDataType().getLogicalType();
    }

    /**
     * Builds the writer properties: a copy of the table properties with the compression settings
     * of the given file format overridden from the write configuration.
     *
     * <p>The compression level (Parquet, Avro) is only set when the configuration provides one.
     *
     * @param table table whose properties form the base
     * @param fileFormat data file format being written
     * @param flinkWriteConf source of the compression settings
     * @return a new mutable map of write properties
     * @throws IllegalArgumentException when the file format is not PARQUET, AVRO or ORC
     */
    private static Map<String, String> writeProperties(Table table, FileFormat fileFormat, FlinkWriteConf flinkWriteConf) {
        Map<String, String> properties = Maps.newHashMap(table.properties());
        switch (fileFormat) {
            case PARQUET:
                properties.put(TableProperties.PARQUET_COMPRESSION, flinkWriteConf.parquetCompressionCodec());
                String parquetCompressionLevel = flinkWriteConf.parquetCompressionLevel();
                if (parquetCompressionLevel != null) {
                    properties.put(TableProperties.PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
                }
                break;
            case AVRO:
                properties.put(TableProperties.AVRO_COMPRESSION, flinkWriteConf.avroCompressionCodec());
                String avroCompressionLevel = flinkWriteConf.avroCompressionLevel();
                if (avroCompressionLevel != null) {
                    properties.put(TableProperties.AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
                }
                break;
            case ORC:
                properties.put(TableProperties.ORC_COMPRESSION, flinkWriteConf.orcCompressionCodec());
                properties.put(TableProperties.ORC_COMPRESSION_STRATEGY, flinkWriteConf.orcCompressionStrategy());
                break;
            default:
                throw new IllegalArgumentException(String.format("Unknown file format %s", fileFormat));
        }
        return properties;
    }

    /**
     * Repartitions the input according to the configured write distribution mode.
     *
     * <ul>
     *   <li>NONE: keyed by equality fields when any are set, otherwise unchanged.
     *   <li>HASH without equality fields: unchanged for an unpartitioned table, custom bucket
     *       partitioning when {@code BucketPartitionerUtil.hasOneBucketField} holds, otherwise
     *       keyed by partition.
     *   <li>HASH with equality fields: keyed by equality fields for an unpartitioned table,
     *       otherwise keyed by partition after checking that every partition source field is an
     *       equality field.
     *   <li>RANGE: keyed by equality fields when any are set, otherwise unchanged.
     * </ul>
     *
     * @param dataStream sink input
     * @return the possibly repartitioned stream
     * @throws IllegalStateException when a partition source field is missing from the equality
     *     fields in HASH mode
     * @throws RuntimeException when the distribution mode is not recognized
     */
    private DataStream<RowData> distributeDataStream(DataStream<RowData> dataStream) {
        DistributionMode distributionMode = this.flinkWriteConf.distributionMode();
        Schema schema = this.table.schema();
        PartitionSpec spec = this.table.spec();
        LOG.info("Write distribution mode is '{}'", distributionMode.modeName());
        switch (distributionMode) {
            case NONE:
                if (this.equalityFieldIds.isEmpty()) {
                    return dataStream;
                }
                LOG.info("Distribute rows by equality fields, because there are equality fields set");
                return keyByEqualityFields(dataStream, schema);
            case HASH:
                if (this.equalityFieldIds.isEmpty()) {
                    if (spec.isUnpartitioned()) {
                        LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set and table is unpartitioned");
                        return dataStream;
                    }
                    if (BucketPartitionerUtil.hasOneBucketField(spec)) {
                        return dataStream.partitionCustom(new BucketPartitioner(spec), new BucketPartitionKeySelector(spec, schema, this.flinkRowType));
                    }
                    return dataStream.keyBy(new PartitionKeySelector(spec, schema, this.flinkRowType));
                }
                if (spec.isUnpartitioned()) {
                    LOG.info("Distribute rows by equality fields, because there are equality fields set and table is unpartitioned");
                    return keyByEqualityFields(dataStream, schema);
                }
                for (PartitionField partitionField : spec.fields()) {
                    // Report the equality field ids: the sink's equalityFieldColumns field is always
                    // null, so formatting it would only ever print "null".
                    Preconditions.checkState(this.equalityFieldIds.contains(partitionField.sourceId()), "In 'hash' distribution mode with equality fields set, partition field '%s' should be included in equality fields: '%s'", partitionField, this.equalityFieldIds);
                }
                return dataStream.keyBy(new PartitionKeySelector(spec, schema, this.flinkRowType));
            case RANGE:
                if (this.equalityFieldIds.isEmpty()) {
                    LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set and {}=range is not supported yet in flink", TableProperties.WRITE_DISTRIBUTION_MODE);
                    return dataStream;
                }
                LOG.info("Distribute rows by equality fields, because there are equality fields set and {}=range is not supported yet in flink", TableProperties.WRITE_DISTRIBUTION_MODE);
                return keyByEqualityFields(dataStream, schema);
            default:
                throw new RuntimeException("Unrecognized write.distribution-mode: " + distributionMode);
        }
    }

    /** Keys the stream by the sink's equality fields. */
    private DataStream<RowData> keyByEqualityFields(DataStream<RowData> dataStream, Schema schema) {
        return dataStream.keyBy(new EqualityFieldKeySelector(schema, this.flinkRowType, this.equalityFieldIds));
    }

    /**
     * Starts a builder for an arbitrary stream whose elements are mapped to {@link RowData}.
     *
     * @param dataStream source stream
     * @param mapFunction converts an element to {@link RowData}
     * @param typeInformation type information of the mapper output
     * @return a new builder
     */
    public static <T> Builder builderFor(DataStream<T> dataStream, MapFunction<T, RowData> mapFunction, TypeInformation<RowData> typeInformation) {
        Builder builder = new Builder();
        return builder.forMapperOutputType(dataStream, mapFunction, typeInformation);
    }

    /**
     * Starts a builder for a stream of {@link Row}s described by the given Flink schema.
     *
     * @param dataStream stream of rows
     * @param tableSchema Flink schema of the rows
     * @return a new builder
     */
    public static Builder forRow(DataStream<Row> dataStream, TableSchema tableSchema) {
        Builder builder = new Builder();
        return builder.forRow(dataStream, tableSchema);
    }

    /**
     * Starts a builder for a stream that already carries {@link RowData}.
     *
     * @param dataStream stream to write
     * @return a new builder
     */
    public static Builder forRowData(DataStream<RowData> dataStream) {
        Builder builder = new Builder();
        return builder.forRowData(dataStream);
    }

    // Synthetic hook that rebuilds this class's serializable lambdas from a SerializedLambda.
    // It handles one case: the `this::getCommittableSerializer` method reference.
    // NOTE(review): this is decompiler output and is not valid Java as written (`boolean z = -1`,
    // a switch over a boolean, a method reference returned as Object).
    // Presumably the compiler regenerates this method for the method reference, so it likely should
    // not be kept in hand-maintained source — confirm before compiling.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: pick the lambda by implementation method name (hash, then equals).
        switch (implMethodName.hashCode()) {
            case -731654093:
                if (implMethodName.equals("getCommittableSerializer")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda shape before re-creating it from the captured sink.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/iceberg/flink/sink/IcebergSink") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    IcebergSink icebergSink = (IcebergSink) serializedLambda.getCapturedArg(0);
                    return icebergSink::getCommittableSerializer;
                }
                break;
        }
        // No known lambda matched.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
