package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.CancellationTokenSource;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverCloseReason;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.LeaseRenewer;
import com.azure.data.cosmos.internal.changefeed.PartitionProcessor;
import com.azure.data.cosmos.internal.changefeed.PartitionSupervisor;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseLostException;
import com.azure.data.cosmos.internal.changefeed.exceptions.ObserverException;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionSplitException;
import com.azure.data.cosmos.internal.changefeed.exceptions.TaskCancelledException;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/PartitionSupervisorImpl.class */
class PartitionSupervisorImpl implements PartitionSupervisor, Closeable {
    private final Lease lease;
    private final ChangeFeedObserver observer;
    private final PartitionProcessor processor;
    private final LeaseRenewer renewer;
    private CancellationTokenSource renewerCancellation;
    private CancellationTokenSource processorCancellation;
    private RuntimeException resultException;
    private ExecutorService executorService;

    public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver changeFeedObserver, PartitionProcessor partitionProcessor, LeaseRenewer leaseRenewer, ExecutorService executorService) {
        this.lease = lease;
        this.observer = changeFeedObserver;
        this.processor = partitionProcessor;
        this.renewer = leaseRenewer;
        this.executorService = executorService;
        if (executorService == null) {
            this.executorService = Executors.newFixedThreadPool(3);
        }
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionSupervisor
    public Mono<Void> run(CancellationToken cancellationToken) {
        this.resultException = null;
        ChangeFeedObserverContextImpl changeFeedObserverContextImpl = new ChangeFeedObserverContextImpl(this.lease.getLeaseToken());
        this.observer.open(changeFeedObserverContextImpl);
        ChangeFeedObserverCloseReason changeFeedObserverCloseReason = ChangeFeedObserverCloseReason.UNKNOWN;
        try {
            try {
                try {
                    try {
                        try {
                            this.processorCancellation = new CancellationTokenSource();
                            Thread thread = new Thread(new Runnable() { // from class: com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl.1
                                @Override // java.lang.Runnable
                                public void run() {
                                    this.processor.run(this.processorCancellation.getToken()).block();
                                }
                            });
                            this.renewerCancellation = new CancellationTokenSource();
                            Thread thread2 = new Thread(new Runnable() { // from class: com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl.2
                                @Override // java.lang.Runnable
                                public void run() {
                                    this.renewer.run(this.renewerCancellation.getToken()).block();
                                }
                            });
                            this.executorService.execute(thread);
                            this.executorService.execute(thread2);
                            while (!cancellationToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null) {
                                try {
                                    Thread.sleep(100L);
                                } catch (InterruptedException e) {
                                }
                            }
                            this.processorCancellation.cancel();
                            this.renewerCancellation.cancel();
                        } catch (LeaseLostException e2) {
                            ChangeFeedObserverCloseReason changeFeedObserverCloseReason2 = ChangeFeedObserverCloseReason.LEASE_LOST;
                            this.resultException = e2;
                            this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason2);
                        }
                    } catch (PartitionSplitException e3) {
                        ChangeFeedObserverCloseReason changeFeedObserverCloseReason3 = ChangeFeedObserverCloseReason.LEASE_GONE;
                        this.resultException = e3;
                        this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason3);
                    }
                } catch (ObserverException e4) {
                    ChangeFeedObserverCloseReason changeFeedObserverCloseReason4 = ChangeFeedObserverCloseReason.OBSERVER_ERROR;
                    this.resultException = e4;
                    this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason4);
                }
            } catch (TaskCancelledException e5) {
                ChangeFeedObserverCloseReason changeFeedObserverCloseReason5 = ChangeFeedObserverCloseReason.SHUTDOWN;
                this.resultException = null;
                this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason5);
            } catch (Exception e6) {
                this.observer.close(changeFeedObserverContextImpl, ChangeFeedObserverCloseReason.UNKNOWN);
            }
            if (this.processor.getResultException() != null) {
                throw this.processor.getResultException();
            }
            if (this.renewer.getResultException() != null) {
                throw this.renewer.getResultException();
            }
            this.observer.close(changeFeedObserverContextImpl, cancellationToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN);
            return this.resultException != null ? Mono.error(this.resultException) : Mono.empty();
        } catch (Throwable th) {
            this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason);
            throw th;
        }
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionSupervisor
    public RuntimeException getResultException() {
        return this.resultException;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.processorCancellation != null) {
            this.processorCancellation.close();
        }
        this.renewerCancellation.close();
    }
}
