package org.apache.flink.statefun.flink.io.kafka.binders.ingress.v1;

import com.google.protobuf.Message;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.flink.io.common.json.IngressIdentifierJsonDeserializer;
import org.apache.flink.statefun.flink.io.common.json.PropertiesJsonDeserializer;
import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressAutoResetPosition;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilderApiExtension;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.statefun.sdk.kafka.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;

/* JADX INFO: Access modifiers changed from: package-private */
@JsonDeserialize(builder = Builder.class)
/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec.class */
public final class RoutableKafkaIngressSpec {
    private final IngressIdentifier<Message> id;
    private final Optional<String> kafkaAddress;
    private final Optional<String> consumerGroupId;
    private final Map<String, RoutingConfig> topicRoutings;
    private final KafkaIngressAutoResetPosition autoOffsetResetPosition;
    private final KafkaIngressStartupPosition startupPosition;
    private final Properties properties;

    /* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec$AutoOffsetResetPositionJsonDeserializer.class */
    private static class AutoOffsetResetPositionJsonDeserializer extends JsonDeserializer<KafkaIngressAutoResetPosition> {
        private AutoOffsetResetPositionJsonDeserializer() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer
        public KafkaIngressAutoResetPosition deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
            try {
                return KafkaIngressAutoResetPosition.valueOf(jsonParser.getText().toUpperCase(Locale.ENGLISH));
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Invalid autoOffsetResetPosition: " + jsonParser.getText() + "; valid values are " + Arrays.toString(KafkaIngressAutoResetPosition.values()), e);
            }
        }
    }

    @JsonPOJOBuilder
    /* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec$Builder.class */
    public static class Builder {
        private final IngressIdentifier<Message> id;
        private Optional<String> kafkaAddress = Optional.empty();
        private Optional<String> consumerGroupId = Optional.empty();
        private Map<String, RoutingConfig> topicRoutings = new HashMap();
        private KafkaIngressAutoResetPosition autoOffsetResetPosition = KafkaIngressAutoResetPosition.LATEST;
        private KafkaIngressStartupPosition startupPosition = KafkaIngressStartupPosition.fromLatest();
        private Properties properties = new Properties();

        @JsonCreator
        private Builder(@JsonDeserialize(using = IngressIdentifierJsonDeserializer.class) @JsonProperty("id") IngressIdentifier<Message> ingressIdentifier) {
            this.id = (IngressIdentifier) Objects.requireNonNull(ingressIdentifier);
        }

        @JsonProperty("address")
        public Builder withKafkaAddress(String str) {
            Objects.requireNonNull(str);
            this.kafkaAddress = Optional.of(str);
            return this;
        }

        @JsonProperty("consumerGroupId")
        public Builder withConsumerGroupId(String str) {
            Objects.requireNonNull(str);
            this.consumerGroupId = Optional.of(str);
            return this;
        }

        @JsonDeserialize(using = TopicRoutingsJsonDeserializer.class)
        @JsonProperty(ConsumerProtocol.TOPICS_KEY_NAME)
        public Builder withTopicRoutings(Map<String, RoutingConfig> map) {
            this.topicRoutings = (Map) Objects.requireNonNull(map);
            return this;
        }

        @JsonDeserialize(using = AutoOffsetResetPositionJsonDeserializer.class)
        @JsonProperty("autoOffsetResetPosition")
        public Builder withAutoOffsetResetPosition(KafkaIngressAutoResetPosition kafkaIngressAutoResetPosition) {
            this.autoOffsetResetPosition = (KafkaIngressAutoResetPosition) Objects.requireNonNull(kafkaIngressAutoResetPosition);
            return this;
        }

        @JsonDeserialize(using = StartupPositionJsonDeserializer.class)
        @JsonProperty("startupPosition")
        public Builder withStartupPosition(KafkaIngressStartupPosition kafkaIngressStartupPosition) {
            this.startupPosition = (KafkaIngressStartupPosition) Objects.requireNonNull(kafkaIngressStartupPosition);
            return this;
        }

        @JsonDeserialize(using = PropertiesJsonDeserializer.class)
        @JsonProperty("properties")
        public Builder withProperties(Properties properties) {
            this.properties = (Properties) Objects.requireNonNull(properties);
            return this;
        }

        public RoutableKafkaIngressSpec build() {
            return new RoutableKafkaIngressSpec(this.id, this.kafkaAddress, this.consumerGroupId, this.topicRoutings, this.autoOffsetResetPosition, this.startupPosition, this.properties);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec$StartupPositionJsonDeserializer.class */
    public static class StartupPositionJsonDeserializer extends JsonDeserializer<KafkaIngressStartupPosition> {
        private static final String STARTUP_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
        private static final DateTimeFormatter STARTUP_DATE_FORMATTER = DateTimeFormatter.ofPattern(STARTUP_DATE_PATTERN);

        private StartupPositionJsonDeserializer() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer
        public KafkaIngressStartupPosition deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
            ObjectNode objectNode = (ObjectNode) jsonParser.readValueAs(ObjectNode.class);
            String asText = objectNode.get("type").asText();
            boolean z = -1;
            switch (asText.hashCode()) {
                case -1109880953:
                    if (asText.equals("latest")) {
                        z = 2;
                        break;
                    }
                    break;
                case -809579181:
                    if (asText.equals("earliest")) {
                        z = true;
                        break;
                    }
                    break;
                case -410146651:
                    if (asText.equals("specific-offsets")) {
                        z = 3;
                        break;
                    }
                    break;
                case 3076014:
                    if (asText.equals("date")) {
                        z = 4;
                        break;
                    }
                    break;
                case 1556617458:
                    if (asText.equals("group-offsets")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return KafkaIngressStartupPosition.fromGroupOffsets();
                case true:
                    return KafkaIngressStartupPosition.fromEarliest();
                case true:
                    return KafkaIngressStartupPosition.fromLatest();
                case true:
                    return KafkaIngressStartupPosition.fromSpecificOffsets(RoutableKafkaIngressSpec.parseSpecificStartupOffsetsMap(objectNode));
                case true:
                    return KafkaIngressStartupPosition.fromDate(RoutableKafkaIngressSpec.parseStartupDate(objectNode));
                default:
                    throw new IllegalArgumentException("Invalid startup position type: " + asText + "; valid values are [group-offsets, earliest, latest, specific-offsets, date]");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec$TopicRoutingsJsonDeserializer.class */
    private static class TopicRoutingsJsonDeserializer extends JsonDeserializer<Map<String, RoutingConfig>> {
        private TopicRoutingsJsonDeserializer() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer
        public Map<String, RoutingConfig> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
            ObjectNode[] objectNodeArr = (ObjectNode[]) jsonParser.readValueAs(ObjectNode[].class);
            HashMap hashMap = new HashMap(objectNodeArr.length);
            for (ObjectNode objectNode : objectNodeArr) {
                hashMap.put(objectNode.get("topic").asText(), RoutingConfig.newBuilder().setTypeUrl(objectNode.get("valueType").textValue()).addAllTargetFunctionTypes(RoutableKafkaIngressSpec.parseTargetFunctions(objectNode)).build());
            }
            return hashMap;
        }
    }

    private RoutableKafkaIngressSpec(IngressIdentifier<Message> ingressIdentifier, Optional<String> optional, Optional<String> optional2, Map<String, RoutingConfig> map, KafkaIngressAutoResetPosition kafkaIngressAutoResetPosition, KafkaIngressStartupPosition kafkaIngressStartupPosition, Properties properties) {
        this.id = ingressIdentifier;
        this.kafkaAddress = optional;
        this.consumerGroupId = optional2;
        this.topicRoutings = map;
        this.autoOffsetResetPosition = kafkaIngressAutoResetPosition;
        this.startupPosition = kafkaIngressStartupPosition;
        this.properties = properties;
    }

    public IngressIdentifier<Message> id() {
        return this.id;
    }

    public KafkaIngressSpec toUniversalKafkaIngressSpec() {
        KafkaIngressBuilder forIdentifier = KafkaIngressBuilder.forIdentifier(this.id);
        Optional<String> optional = this.kafkaAddress;
        forIdentifier.getClass();
        optional.ifPresent(forIdentifier::withKafkaAddress);
        Optional<String> optional2 = this.consumerGroupId;
        forIdentifier.getClass();
        optional2.ifPresent(forIdentifier::withConsumerGroupId);
        Set<String> keySet = this.topicRoutings.keySet();
        forIdentifier.getClass();
        keySet.forEach(forIdentifier::withTopic);
        forIdentifier.withAutoResetPosition(this.autoOffsetResetPosition);
        forIdentifier.withStartupPosition(this.startupPosition);
        forIdentifier.withProperties(this.properties);
        KafkaIngressBuilderApiExtension.withDeserializer(forIdentifier, new RoutableKafkaIngressDeserializer(this.topicRoutings));
        return forIdentifier.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<TargetFunctionType> parseTargetFunctions(JsonNode jsonNode) {
        return (List) StreamSupport.stream(jsonNode.get("targets").spliterator(), false).map(RoutableKafkaIngressSpec::parseTargetFunctionType).collect(Collectors.toList());
    }

    private static TargetFunctionType parseTargetFunctionType(JsonNode jsonNode) {
        TypeName parseFrom = TypeName.parseFrom(jsonNode.asText());
        return TargetFunctionType.newBuilder().setNamespace(parseFrom.namespace()).setType(parseFrom.name()).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<KafkaTopicPartition, Long> parseSpecificStartupOffsetsMap(ObjectNode objectNode) {
        JsonNode jsonNode = objectNode.get("offsets");
        HashMap hashMap = new HashMap();
        jsonNode.forEach(jsonNode2 -> {
            Map.Entry<String, JsonNode> next = jsonNode2.fields().next();
            hashMap.put(KafkaTopicPartition.fromString(next.getKey()), Long.valueOf(next.getValue().asLong()));
        });
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ZonedDateTime parseStartupDate(ObjectNode objectNode) {
        String asText = objectNode.get("date").asText();
        try {
            return ZonedDateTime.parse(asText, StartupPositionJsonDeserializer.STARTUP_DATE_FORMATTER);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Unable to parse date string for startup position: " + asText + "; the date should conform to the pattern yyyy-MM-dd HH:mm:ss.SSS Z", e);
        }
    }
}
