/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.examples.ridesharing;

import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.statefun.examples.ridesharing.Identifiers;
import org.apache.flink.statefun.examples.ridesharing.generated.InboundDriverMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.InboundPassengerMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.OutboundDriverMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.OutboundPassengerMessage;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

final class KafkaSpecs {
    private static final String KAFKA_SERVER = "kafka-broker:9092";
    private static final String TO_PASSENGER_KAFKA_TOPIC_NAME = "to-passenger";
    private static final String TO_DRIVER_TOPIC_NAME = "to-driver";
    private static final String FROM_DRIVER_TOPIC_NAME = "from-driver";
    private static final String FROM_PASSENGER_TOPIC_NAME = "from-passenger";
    static IngressSpec<InboundDriverMessage> FROM_DRIVER_SPEC = KafkaIngressBuilder.forIdentifier(Identifiers.FROM_DRIVER).withKafkaAddress("kafka-broker:9092").withTopic("from-driver").withProperty("group.id", "statefun-from-driver-group").withDeserializer(FromDriverDeserializer.class).build();
    static IngressSpec<InboundPassengerMessage> FROM_PASSENGER_SPEC = KafkaIngressBuilder.forIdentifier(Identifiers.FROM_PASSENGERS).withKafkaAddress("kafka-broker:9092").withTopic("from-passenger").withProperty("group.id", "statefun-from-passenger-group").withDeserializer(FromPassengersDeserializer.class).build();
    static EgressSpec<OutboundPassengerMessage> TO_PASSENGER_SPEC = KafkaEgressBuilder.forIdentifier(Identifiers.TO_PASSENGER_EGRESS).withKafkaAddress("kafka-broker:9092").withSerializer(ToPassengersSerializer.class).build();
    static EgressSpec<OutboundDriverMessage> TO_DRIVER_SPEC = KafkaEgressBuilder.forIdentifier(Identifiers.TO_OUTBOUND_DRIVER).withKafkaAddress("kafka-broker:9092").withSerializer(ToDriverSerializer.class).build();

    KafkaSpecs() {
    }

    private static final class ToDriverSerializer
    implements KafkaEgressSerializer<OutboundDriverMessage> {
        private static final long serialVersionUID = 1L;

        private ToDriverSerializer() {
        }

        public ProducerRecord<byte[], byte[]> serialize(OutboundDriverMessage message) {
            byte[] key = message.getDriverId().getBytes(StandardCharsets.UTF_8);
            byte[] value = message.toByteArray();
            return new ProducerRecord(KafkaSpecs.TO_DRIVER_TOPIC_NAME, (Object)key, (Object)value);
        }
    }

    private static final class ToPassengersSerializer
    implements KafkaEgressSerializer<OutboundPassengerMessage> {
        private static final long serialVersionUID = 1L;

        private ToPassengersSerializer() {
        }

        public ProducerRecord<byte[], byte[]> serialize(OutboundPassengerMessage message) {
            byte[] key = message.getPassengerId().getBytes(StandardCharsets.UTF_8);
            byte[] value = message.toByteArray();
            return new ProducerRecord(KafkaSpecs.TO_PASSENGER_KAFKA_TOPIC_NAME, (Object)key, (Object)value);
        }
    }

    private static final class FromPassengersDeserializer
    implements KafkaIngressDeserializer<InboundPassengerMessage> {
        private static final long serialVersionUID = 1L;

        private FromPassengersDeserializer() {
        }

        public InboundPassengerMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
            try {
                return InboundPassengerMessage.parseFrom((byte[])((byte[])input.value()));
            }
            catch (InvalidProtocolBufferException ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    private static final class FromDriverDeserializer
    implements KafkaIngressDeserializer<InboundDriverMessage> {
        private static final long serialVersionUID = 1L;

        private FromDriverDeserializer() {
        }

        public InboundDriverMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
            try {
                return InboundDriverMessage.parseFrom((byte[])((byte[])input.value()));
            }
            catch (InvalidProtocolBufferException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

