/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.examples.ridesharing;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.statefun.examples.ridesharing.FnRide;
import org.apache.flink.statefun.examples.ridesharing.Identifiers;
import org.apache.flink.statefun.examples.ridesharing.generated.DriverJoinsRide;
import org.apache.flink.statefun.examples.ridesharing.generated.InboundPassengerMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.OutboundPassengerMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.PassengerJoinsRide;
import org.apache.flink.statefun.examples.ridesharing.generated.RideEnded;
import org.apache.flink.statefun.examples.ridesharing.generated.RideFailed;
import org.apache.flink.statefun.examples.ridesharing.generated.RideStarted;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.match.MatchBinder;
import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;

public class FnPassenger
extends StatefulMatchFunction {
    static final FunctionType TYPE = new FunctionType("org.apache.flink.statefun.examples.ridesharing", "passenger");

    public void configure(MatchBinder binder) {
        binder.predicate(InboundPassengerMessage.class, InboundPassengerMessage::hasRequestRide, this::whenRideIsRequested).predicate(DriverJoinsRide.class, this::whenDriverJoins).predicate(RideFailed.class, this::whenRideFails).predicate(RideStarted.class, this::whenRideHasStarted).predicate(RideEnded.class, this::whenRideHasEnded);
    }

    private void whenRideIsRequested(Context context, InboundPassengerMessage request) {
        String passengerID = context.self().id();
        String rideId = "ride-" + ThreadLocalRandom.current().nextLong();
        InboundPassengerMessage.RequestRide rideRequest = request.getRequestRide();
        PassengerJoinsRide joinRide = PassengerJoinsRide.newBuilder().setPassengerId(passengerID).setStartGeoCell(rideRequest.getStartGeoCell()).setEndGeoCell(rideRequest.getEndGeoCell()).build();
        context.send(FnRide.TYPE, rideId, (Object)joinRide);
    }

    private void whenRideHasEnded(Context context, RideEnded ignored) {
        OutboundPassengerMessage out = OutboundPassengerMessage.newBuilder().setPassengerId(context.self().id()).setRideEnded(OutboundPassengerMessage.RideEnded.newBuilder().build()).build();
        context.send(Identifiers.TO_PASSENGER_EGRESS, (Object)out);
    }

    private void whenRideHasStarted(Context context, RideStarted rideStarted) {
        OutboundPassengerMessage out = OutboundPassengerMessage.newBuilder().setPassengerId(context.self().id()).setRideStarted(OutboundPassengerMessage.RideStarted.newBuilder().setDriverId(rideStarted.getDriverId()).build()).build();
        context.send(Identifiers.TO_PASSENGER_EGRESS, (Object)out);
    }

    private void whenDriverJoins(Context context, DriverJoinsRide message) {
        OutboundPassengerMessage out = OutboundPassengerMessage.newBuilder().setPassengerId(context.self().id()).setDriverFound(OutboundPassengerMessage.DriverHasBeenFound.newBuilder().setDriverId(message.getDriverId()).setDriverGeoCell(message.getDriverLocation()).build()).build();
        context.send(Identifiers.TO_PASSENGER_EGRESS, (Object)out);
    }

    private void whenRideFails(Context context, RideFailed rideFailed) {
        OutboundPassengerMessage out = OutboundPassengerMessage.newBuilder().setPassengerId(context.self().id()).setRideFailed(OutboundPassengerMessage.RideFailed.newBuilder().setRideId(rideFailed.getRideId()).build()).build();
        context.send(Identifiers.TO_PASSENGER_EGRESS, (Object)out);
    }
}

