package org.apache.inlong.sort.protocol.node.extract;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.KafkaConstant;
import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;

@JsonTypeName("kafkaExtract")
@JsonInclude(JsonInclude.Include.NON_NULL)
/* loaded from: input_file:org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.class */
public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metadata, Serializable {
    private static final long serialVersionUID = 1;

    @Nonnull
    @JsonProperty("topic")
    private String topic;

    @Nonnull
    @JsonProperty("bootstrapServers")
    private String bootstrapServers;

    @Nonnull
    @JsonProperty(TubeMQConstant.FORMAT)
    private Format format;

    @JsonProperty("scanStartupMode")
    private KafkaScanStartupMode kafkaScanStartupMode;

    @JsonProperty("primaryKey")
    private String primaryKey;

    @JsonProperty("groupId")
    private String groupId;

    @JsonProperty("scanSpecificOffsets")
    private String scanSpecificOffsets;

    @JsonProperty("scanTimestampMillis")
    private String scanTimestampMillis;

    public KafkaExtractNode(@JsonProperty("id") String str, @JsonProperty("name") String str2, @JsonProperty("fields") List<FieldInfo> list, @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField, @JsonProperty("properties") Map<String, String> map, @Nonnull @JsonProperty("topic") String str3, @Nonnull @JsonProperty("bootstrapServers") String str4, @Nonnull @JsonProperty("format") Format format, @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode, @JsonProperty("primaryKey") String str5, @JsonProperty("groupId") String str6) {
        this(str, str2, list, watermarkField, map, str3, str4, format, kafkaScanStartupMode, str5, str6, null, null);
    }

    @JsonCreator
    public KafkaExtractNode(@JsonProperty("id") String str, @JsonProperty("name") String str2, @JsonProperty("fields") List<FieldInfo> list, @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField, @JsonProperty("properties") Map<String, String> map, @Nonnull @JsonProperty("topic") String str3, @Nonnull @JsonProperty("bootstrapServers") String str4, @Nonnull @JsonProperty("format") Format format, @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode, @JsonProperty("primaryKey") String str5, @JsonProperty("groupId") String str6, @JsonProperty("scanSpecificOffsets") String str7, @JsonProperty("scanTimestampMillis") String str8) {
        super(str, str2, list, watermarkField, map);
        this.topic = (String) Preconditions.checkNotNull(str3, "kafka topic is empty");
        this.bootstrapServers = (String) Preconditions.checkNotNull(str4, "kafka bootstrapServers is empty");
        this.format = (Format) Preconditions.checkNotNull(format, "kafka format is empty");
        this.kafkaScanStartupMode = (KafkaScanStartupMode) Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
        this.primaryKey = str5;
        this.groupId = str6;
        if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSETS) {
            Preconditions.checkArgument(StringUtils.isNotEmpty(str7), "scanSpecificOffsets is empty");
            this.scanSpecificOffsets = str7;
        }
        if (KafkaScanStartupMode.TIMESTAMP_MILLIS == kafkaScanStartupMode) {
            Preconditions.checkArgument(StringUtils.isNotBlank(str8), "scanTimestampMillis is empty");
            this.scanTimestampMillis = str8;
        }
        if (KafkaScanStartupMode.GROUP_OFFSETS == kafkaScanStartupMode) {
            Preconditions.checkArgument(StringUtils.isNotBlank(str6), "group is empty when enable group offsets");
        }
    }

    @Override // org.apache.inlong.sort.protocol.node.Node
    public Map<String, String> tableOptions() {
        Map<String, String> tableOptions = super.tableOptions();
        tableOptions.put("topic", this.topic);
        tableOptions.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, this.bootstrapServers);
        tableOptions.put("scan.startup.mode", this.kafkaScanStartupMode.getValue());
        if (isUpsertKafkaConnector(this.format, !StringUtils.isEmpty(this.primaryKey))) {
            tableOptions.put("connector", KafkaConstant.UPSERT_KAFKA);
            tableOptions.putAll(this.format.generateOptions(true));
        } else {
            tableOptions.put("connector", KafkaConstant.KAFKA);
            tableOptions.putAll(this.format.generateOptions(false));
        }
        if (StringUtils.isNotEmpty(this.scanSpecificOffsets)) {
            tableOptions.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, this.scanSpecificOffsets);
        }
        if (StringUtils.isNotBlank(this.scanTimestampMillis)) {
            tableOptions.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, this.scanTimestampMillis);
        }
        if (StringUtils.isNotEmpty(this.groupId)) {
            tableOptions.put(KafkaConstant.PROPERTIES_GROUP_ID, this.groupId);
        }
        return tableOptions;
    }

    private boolean isUpsertKafkaConnector(Format format, boolean z) {
        if ((format instanceof JsonFormat) && z) {
            return true;
        }
        if ((format instanceof CsvFormat) && z) {
            return true;
        }
        return (format instanceof AvroFormat) && z;
    }

    @Override // org.apache.inlong.sort.protocol.node.Node
    public String genTableName() {
        return String.format("table_%s", super.getId());
    }

    @Override // org.apache.inlong.sort.protocol.node.Node
    public String getPrimaryKey() {
        return this.primaryKey;
    }

    @Override // org.apache.inlong.sort.protocol.node.Node
    public List<FieldInfo> getPartitionFields() {
        return super.getPartitionFields();
    }

    @Override // org.apache.inlong.sort.protocol.Metadata
    public String getMetadataKey(MetaField metaField) {
        String str;
        switch (metaField) {
            case TABLE_NAME:
                str = "value.table";
                break;
            case DATABASE_NAME:
                str = "value.database";
                break;
            case SQL_TYPE:
                str = "value.sql-type";
                break;
            case PK_NAMES:
                str = "value.pk-names";
                break;
            case TS:
                str = "value.ingestion-timestamp";
                break;
            case OP_TS:
                str = "value.event-timestamp";
                break;
            case OP_TYPE:
                str = "value.type";
                break;
            case IS_DDL:
                str = "value.is-ddl";
                break;
            case MYSQL_TYPE:
                str = "value.mysql-type";
                break;
            case BATCH_ID:
                str = "value.batch-id";
                break;
            case UPDATE_BEFORE:
                str = "value.update-before";
                break;
            case KEY:
                str = "key";
                break;
            case VALUE:
                str = ParquetSchemaConverter.MAP_VALUE;
                break;
            case HEADERS:
                str = "headers";
                break;
            case HEADERS_TO_JSON_STR:
                str = "headers_to_json_str";
                break;
            case PARTITION:
                str = "partition";
                break;
            case OFFSET:
                str = "offset";
                break;
            case TIMESTAMP:
                str = "timestamp";
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", getClass().getSimpleName(), metaField));
        }
        return str;
    }

    @Override // org.apache.inlong.sort.protocol.Metadata
    public boolean isVirtual(MetaField metaField) {
        switch (metaField) {
            case KEY:
            case VALUE:
            case HEADERS:
            case HEADERS_TO_JSON_STR:
            case PARTITION:
            case OFFSET:
            case TIMESTAMP:
                return true;
            default:
                return false;
        }
    }

    @Override // org.apache.inlong.sort.protocol.Metadata
    public Set<MetaField> supportedMetaFields() {
        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE, MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS, MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.KEY, MetaField.VALUE, MetaField.PARTITION, MetaField.HEADERS, MetaField.HEADERS_TO_JSON_STR, MetaField.OFFSET, MetaField.TIMESTAMP);
    }

    @Override // org.apache.inlong.sort.protocol.node.ExtractNode
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof KafkaExtractNode)) {
            return false;
        }
        KafkaExtractNode kafkaExtractNode = (KafkaExtractNode) obj;
        if (!kafkaExtractNode.canEqual(this) || !super.equals(obj)) {
            return false;
        }
        String topic = getTopic();
        String topic2 = kafkaExtractNode.getTopic();
        if (topic == null) {
            if (topic2 != null) {
                return false;
            }
        } else if (!topic.equals(topic2)) {
            return false;
        }
        String bootstrapServers = getBootstrapServers();
        String bootstrapServers2 = kafkaExtractNode.getBootstrapServers();
        if (bootstrapServers == null) {
            if (bootstrapServers2 != null) {
                return false;
            }
        } else if (!bootstrapServers.equals(bootstrapServers2)) {
            return false;
        }
        Format format = getFormat();
        Format format2 = kafkaExtractNode.getFormat();
        if (format == null) {
            if (format2 != null) {
                return false;
            }
        } else if (!format.equals(format2)) {
            return false;
        }
        KafkaScanStartupMode kafkaScanStartupMode = getKafkaScanStartupMode();
        KafkaScanStartupMode kafkaScanStartupMode2 = kafkaExtractNode.getKafkaScanStartupMode();
        if (kafkaScanStartupMode == null) {
            if (kafkaScanStartupMode2 != null) {
                return false;
            }
        } else if (!kafkaScanStartupMode.equals(kafkaScanStartupMode2)) {
            return false;
        }
        String primaryKey = getPrimaryKey();
        String primaryKey2 = kafkaExtractNode.getPrimaryKey();
        if (primaryKey == null) {
            if (primaryKey2 != null) {
                return false;
            }
        } else if (!primaryKey.equals(primaryKey2)) {
            return false;
        }
        String groupId = getGroupId();
        String groupId2 = kafkaExtractNode.getGroupId();
        if (groupId == null) {
            if (groupId2 != null) {
                return false;
            }
        } else if (!groupId.equals(groupId2)) {
            return false;
        }
        String scanSpecificOffsets = getScanSpecificOffsets();
        String scanSpecificOffsets2 = kafkaExtractNode.getScanSpecificOffsets();
        if (scanSpecificOffsets == null) {
            if (scanSpecificOffsets2 != null) {
                return false;
            }
        } else if (!scanSpecificOffsets.equals(scanSpecificOffsets2)) {
            return false;
        }
        String scanTimestampMillis = getScanTimestampMillis();
        String scanTimestampMillis2 = kafkaExtractNode.getScanTimestampMillis();
        return scanTimestampMillis == null ? scanTimestampMillis2 == null : scanTimestampMillis.equals(scanTimestampMillis2);
    }

    @Override // org.apache.inlong.sort.protocol.node.ExtractNode
    protected boolean canEqual(Object obj) {
        return obj instanceof KafkaExtractNode;
    }

    @Override // org.apache.inlong.sort.protocol.node.ExtractNode
    public int hashCode() {
        int hashCode = super.hashCode();
        String topic = getTopic();
        int hashCode2 = (hashCode * 59) + (topic == null ? 43 : topic.hashCode());
        String bootstrapServers = getBootstrapServers();
        int hashCode3 = (hashCode2 * 59) + (bootstrapServers == null ? 43 : bootstrapServers.hashCode());
        Format format = getFormat();
        int hashCode4 = (hashCode3 * 59) + (format == null ? 43 : format.hashCode());
        KafkaScanStartupMode kafkaScanStartupMode = getKafkaScanStartupMode();
        int hashCode5 = (hashCode4 * 59) + (kafkaScanStartupMode == null ? 43 : kafkaScanStartupMode.hashCode());
        String primaryKey = getPrimaryKey();
        int hashCode6 = (hashCode5 * 59) + (primaryKey == null ? 43 : primaryKey.hashCode());
        String groupId = getGroupId();
        int hashCode7 = (hashCode6 * 59) + (groupId == null ? 43 : groupId.hashCode());
        String scanSpecificOffsets = getScanSpecificOffsets();
        int hashCode8 = (hashCode7 * 59) + (scanSpecificOffsets == null ? 43 : scanSpecificOffsets.hashCode());
        String scanTimestampMillis = getScanTimestampMillis();
        return (hashCode8 * 59) + (scanTimestampMillis == null ? 43 : scanTimestampMillis.hashCode());
    }

    @Nonnull
    public String getTopic() {
        return this.topic;
    }

    @Nonnull
    public String getBootstrapServers() {
        return this.bootstrapServers;
    }

    @Nonnull
    public Format getFormat() {
        return this.format;
    }

    public KafkaScanStartupMode getKafkaScanStartupMode() {
        return this.kafkaScanStartupMode;
    }

    public String getGroupId() {
        return this.groupId;
    }

    public String getScanSpecificOffsets() {
        return this.scanSpecificOffsets;
    }

    public String getScanTimestampMillis() {
        return this.scanTimestampMillis;
    }

    public void setTopic(@Nonnull String str) {
        if (str == null) {
            throw new NullPointerException("topic is marked non-null but is null");
        }
        this.topic = str;
    }

    public void setBootstrapServers(@Nonnull String str) {
        if (str == null) {
            throw new NullPointerException("bootstrapServers is marked non-null but is null");
        }
        this.bootstrapServers = str;
    }

    public void setFormat(@Nonnull Format format) {
        if (format == null) {
            throw new NullPointerException("format is marked non-null but is null");
        }
        this.format = format;
    }

    public void setKafkaScanStartupMode(KafkaScanStartupMode kafkaScanStartupMode) {
        this.kafkaScanStartupMode = kafkaScanStartupMode;
    }

    public void setPrimaryKey(String str) {
        this.primaryKey = str;
    }

    public void setGroupId(String str) {
        this.groupId = str;
    }

    public void setScanSpecificOffsets(String str) {
        this.scanSpecificOffsets = str;
    }

    public void setScanTimestampMillis(String str) {
        this.scanTimestampMillis = str;
    }

    @Override // org.apache.inlong.sort.protocol.node.ExtractNode
    public String toString() {
        return "KafkaExtractNode(topic=" + getTopic() + ", bootstrapServers=" + getBootstrapServers() + ", format=" + getFormat() + ", kafkaScanStartupMode=" + getKafkaScanStartupMode() + ", primaryKey=" + getPrimaryKey() + ", groupId=" + getGroupId() + ", scanSpecificOffsets=" + getScanSpecificOffsets() + ", scanTimestampMillis=" + getScanTimestampMillis() + ")";
    }
}
