package cloud.orbit.actors.streams.simple;

import cloud.orbit.actors.Addressable;
import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.runtime.AbstractActor;
import cloud.orbit.actors.runtime.ActorRuntime;
import cloud.orbit.actors.transactions.IdUtils;
import cloud.orbit.concurrent.Task;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/* loaded from: input_file:cloud/orbit/actors/streams/simple/SimpleStreamActor.class */
/**
 * Actor implementation of {@link SimpleStream} that keeps a map of subscriber proxies,
 * keyed by a generated handle, in its actor state and forwards published items to each of them.
 *
 * <p>This class was recovered from decompiled bytecode. The compiler-generated async state
 * machines ({@code async$activateAsync}, {@code async$checkAlive}) have been replaced by
 * explicit {@code thenCompose}/{@code thenApply} chaining. One observable difference: when an
 * awaited task has already failed, the failure is now reported through the returned
 * {@link Task} instead of being thrown synchronously from {@code join()}.
 */
public class SimpleStreamActor extends AbstractActor<State> implements SimpleStream {

    /** State held by the actor: the current subscribers, keyed by subscription handle. */
    public static class State {
        ConcurrentHashMap<String, SimpleStreamProxy> subscribers = new ConcurrentHashMap<>();
    }

    /**
     * Removes the subscriber registered under the given handle, if any.
     *
     * @param str the subscription handle previously returned by {@link #subscribe}
     * @return an already-completed task
     */
    @Override // cloud.orbit.actors.streams.simple.SimpleStream
    public Task<Void> unsubscribe(String str) {
        if (state().subscribers.remove(str) != null) {
            // NOTE(review): the task returned by writeState() is intentionally not awaited here,
            // matching the original behaviour; a failure of writeState() is not reported to the
            // caller. Confirm that this best-effort write is desired.
            writeState();
        }
        return Task.done();
    }

    /**
     * Registers a subscriber under a freshly generated handle.
     *
     * @param simpleStreamProxy the subscriber to register
     * @return a completed task carrying the new subscription handle
     */
    @Override // cloud.orbit.actors.streams.simple.SimpleStream
    public Task<String> subscribe(SimpleStreamProxy simpleStreamProxy) {
        // Handle comes from IdUtils.urlSafeString(96); the meaning of 96 is not visible here.
        String handle = IdUtils.urlSafeString(96);
        state().subscribers.put(handle, simpleStreamProxy);
        // NOTE(review): as in unsubscribe, the result of writeState() is not awaited.
        writeState();
        return Task.fromValue(handle);
    }

    /**
     * Forwards an item to every current subscriber.
     *
     * <p>If delivery to a subscriber fails, a liveness check is started for that subscriber and
     * the failure is replaced by a {@code null} result, so one failing subscriber does not fail
     * the combined task.
     *
     * @param t the item to publish
     * @return a task combining the per-subscriber deliveries via {@code Task.allOf}
     */
    @Override // cloud.orbit.actors.streams.simple.SimpleStream
    public <T> Task<Void> publish(T t) {
        return Task.allOf(state().subscribers.entrySet().stream().map(entry -> {
            return entry.getValue().onNext(t, null).exceptionally(failure -> {
                // NOTE(review): the liveness check result is not awaited, and a removal it
                // performs is not followed by writeState() on this path.
                checkAlive(entry.getKey(), entry.getValue());
                return null;
            });
        }));
    }

    /**
     * Runs the superclass activation, then checks every stored subscriber for liveness and
     * writes the state if that check changed the number of subscribers.
     *
     * @return a task that completes once activation, the liveness sweep and any resulting
     *         state write have completed
     */
    @Override
    public Task<?> activateAsync() {
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Activating stream: {}", actorIdentity());
        }
        // The intermediate CompletionStage<?> keeps generic inference simple and makes no
        // assumption about whether Task overrides thenCompose with a narrower return type.
        CompletionStage<?> activated = super.activateAsync().thenCompose(ignored -> pruneDeadSubscribers());
        return asTask(activated);
    }

    /** Checks all subscribers for liveness, then persists the state if any were removed. */
    private Task<?> pruneDeadSubscribers() {
        // Captured before the sweep so that removals made by checkAlive can be detected.
        int sizeBefore = state().subscribers.size();
        Task<?> sweep = Task.allOf(state().subscribers.entrySet().stream().map(entry -> {
            return checkAlive(entry.getKey(), entry.getValue());
        }));
        CompletionStage<?> persisted = sweep.thenCompose(ignored -> persistIfChanged(sizeBefore));
        return asTask(persisted);
    }

    /** Writes the state only when the subscriber count differs from {@code sizeBefore}. */
    private Task<?> persistIfChanged(int sizeBefore) {
        if (sizeBefore != state().subscribers.size()) {
            return writeState();
        }
        return Task.done();
    }

    /**
     * Asks the runtime to locate the subscriber and removes it from the state when no address
     * is returned.
     *
     * @param handle     the subscription handle of the subscriber
     * @param subscriber the subscriber proxy to look up
     * @return a task yielding {@code TRUE} if an address was found, {@code FALSE} if the
     *         subscriber was removed
     */
    private Task<Boolean> checkAlive(String handle, SimpleStreamProxy subscriber) {
        ActorRuntime runtime = ActorRuntime.getRuntime();
        return asTask(runtime.locateActor((Addressable) subscriber, false).thenApply(address -> {
            if (address != null) {
                return Boolean.TRUE;
            }
            state().subscribers.remove(handle);
            return Boolean.FALSE;
        }));
    }

    /**
     * Returns the given stage as a {@link Task}: the stage itself when it already is one,
     * otherwise a new task that mirrors the stage's value or failure.
     */
    private static <T> Task<T> asTask(CompletionStage<T> stage) {
        if (stage instanceof Task) {
            return (Task<T>) stage;
        }
        Task<T> mirror = new Task<>();
        stage.whenComplete((value, error) -> {
            if (error != null) {
                mirror.completeExceptionally(error);
            } else {
                mirror.complete(value);
            }
        });
        return mirror;
    }
}
