package org.apache.flink.statefun.examples.ridesharing.simulator.simulation.messaging;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Objects;
import org.apache.flink.statefun.examples.ridesharing.generated.InboundDriverMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.InboundPassengerMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.OutboundDriverMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.OutboundPassengerMessage;
import org.apache.flink.statefun.examples.ridesharing.simulator.model.WebsocketDriverEvent;
import org.apache.flink.statefun.examples.ridesharing.simulator.model.WebsocketPassengerEvent;
import org.apache.flink.statefun.examples.ridesharing.simulator.services.KafkaDriverPublisher;
import org.apache.flink.statefun.examples.ridesharing.simulator.services.KafkaPassengerPublisher;
import org.apache.flink.statefun.examples.ridesharing.simulator.simulation.DriverMessaging;
import org.apache.flink.statefun.examples.ridesharing.simulator.simulation.PassengerMessaging;
import org.apache.flink.statefun.examples.ridesharing.simulator.simulation.engine.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/flink/statefun/examples/ridesharing/simulator/simulation/messaging/Communication.class */
public class Communication implements PassengerMessaging, DriverMessaging {
    private final KafkaPassengerPublisher passengerPublisher;
    private final KafkaDriverPublisher driverPublisher;
    private final SimpMessagingTemplate simpSender;
    private final String passengerWebSocketTopic;
    private final String driverWebSocketTopic;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Scheduler scheduler;

    @Autowired
    public Communication(KafkaPassengerPublisher kafkaPassengerPublisher, KafkaDriverPublisher kafkaDriverPublisher, SimpMessagingTemplate simpMessagingTemplate, @Value("${web-socket.topic.passenger}") String str, @Value("${web-socket.topic.driver}") String str2, Scheduler scheduler) {
        this.passengerPublisher = (KafkaPassengerPublisher) Objects.requireNonNull(kafkaPassengerPublisher);
        this.driverPublisher = (KafkaDriverPublisher) Objects.requireNonNull(kafkaDriverPublisher);
        this.simpSender = (SimpMessagingTemplate) Objects.requireNonNull(simpMessagingTemplate);
        this.passengerWebSocketTopic = (String) Objects.requireNonNull(str);
        this.driverWebSocketTopic = (String) Objects.requireNonNull(str2);
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler);
    }

    @Override // org.apache.flink.statefun.examples.ridesharing.simulator.simulation.PassengerMessaging
    public void incomingPassengerEvent(OutboundPassengerMessage outboundPassengerMessage) {
        this.scheduler.enqueueTaskMessage(outboundPassengerMessage.getPassengerId(), outboundPassengerMessage);
    }

    @Override // org.apache.flink.statefun.examples.ridesharing.simulator.simulation.DriverMessaging
    public void incomingDriverEvent(OutboundDriverMessage outboundDriverMessage) {
        this.scheduler.enqueueTaskMessage(outboundDriverMessage.getDriverId(), outboundDriverMessage);
    }

    @Override // org.apache.flink.statefun.examples.ridesharing.simulator.simulation.PassengerMessaging
    public void outgoingPassengerEvent(InboundPassengerMessage inboundPassengerMessage) {
        this.passengerPublisher.accept(inboundPassengerMessage);
    }

    @Override // org.apache.flink.statefun.examples.ridesharing.simulator.simulation.DriverMessaging
    public void outgoingDriverEvent(InboundDriverMessage inboundDriverMessage) {
        this.driverPublisher.accept(inboundDriverMessage);
    }

    @Override // org.apache.flink.statefun.examples.ridesharing.simulator.simulation.PassengerMessaging
    public void broadcastPassengerSimulationEvent(WebsocketPassengerEvent websocketPassengerEvent) {
        this.simpSender.convertAndSend((SimpMessagingTemplate) this.passengerWebSocketTopic, (Object) toJsonString(websocketPassengerEvent));
    }

    @Override // org.apache.flink.statefun.examples.ridesharing.simulator.simulation.DriverMessaging
    public void broadcastDriverSimulationEvent(WebsocketDriverEvent websocketDriverEvent) {
        if (websocketDriverEvent.getDriverStatus() == WebsocketDriverEvent.DriverStatus.IDLE) {
            return;
        }
        this.simpSender.convertAndSend((SimpMessagingTemplate) this.driverWebSocketTopic, (Object) toJsonString(websocketDriverEvent));
    }

    private String toJsonString(Object obj) {
        try {
            return this.objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
