package com.oracle.coherence.concurrent.executor;

import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.concurrent.executor.Task;
import com.oracle.coherence.concurrent.executor.internal.ExecutorTrace;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/* loaded from: input_file:com/oracle/coherence/concurrent/executor/AbstractTaskCoordinator.class */
/**
 * Skeleton {@link Task.Coordinator} that keeps track of the registered
 * {@link Task.Subscriber}s, the cancelled/closed state of the task, and the
 * dispatching of results to subscribers through an {@link ExecutorService}.
 *
 * <p>Subscriber callbacks are handed to {@link #f_executorService}; when the
 * executor rejects the work, the callbacks run inline on the calling thread.
 *
 * @param <T> the type of result published to subscribers
 */
public abstract class AbstractTaskCoordinator<T> implements Task.Coordinator<T> {
    /** Executor used to dispatch subscriber notifications. */
    protected final ExecutorService f_executorService;

    /** Identity of the coordinated task, as returned by {@link #getTaskId()}. */
    protected final String f_sTaskId;

    /**
     * When {@code true}, a subscriber that arrives after the coordinator has
     * been closed is additionally passed to {@link #subscribeRetainedTask}.
     */
    protected final boolean f_fRetainTask;

    /** Currently registered subscribers. */
    protected Set<Task.Subscriber<? super T>> m_setSubscribers = new CopyOnWriteArraySet<>();

    /**
     * Last known result; initialised to {@code Result.none()}. This class only
     * reads it - presumably subclasses update it (not visible here).
     */
    protected volatile Result<T> m_lastValue = Result.none();

    /** Set once {@link #cancel(boolean)} has succeeded. */
    protected final AtomicBoolean f_cancelled = new AtomicBoolean(false);

    /** Set once {@link #close()} has been entered for the first time. */
    protected final AtomicBoolean f_closed = new AtomicBoolean(false);

    /**
     * Creates a coordinator.
     *
     * @param str             the task identity
     * @param executorService the executor used to notify subscribers
     * @param z               whether late subscribers are passed to
     *                        {@link #subscribeRetainedTask}
     */
    public AbstractTaskCoordinator(String str, ExecutorService executorService, boolean z) {
        this.f_sTaskId = str;
        this.f_fRetainTask = z;
        this.f_executorService = executorService;
    }

    /**
     * @return the identity of the coordinated task
     */
    @Override
    public String getTaskId() {
        return this.f_sTaskId;
    }

    /**
     * Marks the task as cancelled and closes the coordinator.
     *
     * @param z not consulted by this implementation
     *
     * @return {@code true} only if the coordinator was not yet closed and this
     *         call is the one that flipped the cancelled flag
     */
    @Override
    public boolean cancel(boolean z) {
        if (this.f_closed.get() || !this.f_cancelled.compareAndSet(false, true)) {
            return false;
        }
        close();
        return true;
    }

    /**
     * @return {@code true} once {@link #cancel(boolean)} has succeeded
     */
    @Override
    public boolean isCancelled() {
        return this.f_cancelled.get();
    }

    /**
     * @return {@code true} once the coordinator has been closed
     */
    @Override
    public boolean isDone() {
        return this.f_closed.get();
    }

    /**
     * Registers a subscriber.
     *
     * <p>The subscriber first receives a {@link Task.Subscription} through
     * {@code onSubscribe}. If the coordinator is still open the subscriber is
     * then added to the subscriber set. If it is already closed the subscriber
     * is not added; instead it is (optionally) passed to
     * {@link #subscribeRetainedTask}, is sent the last known result, and is
     * then closed via {@link #closeSubscriber}.
     *
     * @param subscriber the subscriber to register
     */
    @Override
    public void subscribe(final Task.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Task.Subscription<T>() {
            @Override
            public void cancel() {
                AbstractTaskCoordinator.this.m_setSubscribers.remove(subscriber);
            }

            @Override
            public Task.Coordinator<T> getCoordinator() {
                return AbstractTaskCoordinator.this;
            }
        });

        if (!this.f_closed.get()) {
            // NOTE(review): close() may run between the check above and this
            // add, in which case the subscriber would never be closed - confirm
            // whether callers/subclasses guard against that.
            this.m_setSubscribers.add(subscriber);
            return;
        }

        if (this.f_fRetainTask) {
            subscribeRetainedTask(subscriber);
        }

        submitOrRun(() -> {
            // read the volatile once so the null check and the use agree
            final Result<T> lastValue = this.m_lastValue;
            if (lastValue != null) {
                T value = null;
                Throwable failure = null;
                try {
                    value = lastValue.get();
                } catch (Throwable t) {
                    failure = t;
                }
                try {
                    if (failure == null) {
                        subscriber.onNext(value);
                    } else {
                        subscriber.onError(failure);
                    }
                } catch (Throwable t) {
                    // best effort: a misbehaving late subscriber must not
                    // prevent it from being closed below
                    ExecutorTrace.throwing(AbstractTaskCoordinator.class, "subscribe", t, new Object[0]);
                }
            }
            closeSubscriber(subscriber, false);
        });
    }

    /**
     * Sends the terminal signal to a subscriber: {@code onError} with an
     * {@link InterruptedException} when the task was cancelled, otherwise
     * {@code onComplete} when the last result reports {@code isValue()}.
     *
     * @param subscriber the subscriber to close
     * @param z          when {@code true} the subscriber is first removed from
     *                   the subscriber set
     */
    protected void closeSubscriber(Task.Subscriber<? super T> subscriber, boolean z) {
        ExecutorTrace.entering((Class<?>) AbstractTaskCoordinator.class, "closeSubscriber", subscriber, getTaskId());
        if (z) {
            ExecutorTrace.log((Supplier<String>) () -> String.format("Removing Subscriber %s", subscriber));
            this.m_setSubscribers.remove(subscriber);
            ExecutorTrace.log((Supplier<String>) () -> String.format("Removed Subscriber %s", subscriber));
        }

        // read the volatile once so the null check and the use agree
        final Result<T> lastValue = this.m_lastValue;
        try {
            if (this.f_cancelled.get()) {
                subscriber.onError(new InterruptedException("Task " + getTaskId() + " has been cancelled."));
            } else if (lastValue != null && lastValue.isValue()) {
                subscriber.onComplete();
            }
        } catch (Throwable th) {
            Logger.warn(() -> String.format("Failed to close subscriber %s", subscriber));
            ExecutorTrace.throwing(AbstractTaskCoordinator.class, "closeSubscriber", th, new Object[0]);
            this.m_setSubscribers.remove(subscriber);
            try {
                subscriber.onError(th);
            } catch (Throwable ignored) {
                // the subscriber already failed once and has been removed;
                // there is nobody left to report a second failure to
            }
        }
        ExecutorTrace.exiting(AbstractTaskCoordinator.class, "closeSubscriber");
    }

    /**
     * Closes the coordinator and schedules the closing of every registered
     * subscriber. Only the first call has any effect.
     */
    public void close() {
        if (!this.f_closed.compareAndSet(false, true)) {
            ExecutorTrace.log("Skipped closing subscribers as the coordinator is already closed");
            return;
        }
        ExecutorTrace.log("Scheduling the closing of subscribers");
        submitOrRun(() -> {
            for (Task.Subscriber<? super T> subscriber : this.m_setSubscribers) {
                closeSubscriber(subscriber, true);
            }
        });
    }

    /**
     * @return {@code true} if at least one subscriber is registered
     */
    public boolean hasSubscribers() {
        return !this.m_setSubscribers.isEmpty();
    }

    /**
     * Publishes a result to every registered subscriber. Nothing is published
     * when the coordinator is closed or has no subscribers.
     *
     * <p>If {@code result.get()} throws, subscribers receive {@code onError}
     * with that throwable; otherwise they receive {@code onNext} with the
     * value. A subscriber whose callback throws an {@link Exception} is logged
     * and removed, and the remaining subscribers are still notified.
     *
     * @param result the result to publish
     */
    public void offer(Result<T> result) {
        ExecutorTrace.entering((Class<?>) AbstractTaskCoordinator.class, "offer", result);
        if (!this.f_closed.get() && hasSubscribers()) {
            submitOrRun(() -> {
                T value = null;
                Throwable failure = null;
                try {
                    value = result.get();
                } catch (Throwable t) {
                    failure = t;
                }
                for (Task.Subscriber<? super T> subscriber : this.m_setSubscribers) {
                    try {
                        if (failure == null) {
                            subscriber.onNext(value);
                        } else {
                            subscriber.onError(failure);
                        }
                    } catch (Exception e) {
                        // isolate the failure so that one misbehaving
                        // subscriber cannot starve the others of the result
                        if (Logger.isEnabled(2)) {
                            Logger.warn(String.format("Task [%s]: removing subscriber [%s] as it threw an exception processing result: [%s]", getTaskId(), subscriber, result), e);
                        }
                        this.m_setSubscribers.remove(subscriber);
                    }
                }
            });
        }
        ExecutorTrace.exiting(AbstractTaskCoordinator.class, "offer");
    }

    /**
     * Hook invoked from {@link #subscribe} for a subscriber that arrives after
     * the coordinator has been closed, when task retention is enabled.
     *
     * @param subscriber the late subscriber
     */
    protected abstract void subscribeRetainedTask(Task.Subscriber<?> subscriber);

    /**
     * Hands the work to the executor, running it inline on the calling thread
     * if the executor rejects it.
     */
    private void submitOrRun(Runnable runnable) {
        try {
            this.f_executorService.submit(runnable);
        } catch (RejectedExecutionException e) {
            runnable.run();
        }
    }
}
