package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.class */
public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends ProcessFunction<I, O> {
    protected Catalog catalog;
    protected final Catalog.Loader catalogLoader;
    private static final Logger LOG = LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
    private static final List<DataTypeRoot> STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
    private static final List<DataTypeRoot> BINARY_TYPES = Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
    private static final List<DataTypeRoot> INTEGER_TYPES = Arrays.asList(DataTypeRoot.TINYINT, DataTypeRoot.SMALLINT, DataTypeRoot.INTEGER, DataTypeRoot.BIGINT);
    private static final List<DataTypeRoot> FLOATING_POINT_TYPES = Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
    private static final List<DataTypeRoot> DECIMAL_TYPES = Arrays.asList(DataTypeRoot.DECIMAL);
    private static final List<DataTypeRoot> TIMESTAMP_TYPES = Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

    /* loaded from: input_file:org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase$ConvertAction.class */
    public enum ConvertAction {
        CONVERT,
        IGNORE,
        EXCEPTION
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public UpdatedDataFieldsProcessFunctionBase(Catalog.Loader loader) {
        this.catalogLoader = loader;
    }

    public void open(Configuration configuration) {
        this.catalog = this.catalogLoader.load();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void applySchemaChange(SchemaManager schemaManager, SchemaChange schemaChange, Identifier identifier) throws Exception {
        if (schemaChange instanceof SchemaChange.AddColumn) {
            try {
                this.catalog.alterTable(identifier, schemaChange, false);
                return;
            } catch (Catalog.ColumnAlreadyExistException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to perform SchemaChange.AddColumn {}, possibly due to duplicated column name", schemaChange, e);
                    return;
                }
                return;
            }
        }
        if (!(schemaChange instanceof SchemaChange.UpdateColumnType)) {
            if (!(schemaChange instanceof SchemaChange.UpdateColumnComment)) {
                throw new UnsupportedOperationException("Unsupported schema change class " + schemaChange.getClass().getName() + ", content " + schemaChange);
            }
            this.catalog.alterTable(identifier, schemaChange, false);
            return;
        }
        SchemaChange.UpdateColumnType updateColumnType = (SchemaChange.UpdateColumnType) schemaChange;
        TableSchema tableSchema = (TableSchema) schemaManager.latest().orElseThrow(() -> {
            return new RuntimeException("Table does not exist. This is unexpected.");
        });
        int indexOf = tableSchema.fieldNames().indexOf(updateColumnType.fieldName());
        Preconditions.checkState(indexOf >= 0, "Field name " + updateColumnType.fieldName() + " does not exist in table. This is unexpected.");
        DataType type = ((DataField) tableSchema.fields().get(indexOf)).type();
        DataType newDataType = updateColumnType.newDataType();
        switch (canConvert(type, newDataType)) {
            case CONVERT:
                this.catalog.alterTable(identifier, schemaChange, false);
                return;
            case EXCEPTION:
                throw new UnsupportedOperationException(String.format("Cannot convert field %s from type %s to %s of Paimon table %s.", updateColumnType.fieldName(), type, newDataType, identifier.getFullName()));
            default:
                return;
        }
    }

    public static ConvertAction canConvert(DataType dataType, DataType dataType2) {
        if (dataType.equalsIgnoreNullable(dataType2)) {
            return ConvertAction.CONVERT;
        }
        int indexOf = STRING_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf2 = STRING_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf >= 0 && indexOf2 >= 0) {
            return DataTypeChecks.getLength(dataType) <= DataTypeChecks.getLength(dataType2) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf3 = BINARY_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf4 = BINARY_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf3 >= 0 && indexOf4 >= 0) {
            return DataTypeChecks.getLength(dataType) <= DataTypeChecks.getLength(dataType2) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf5 = INTEGER_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf6 = INTEGER_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf5 >= 0 && indexOf6 >= 0) {
            return indexOf5 <= indexOf6 ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf7 = FLOATING_POINT_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf8 = FLOATING_POINT_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf7 >= 0 && indexOf8 >= 0) {
            return indexOf7 <= indexOf8 ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf9 = DECIMAL_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf10 = DECIMAL_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf9 < 0 || indexOf10 < 0) {
            return (TIMESTAMP_TYPES.indexOf(dataType.getTypeRoot()) < 0 || TIMESTAMP_TYPES.indexOf(dataType2.getTypeRoot()) < 0) ? ConvertAction.EXCEPTION : DataTypeChecks.getPrecision(dataType).intValue() <= DataTypeChecks.getPrecision(dataType2).intValue() ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        return (DataTypeChecks.getPrecision(dataType2).intValue() > DataTypeChecks.getPrecision(dataType).intValue() || DataTypeChecks.getScale(dataType2) > DataTypeChecks.getScale(dataType)) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<SchemaChange> extractSchemaChanges(SchemaManager schemaManager, List<DataField> list) {
        RowType logicalRowType = ((TableSchema) schemaManager.latest().get()).logicalRowType();
        HashMap hashMap = new HashMap();
        for (DataField dataField : logicalRowType.getFields()) {
            hashMap.put(dataField.name(), dataField);
        }
        ArrayList arrayList = new ArrayList();
        for (DataField dataField2 : list) {
            if (hashMap.containsKey(dataField2.name())) {
                DataField dataField3 = (DataField) hashMap.get(dataField2.name());
                if (!dataField3.type().equalsIgnoreNullable(dataField2.type())) {
                    arrayList.add(SchemaChange.updateColumnType(dataField2.name(), dataField2.type()));
                    if (dataField2.description() != null) {
                        arrayList.add(SchemaChange.updateColumnComment(new String[]{dataField2.name()}, dataField2.description()));
                    }
                } else if (dataField2.description() != null && !dataField2.description().equals(dataField3.description())) {
                    arrayList.add(SchemaChange.updateColumnComment(new String[]{dataField2.name()}, dataField2.description()));
                }
            } else {
                arrayList.add(SchemaChange.addColumn(dataField2.name(), dataField2.type(), dataField2.description(), (SchemaChange.Move) null));
            }
        }
        return arrayList;
    }

    public void close() throws Exception {
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }
}
