package net.maritimecloud.internal.mms.client.broadcast;

import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import net.maritimecloud.core.id.MaritimeId;
import net.maritimecloud.internal.message.MessageHelper;
import net.maritimecloud.internal.message.text.json.JsonMessageReader;
import net.maritimecloud.internal.mms.client.ClientInfo;
import net.maritimecloud.internal.mms.client.MmsThreadManager;
import net.maritimecloud.internal.mms.client.broadcast.SubscriptionSet;
import net.maritimecloud.internal.mms.client.connection.ClientConnection;
import net.maritimecloud.internal.net.messages.Broadcast;
import net.maritimecloud.internal.net.messages.BroadcastAck;
import net.maritimecloud.internal.net.messages.MessageHasher;
import net.maritimecloud.internal.net.util.DefaultAcknowledgement;
import net.maritimecloud.internal.net.util.DefaultMessageHeader;
import net.maritimecloud.internal.util.Coverage;
import net.maritimecloud.internal.util.MessageStore;
import net.maritimecloud.internal.util.logging.Logger;
import net.maritimecloud.message.Message;
import net.maritimecloud.message.MessageSerializer;
import net.maritimecloud.net.BroadcastConsumer;
import net.maritimecloud.net.BroadcastMessage;
import net.maritimecloud.net.BroadcastSubscription;
import net.maritimecloud.net.DispatchedMessage;
import net.maritimecloud.net.MessageHeader;
import net.maritimecloud.net.mms.MmsBroadcastOptions;
import net.maritimecloud.net.mms.MmsClientClosedException;
import net.maritimecloud.util.Binary;
import net.maritimecloud.util.geometry.Area;
import net.maritimecloud.util.geometry.Circle;
import net.maritimecloud.util.geometry.PositionTime;
import org.cakeframework.container.concurrent.ScheduleWithFixedDelay;
import org.cakeframework.container.concurrent.ThreadManager;
import org.cakeframework.container.lifecycle.RunOnStop;

/* loaded from: input_file:net/maritimecloud/internal/mms/client/broadcast/ClientBroadcastManager.class */
public class ClientBroadcastManager {
    static final Logger LOG = Logger.get(ClientBroadcastManager.class);
    private final ClientConnection connection;
    volatile boolean isShutdown;
    private final MmsThreadManager threadManager;
    private final ClientInfo info;
    final ScheduledExecutorService ses;
    private final MessageStore<DispatchedBroadcast> dispatchedBroadcasts = new MessageStore<>();
    final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
    final ReentrantReadWriteLock subscribeLock = new ReentrantReadWriteLock();
    final ConcurrentHashMap<String, SubscriptionSet> subscribers = new ConcurrentHashMap<>();

    public ClientBroadcastManager(ClientInfo clientInfo, MmsThreadManager mmsThreadManager, ClientConnection clientConnection, ThreadManager threadManager) {
        this.connection = (ClientConnection) Objects.requireNonNull(clientConnection);
        this.threadManager = (MmsThreadManager) Objects.requireNonNull(mmsThreadManager);
        this.info = (ClientInfo) Objects.requireNonNull(clientInfo);
        this.ses = threadManager.getScheduledExecutor("");
        clientConnection.subscribe(BroadcastAck.class, (mmsMessage, broadcastAck) -> {
            onBroadcastAck(broadcastAck);
        });
        clientConnection.subscribe(Broadcast.class, (mmsMessage2, broadcast) -> {
            onBroadcastMessage(broadcast);
        });
    }

    public <T extends BroadcastMessage> BroadcastSubscription broadcastSubscribe(Class<T> cls, BroadcastConsumer<T> broadcastConsumer, Area area) {
        return broadcastSubscribe(BroadcastDeserializer.CLASSPATH_DESERIALIZER, MessageHelper.getName(cls), broadcastConsumer, area);
    }

    public <T extends BroadcastMessage> BroadcastSubscription broadcastSubscribe(BroadcastDeserializer broadcastDeserializer, String str, BroadcastConsumer<T> broadcastConsumer, Area area) {
        this.subscribeLock.readLock().lock();
        try {
            if (this.isShutdown) {
                throw new MmsClientClosedException("The mms client has been shutdown");
            }
            BroadcastSubscription newSubscription = this.subscribers.computeIfAbsent(str, str2 -> {
                return new SubscriptionSet(this, str);
            }).newSubscription(broadcastDeserializer, broadcastConsumer, area == null ? Coverage.ALL : new Coverage.StaticAreaCoverage(area));
            this.subscribeLock.readLock().unlock();
            return newSubscription;
        } catch (Throwable th) {
            this.subscribeLock.readLock().unlock();
            throw th;
        }
    }

    DispatchedMessage brodcast(BroadcastMessage broadcastMessage, Area area, int i, Consumer<? super MessageHeader> consumer) {
        String name = MessageHelper.getName(broadcastMessage);
        Message broadcast = new Broadcast();
        broadcast.setBroadcastType(name);
        broadcast.setSenderId(this.info.getClientId().toString());
        broadcast.setSenderTimestamp(this.info.currentTime());
        Optional<PositionTime> currentPosition = this.info.getCurrentPosition();
        if (currentPosition.isPresent()) {
            broadcast.setSenderPosition(currentPosition.get());
        }
        Area area2 = area;
        if (area2 == null && currentPosition.isPresent()) {
            area2 = Circle.create(currentPosition.get(), i);
        }
        broadcast.setArea(area2);
        broadcast.setAckBroadcast(Boolean.valueOf(consumer != null));
        broadcast.setPayload(Binary.copyFromUtf8(MessageSerializer.writeToJSON(broadcastMessage, MessageHelper.getSerializer(broadcastMessage))));
        broadcast.setMessageId(MessageHasher.calculateSHA256(broadcast));
        DefaultAcknowledgement defaultAcknowledgement = new DefaultAcknowledgement();
        DispatchedBroadcast dispatchedBroadcast = new DispatchedBroadcast(broadcast, defaultAcknowledgement, consumer);
        this.sendLock.readLock().lock();
        try {
            checkNotShutdown();
            this.dispatchedBroadcasts.addMessage(dispatchedBroadcast);
            this.connection.sendMessage(broadcast).handle((r4, th) -> {
                if (th == null) {
                    defaultAcknowledgement.complete();
                    return null;
                }
                defaultAcknowledgement.completeExceptionally(th);
                return null;
            });
            this.sendLock.readLock().unlock();
            return dispatchedBroadcast;
        } catch (Throwable th2) {
            this.sendLock.readLock().unlock();
            throw th2;
        }
    }

    @ScheduleWithFixedDelay(value = 1, unit = TimeUnit.MINUTES)
    public void clearOldMessages() {
        this.dispatchedBroadcasts.pruneMessagesOldThan(System.nanoTime() - TimeUnit.NANOSECONDS.convert(1L, TimeUnit.HOURS));
    }

    private void onBroadcastAck(BroadcastAck broadcastAck) {
        DispatchedBroadcast find = this.dispatchedBroadcasts.find(broadcastAck.getAckForMessageId());
        if (find != null) {
            find.acked(broadcastAck);
        }
    }

    private void onBroadcastMessage(Broadcast broadcast) {
        SubscriptionSet subscriptionSet = this.subscribers.get(broadcast.getBroadcastType());
        if (subscriptionSet == null || subscriptionSet.listeners.isEmpty()) {
            return;
        }
        DefaultMessageHeader defaultMessageHeader = new DefaultMessageHeader(MaritimeId.create(broadcast.getSenderId()), broadcast.getMessageId(), broadcast.getSenderTimestamp(), broadcast.getSenderPosition());
        Iterator<SubscriptionSet.DefaultSubscription> it = subscriptionSet.listeners.iterator();
        while (it.hasNext()) {
            SubscriptionSet.DefaultSubscription next = it.next();
            try {
                BroadcastMessage convert = next.bd.convert(broadcast.getBroadcastType(), new JsonMessageReader(broadcast.getPayload().toStringUtf8()));
                this.threadManager.broadcastReceived(() -> {
                    next.deliver(defaultMessageHeader, convert);
                });
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
        }
    }

    private void checkNotShutdown() {
        if (this.isShutdown) {
            throw new MmsClientClosedException("The mms client has been shutdown");
        }
    }

    @RunOnStop
    public void stop() {
        this.subscribeLock.writeLock().lock();
        try {
            this.sendLock.writeLock().lock();
            try {
                this.isShutdown = true;
                MmsClientClosedException mmsClientClosedException = new MmsClientClosedException("Client has been shutdown");
                this.dispatchedBroadcasts.forEach(dispatchedBroadcast -> {
                    dispatchedBroadcast.shutdownClient(mmsClientClosedException);
                });
                this.dispatchedBroadcasts.clear();
                this.subscribers.clear();
                this.sendLock.writeLock().unlock();
            } catch (Throwable th) {
                this.sendLock.writeLock().unlock();
                throw th;
            }
        } finally {
            this.subscribeLock.writeLock().unlock();
        }
    }

    public void newSession() {
    }

    public DispatchedMessage broadcast(BroadcastMessage broadcastMessage, MmsBroadcastOptions mmsBroadcastOptions) {
        MmsBroadcastOptions immutable = mmsBroadcastOptions.immutable();
        return brodcast(broadcastMessage, immutable.getArea(), immutable.getRadius(), immutable.getRemoteReceive());
    }
}
