package com.microsoft.azure.cosmosdb.changefeedprocessor.internal;

import com.microsoft.azure.cosmosdb.changefeedprocessor.CancellationToken;
import com.microsoft.azure.cosmosdb.changefeedprocessor.CancellationTokenSource;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedObserver;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedObserverCloseReason;
import com.microsoft.azure.cosmosdb.changefeedprocessor.Lease;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseRenewer;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionProcessor;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSupervisor;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.LeaseLostException;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.ObserverException;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.PartitionSplitException;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.TaskCancelledException;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import rx.Completable;
import rx.Observable;

/* loaded from: input_file:com/microsoft/azure/cosmosdb/changefeedprocessor/internal/PartitionSupervisorImpl.class */
public class PartitionSupervisorImpl implements PartitionSupervisor, Closeable {
    private final Lease lease;
    private final ChangeFeedObserver observer;
    private final PartitionProcessor processor;
    private final LeaseRenewer renewer;
    private CancellationTokenSource renewerCancellation;
    private CancellationTokenSource processorCancellation;
    private RuntimeException resultException;
    private ExecutorService executorService = Executors.newFixedThreadPool(3);

    public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver changeFeedObserver, PartitionProcessor partitionProcessor, LeaseRenewer leaseRenewer, ExecutorService executorService) {
        this.lease = lease;
        this.observer = changeFeedObserver;
        this.processor = partitionProcessor;
        this.renewer = leaseRenewer;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSupervisor
    public Completable runAsync(CancellationToken cancellationToken) {
        this.resultException = null;
        return Completable.fromAction(() -> {
            ChangeFeedObserverContextImpl changeFeedObserverContextImpl = new ChangeFeedObserverContextImpl(this.lease.getLeaseToken());
            this.observer.open(changeFeedObserverContextImpl);
            ChangeFeedObserverCloseReason changeFeedObserverCloseReason = ChangeFeedObserverCloseReason.UNKNOWN;
            try {
                try {
                    try {
                        try {
                            try {
                                try {
                                    this.processorCancellation = new CancellationTokenSource();
                                    Thread thread = new Thread(new Runnable() { // from class: com.microsoft.azure.cosmosdb.changefeedprocessor.internal.PartitionSupervisorImpl.1
                                        @Override // java.lang.Runnable
                                        public void run() {
                                            this.processor.runAsync(this.processorCancellation.getToken()).await();
                                        }
                                    });
                                    this.renewerCancellation = new CancellationTokenSource();
                                    Thread thread2 = new Thread(new Runnable() { // from class: com.microsoft.azure.cosmosdb.changefeedprocessor.internal.PartitionSupervisorImpl.2
                                        @Override // java.lang.Runnable
                                        public void run() {
                                            this.renewer.runAsync(this.renewerCancellation.getToken()).await();
                                        }
                                    });
                                    this.executorService.execute(thread);
                                    this.executorService.execute(thread2);
                                    while (!cancellationToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null) {
                                        try {
                                            Thread.sleep(100L);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    this.processorCancellation.cancel();
                                    this.renewerCancellation.cancel();
                                    this.executorService.shutdown();
                                    if (this.processor.getResultException() != null) {
                                        throw this.processor.getResultException();
                                    }
                                    if (this.renewer.getResultException() != null) {
                                        throw this.renewer.getResultException();
                                    }
                                    this.observer.close(changeFeedObserverContextImpl, cancellationToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN);
                                    if (this.resultException != null) {
                                        Observable.error(this.resultException);
                                    }
                                } catch (Exception e2) {
                                    this.observer.close(changeFeedObserverContextImpl, ChangeFeedObserverCloseReason.UNKNOWN);
                                    if (this.resultException != null) {
                                        Observable.error(this.resultException);
                                    }
                                }
                            } catch (TaskCancelledException e3) {
                                ChangeFeedObserverCloseReason changeFeedObserverCloseReason2 = ChangeFeedObserverCloseReason.SHUTDOWN;
                                this.resultException = null;
                                this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason2);
                                if (this.resultException != null) {
                                    Observable.error(this.resultException);
                                }
                            }
                        } catch (ObserverException e4) {
                            ChangeFeedObserverCloseReason changeFeedObserverCloseReason3 = ChangeFeedObserverCloseReason.OBSERVER_ERROR;
                            this.resultException = e4;
                            this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason3);
                            if (this.resultException != null) {
                                Observable.error(this.resultException);
                            }
                        }
                    } catch (LeaseLostException e5) {
                        ChangeFeedObserverCloseReason changeFeedObserverCloseReason4 = ChangeFeedObserverCloseReason.LEASE_LOST;
                        this.resultException = e5;
                        this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason4);
                        if (this.resultException != null) {
                            Observable.error(this.resultException);
                        }
                    }
                } catch (PartitionSplitException e6) {
                    ChangeFeedObserverCloseReason changeFeedObserverCloseReason5 = ChangeFeedObserverCloseReason.LEASE_GONE;
                    this.resultException = e6;
                    this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason5);
                    if (this.resultException != null) {
                        Observable.error(this.resultException);
                    }
                }
            } catch (Throwable th) {
                this.observer.close(changeFeedObserverContextImpl, changeFeedObserverCloseReason);
                if (this.resultException != null) {
                    Observable.error(this.resultException);
                }
                throw th;
            }
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSupervisor
    public RuntimeException getResultException() {
        return this.resultException;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.processorCancellation != null) {
            this.processorCancellation.close();
        }
        this.renewerCancellation.close();
    }
}
