package org.infinispan.container.versioning.irac;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Predicate;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.irac.IracTombstoneCleanupCommand;
import org.infinispan.commands.irac.IracTombstoneRemoteSiteCheckCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.irac.DefaultIracManager;
import org.infinispan.xsite.irac.IracExecutor;
import org.infinispan.xsite.irac.IracManager;
import org.infinispan.xsite.status.SiteState;
import org.infinispan.xsite.status.TakeOfflineManager;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:org/infinispan/container/versioning/irac/DefaultIracTombstoneManager.class */
public class DefaultIracTombstoneManager implements IracTombstoneManager {
    /** Logger used by the manager and by its nested task classes. */
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

    /** Number of {@link Action} constants; used as concurrency/prefetch when fanning out per action. */
    private static final int ACTION_COUNT = Action.values().length;

    /** Accumulator that adds one list of addresses to the running set and returns that same set. */
    private static final BiFunction<Set<Address>, List<Address>, Set<Address>> ADD_ALL_TO_SET = (owners, segmentOwners) -> {
        owners.addAll(segmentOwners);
        return owners;
    };

    /** Combiner that merges the right-hand set into the left-hand one and returns the left-hand set. */
    private static final BinaryOperator<Set<Address>> MERGE_SETS = (left, right) -> {
        left.addAll(right);
        return left;
    };

    /** Completion callback that trace-logs whether a cleanup round finished or failed. */
    private static final BiConsumer<Void, Throwable> TRACE_ROUND_COMPLETED = (ignored, failure) -> {
        if (failure == null) {
            log.trace("[IRAC] Tombstone cleanup round finished!");
            return;
        }
        log.trace("[IRAC] Tombstone cleanup round failed!", failure);
    };

    // Source of the cache topology and per-segment ownership information.
    @Inject
    DistributionManager distributionManager;

    // Sends commands to cluster members and to remote sites; also exposes the transport.
    @Inject
    RpcManager rpcManager;

    // Builds the IRAC tombstone commands; its cache name is used to label the cleanup executor.
    @Inject
    CommandsFactory commandsFactory;

    // Queried for the state of each backup site so OFFLINE sites are skipped during the remote check.
    @Inject
    TakeOfflineManager takeOfflineManager;

    // Consulted (via running().containsKey(key)) when classifying a tombstone.
    @Inject
    ComponentRef<IracManager> iracManager;

    // Executor on which the next cleanup round is scheduled with the current delay.
    @ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
    @Inject
    ScheduledExecutorService scheduledExecutorService;

    // Provides the executor for the cleanup task and the blocking continuation of the remote-site check.
    @Inject
    BlockingManager blockingManager;
    // Tombstones indexed by their key.
    private final Map<Object, IracTombstoneInfo> tombstoneMap;
    // Asynchronous backup sites; the local site is removed from this collection in start().
    private final Collection<XSiteBackup> asyncBackups;
    // Computes the delay between cleanup rounds and schedules them.
    private final Scheduler scheduler;
    // Maximum number of tombstones per batch: the largest state-transfer chunk size among async backups (at least 1).
    private final int batchSize;
    // Number of segments configured for the cache.
    private final int segmentCount;
    // True until start() runs and again after stop(); cleanup rounds are skipped while true.
    private volatile boolean stopped = true;
    // Runs performCleanup(); it is the task handed to the scheduled executor.
    private final IracExecutor iracExecutor = new IracExecutor(this::performCleanup);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/container/versioning/irac/DefaultIracTombstoneManager$Action.class */
    /**
     * Outcome of classifying a tombstone during a cleanup round (see classifyTombstone and performCleanup).
     */
    public enum Action {
        // Local node owns the segment and the IRAC manager still contains the key: nothing is done this round.
        KEEP_TOMBSTONE,
        // Local node is neither a write owner nor a read owner of the segment: the tombstone is removed locally.
        REMOVE_TOMBSTONE,
        // Local node is the primary owner and the IRAC manager does not contain the key: remote sites are asked.
        CHECK_REMOTE_SITE,
        // Local node is a non-primary owner and the IRAC manager does not contain the key: the primary owner is notified.
        NOTIFY_PRIMARY_OWNER
    }

    /* loaded from: input_file:org/infinispan/container/versioning/irac/DefaultIracTombstoneManager$CleanupTask.class */
    /**
     * Checks one batch of tombstones against the remote sites and removes those that no site asked to keep.
     * <p>
     * The task plays three roles: {@link #check()} starts the remote-site round trip, {@link #apply(Void)} is the
     * continuation that sends the cleanup command to the write owners, and {@link #run()} finally removes the
     * tombstones from the local map.
     */
    private final class CleanupTask implements Function<Void, CompletionStage<Void>>, Runnable {
        private final Collection<IracTombstoneInfo> tombstoneToCheck;
        // Positions (iteration order of tombstoneToCheck) of the tombstones that must be kept.
        private final IntSet tombstoneToKeep;
        // Identifier used only in trace messages.
        private final int id;
        // Set when any remote-site request failed; in that case nothing is removed.
        private volatile boolean failedToCheck;

        private CleanupTask(Collection<IracTombstoneInfo> collection) {
            this.tombstoneToCheck = collection;
            this.tombstoneToKeep = IntSets.concurrentSet(collection.size());
            this.failedToCheck = false;
            this.id = collection.hashCode();
        }

        /**
         * Sends the remote-site check command to every async backup that is not OFFLINE and, once all responses
         * arrived (or one failed), continues with {@link #apply(Void)}.
         *
         * @return a {@link Flowable} that completes when the batch has been processed; empty for an empty batch.
         */
        Flowable<Void> check() {
            if (log.isTraceEnabled()) {
                log.tracef("[cleanup-task-%d] Running cleanup task with %s tombstones to check", id, tombstoneToCheck.size());
            }
            if (tombstoneToCheck.isEmpty()) {
                return Flowable.empty();
            }
            IracTombstoneRemoteSiteCheckCommand checkCommand = commandsFactory.buildIracTombstoneRemoteSiteCheckCommand(
                    tombstoneToCheck.stream().map(IracTombstoneInfo::getKey).collect(Collectors.toList()));
            AggregateCompletionStage<Void> responses = CompletionStages.aggregateCompletionStage();
            for (XSiteBackup backup : asyncBackups) {
                if (takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
                    // Offline sites are not queried.
                    continue;
                }
                responses.dependsOn(rpcManager.invokeXSite(backup, checkCommand).thenAccept(this::mergeIntSet));
            }
            // Failures are recorded in failedToCheck instead of propagating, so apply() always runs.
            CompletionStage<Void> allChecked = responses.freeze().exceptionally(this::onException);
            return RxJavaInterop.voidCompletionStageToFlowable(
                    blockingManager.thenComposeBlocking(allChecked, this, "tombstone-response"));
        }

        /** Records the positions a remote site reported back; those tombstones are kept. */
        private void mergeIntSet(IntSet intSet) {
            if (log.isTraceEnabled()) {
                log.tracef("[cleanup-task-%d] Received response: %s", id, intSet);
            }
            tombstoneToKeep.addAll(intSet);
        }

        /** Marks the whole batch as failed so that no tombstone of it is removed. */
        private Void onException(Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef(th, "[cleanup-task-%d] Received exception", id);
            }
            failedToCheck = true;
            return null;
        }

        /**
         * Builds the cleanup command with the removable tombstones and sends it to the write owners of every
         * segment in the batch; the local removal ({@link #run()}) happens after the command completes.
         *
         * @return a completed stage when there is nothing to remove, otherwise the stage of the remote invocation.
         */
        @Override
        public CompletionStage<Void> apply(Void r7) {
            IracTombstoneCleanupCommand cleanupCommand = commandsFactory.buildIracTombstoneCleanupCommand(tombstoneToCheck.size());
            forEachTombstoneToRemove(cleanupCommand::add);
            if (log.isTraceEnabled()) {
                log.tracef("[cleanup-task-%d] Removing %d tombstones.", id, cleanupCommand.getTombstonesToRemove().size());
            }
            if (cleanupCommand.isEmpty()) {
                return CompletableFutures.completedNull();
            }
            // Union of the write owners of all distinct segments present in the batch.
            Set<Address> targets = tombstoneToCheck.stream()
                    .mapToInt(IracTombstoneInfo::getSegment)
                    .distinct()
                    .mapToObj(segment -> getSegmentDistribution(segment).writeOwners())
                    .reduce(new HashSet<>(distributionManager.getCacheTopology().getMembers().size()), ADD_ALL_TO_SET, MERGE_SETS);
            return rpcManager.invokeCommand(targets, cleanupCommand, VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions())
                    .thenRun(this);
        }

        /** Removes from the local map every tombstone of the batch that no remote site asked to keep. */
        @Override
        public void run() {
            forEachTombstoneToRemove(DefaultIracTombstoneManager.this::removeTombstone);
        }

        /**
         * Feeds {@code consumer} with each tombstone whose position is not in the keep set.
         * Does nothing when the remote check failed.
         */
        void forEachTombstoneToRemove(Consumer<IracTombstoneInfo> consumer) {
            if (failedToCheck) {
                return;
            }
            int position = 0;
            for (IracTombstoneInfo tombstone : tombstoneToCheck) {
                if (!tombstoneToKeep.contains(position++)) {
                    consumer.accept(tombstone);
                }
            }
        }
    }

    /* loaded from: input_file:org/infinispan/container/versioning/irac/DefaultIracTombstoneManager$PrimaryOwnerCheckTask.class */
    /**
     * Sends a batch of tombstones, all belonging to the same segment, to that segment's primary owner so it can
     * check them.
     */
    private class PrimaryOwnerCheckTask {
        private final int segment;
        private final Collection<IracTombstoneInfo> tombstones;

        private PrimaryOwnerCheckTask(int i, Collection<IracTombstoneInfo> collection) {
            this.segment = i;
            this.tombstones = collection;
            // Only evaluated when assertions are enabled (-ea).
            assert consistencyCheck();
        }

        /**
         * Invokes the primary-check command on the current primary owner of the segment.
         *
         * @return a {@link Flowable} completing with the invocation; empty when there are no tombstones.
         */
        Flowable<Void> check() {
            if (tombstones.isEmpty()) {
                return Flowable.empty();
            }
            Address primaryOwner = getSegmentDistribution(segment).primary();
            return RxJavaInterop.voidCompletionStageToFlowable(rpcManager.invokeCommand(
                    primaryOwner,
                    commandsFactory.buildIracTombstonePrimaryCheckCommand(tombstones),
                    VoidResponseCollector.ignoreLeavers(),
                    rpcManager.getSyncRpcOptions()));
        }

        /** @return {@code true} when every tombstone of the batch belongs to {@link #segment}. */
        private boolean consistencyCheck() {
            return tombstones.stream().allMatch(tombstone -> tombstone.getSegment() == segment);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/container/versioning/irac/DefaultIracTombstoneManager$Scheduler.class */
    /**
     * Schedules the cleanup rounds and adapts the delay between them to the rate at which the tombstone map grows.
     * <p>
     * The delay is kept at 1 ms or more: it is the divisor of the growth-rate computation in
     * {@link #onTaskCompleted(int)}, and a zero delay can never grow again because the next delay is a geometric
     * mean that includes the current one.
     */
    public final class Scheduler implements BiConsumer<Void, Throwable> {
        // Map size at (or above) which a new round is started almost immediately.
        final int targetSize;
        final long maxDelayMillis;
        // Map size observed when the current round started.
        int preCleanupSize;
        // Map size observed when the previous round finished.
        int previousPostCleanupSize;
        long currentDelayMillis;
        volatile boolean running;
        volatile boolean disabled;

        @GuardedBy("this")
        ScheduledFuture<?> future;

        private Scheduler(int i, long j) {
            this.targetSize = i;
            this.maxDelayMillis = j;
            // Half of the maximum, but never 0 (a maximum below 2 ms would otherwise yield a zero divisor).
            this.currentDelayMillis = Math.max(1L, j / 2);
        }

        /** Marks a round as running and remembers the map size it started with. */
        void onTaskStarted(int i) {
            this.running = true;
            this.preCleanupSize = i;
        }

        /**
         * Computes the next delay from the map size after the round and schedules the next round.
         *
         * @param i size of the tombstone map after the cleanup round.
         */
        void onTaskCompleted(int i) {
            if (i >= targetSize) {
                // Already at or above the target: run again as soon as possible.
                currentDelayMillis = 1L;
            } else {
                // Tombstones added per millisecond since the previous round finished.
                double creationRate = (preCleanupSize - previousPostCleanupSize) * 1.0d / currentDelayMillis;
                double estimateMillis;
                if (creationRate <= 0.0d) {
                    // Map is stable or shrinking: back off to the maximum delay.
                    estimateMillis = maxDelayMillis;
                } else {
                    // Time needed to reach the target size at the current rate, capped by the maximum delay.
                    estimateMillis = Math.min((targetSize - i) / creationRate + 1.0d, maxDelayMillis);
                }
                // Geometric mean of the previous delay and the new estimate, never below 1 ms.
                currentDelayMillis = Math.max(1L, Math.round(Math.sqrt(currentDelayMillis * estimateMillis)));
            }
            previousPostCleanupSize = i;
            scheduleWithCurrentDelay();
        }

        /**
         * Clears the running flag and, unless the manager is stopped or the scheduler disabled, replaces any
         * pending schedule with a new one using the current delay.
         */
        synchronized void scheduleWithCurrentDelay() {
            running = false;
            if (stopped || disabled) {
                return;
            }
            if (future != null) {
                future.cancel(true);
            }
            future = scheduledExecutorService.schedule(iracExecutor, currentDelayMillis, TimeUnit.MILLISECONDS);
        }

        /** Disables further scheduling and cancels the pending schedule, if any. */
        synchronized void disable() {
            disabled = true;
            if (future != null) {
                future.cancel(true);
                future = null;
            }
        }

        /** Round-completion callback; the outcome is ignored and only the current map size is used. */
        @Override
        public void accept(Void r4, Throwable th) {
            onTaskCompleted(tombstoneMap.size());
        }
    }

    /* loaded from: input_file:org/infinispan/container/versioning/irac/DefaultIracTombstoneManager$StateTransferHelper.class */
    /**
     * Single helper object used by {@code sendStateTo} in three roles: it filters the tombstones by segment, sends
     * each batch to the requestor, and logs the final outcome of the transfer.
     */
    private class StateTransferHelper implements Predicate<IracTombstoneInfo>, io.reactivex.rxjava3.functions.Function<Collection<IracTombstoneInfo>, CompletableSource>, CompletableObserver {
        private final Address requestor;
        private final IntSet segments;

        private StateTransferHelper(Address address, IntSet intSet) {
            requestor = address;
            segments = intSet;
        }

        /** Accepts only the tombstones whose segment was requested. */
        @Override
        public boolean test(IracTombstoneInfo iracTombstoneInfo) {
            final int segment = iracTombstoneInfo.getSegment();
            return segments.contains(segment);
        }

        /** Sends one batch of tombstones to the requestor using the synchronous RPC options. */
        @Override
        public CompletableSource apply(Collection<IracTombstoneInfo> collection) {
            final RpcOptions options = rpcManager.getSyncRpcOptions();
            final CompletionStage<Void> sent = rpcManager.invokeCommand(
                    requestor,
                    commandsFactory.buildIracTombstoneStateResponseCommand(collection),
                    VoidResponseCollector.ignoreLeavers(),
                    options);
            return Completable.fromCompletionStage(sent);
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            // Nothing to do on subscription.
        }

        @Override
        public void onComplete() {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debugf("Tombstones transferred to %s for segments %s", requestor, segments);
        }

        @Override
        public void onError(@NonNull Throwable th) {
            log.failedToTransferTombstones(requestor, segments, th);
        }
    }

    /**
     * Creates the manager from the cache configuration.
     * <p>
     * The tombstone map is pre-sized with the configured tombstone map size, which is also the scheduler's target
     * size. The batch size is the largest state-transfer chunk size among the asynchronous backups, with a
     * minimum of 1.
     *
     * @param configuration the cache configuration.
     */
    public DefaultIracTombstoneManager(Configuration configuration) {
        this.asyncBackups = DefaultIracManager.asyncBackups(configuration);
        this.tombstoneMap = new ConcurrentHashMap<>(configuration.sites().tombstoneMapSize());
        this.scheduler = new Scheduler(configuration.sites().tombstoneMapSize(), configuration.sites().maxTombstoneCleanupDelay());
        this.batchSize = configuration.sites().asyncBackupsStream()
                .map(backup -> backup.stateTransfer())
                .map(stateTransfer -> stateTransfer.chunkSize())
                .reduce(1, Integer::max);
        this.segmentCount = configuration.clustering().hash().numSegments();
    }

    /**
     * Starts the manager: verifies cross-site availability, drops the local site from the async backups,
     * configures the cleanup executor and schedules the first cleanup round.
     */
    @Start
    public void start() {
        final Transport clusterTransport = rpcManager.getTransport();
        clusterTransport.checkCrossSiteAvailable();
        final String localSite = clusterTransport.localSiteName();
        asyncBackups.removeIf(backup -> localSite.equals(backup.getSiteName()));

        iracExecutor.setBackOff(ExponentialBackOff.NO_OP);
        final String executorName = commandsFactory.getCacheName() + "-tombstone-cleanup";
        iracExecutor.setExecutor(blockingManager.asExecutor(executorName));

        stopped = false;
        scheduler.disabled = false;
        scheduler.scheduleWithCurrentDelay();
    }

    /** Stops the manager: flags it as stopped, stops the cleanup task and discards all tombstones. */
    @Stop
    public void stop() {
        stopped = true;
        stopCleanupTask();
        tombstoneMap.clear();
    }

    /** Disables the scheduler, cancelling any pending cleanup round. */
    public void stopCleanupTask() {
        scheduler.disable();
    }

    /**
     * Stores a tombstone for {@code obj}, replacing any tombstone previously stored for that key.
     *
     * @param i            the segment of the key.
     * @param obj          the key.
     * @param iracMetadata the metadata recorded in the tombstone.
     */
    @Override
    public void storeTombstone(int i, Object obj, IracMetadata iracMetadata) {
        final IracTombstoneInfo tombstone = new IracTombstoneInfo(obj, i, iracMetadata);
        tombstoneMap.put(obj, tombstone);
        if (!log.isTraceEnabled()) {
            return;
        }
        log.tracef("[IRAC] Tombstone stored: %s", tombstone);
    }

    /**
     * Stores the tombstone only when no tombstone exists for its key. A {@code null} argument is ignored.
     *
     * @param iracTombstoneInfo the tombstone to store, may be {@code null}.
     */
    @Override
    public void storeTombstoneIfAbsent(IracTombstoneInfo iracTombstoneInfo) {
        if (iracTombstoneInfo != null) {
            final IracTombstoneInfo existing = tombstoneMap.putIfAbsent(iracTombstoneInfo.getKey(), iracTombstoneInfo);
            final boolean stored = existing == null;
            if (log.isTraceEnabled()) {
                log.tracef("[IRAC] Tombstone stored? %s. %s", Boolean.valueOf(stored), iracTombstoneInfo);
            }
        }
    }

    /**
     * @param obj the key.
     * @return the metadata of the tombstone stored for the key, or {@code null} when there is none.
     */
    @Override
    public IracMetadata getTombstone(Object obj) {
        final IracTombstoneInfo tombstone = tombstoneMap.get(obj);
        return tombstone != null ? tombstone.getMetadata() : null;
    }

    /**
     * Removes the tombstone only when the map still holds an equal tombstone for its key.
     * A {@code null} argument is ignored.
     *
     * @param iracTombstoneInfo the tombstone to remove, may be {@code null}.
     */
    @Override
    public void removeTombstone(IracTombstoneInfo iracTombstoneInfo) {
        if (iracTombstoneInfo != null) {
            final boolean removed = tombstoneMap.remove(iracTombstoneInfo.getKey(), iracTombstoneInfo);
            if (log.isTraceEnabled()) {
                log.tracef("[IRAC] Tombstone removed? %s. %s", Boolean.valueOf(removed), iracTombstoneInfo);
            }
        }
    }

    /**
     * Removes whatever tombstone is stored for the key.
     *
     * @param obj the key.
     */
    @Override
    public void removeTombstone(Object obj) {
        final IracTombstoneInfo removed = tombstoneMap.remove(obj);
        if (removed != null && log.isTraceEnabled()) {
            log.tracef("[IRAC] Tombstone removed %s", removed);
        }
    }

    /** @return {@code true} when no tombstone is stored. */
    @Override
    public boolean isEmpty() {
        return tombstoneMap.isEmpty();
    }

    /** @return the number of tombstones currently stored. */
    @Override
    public int size() {
        return tombstoneMap.size();
    }

    /** @return the scheduler's running flag: set when a round starts, cleared when the next one is scheduled. */
    @Override
    public boolean isTaskRunning() {
        return scheduler.running;
    }

    /** @return the delay, in milliseconds, currently used between cleanup rounds. */
    @Override
    public long getCurrentDelayMillis() {
        return scheduler.currentDelayMillis;
    }

    /**
     * Sends to {@code address}, in batches of {@code batchSize}, every stored tombstone that belongs to one of the
     * given segments. Errors are delayed until all batches were attempted and then reported by the helper.
     *
     * @param address the node requesting the tombstones.
     * @param intSet  the requested segments.
     */
    @Override
    public void sendStateTo(Address address, IntSet intSet) {
        final StateTransferHelper helper = new StateTransferHelper(address, intSet);
        final Flowable<IracTombstoneInfo> requested = Flowable.fromIterable(tombstoneMap.values()).filter(helper);
        final Completable transfer = requested.buffer(batchSize).concatMapCompletableDelayError(helper);
        transfer.subscribe(helper);
    }

    /**
     * Handles tombstones reported by a backup owner: for every reported tombstone whose segment is primary-owned
     * by this node and that differs from the locally stored tombstone (or has none), a cleanup command is sent
     * to the write owners of the affected segments.
     *
     * @param collection the tombstones reported by the backup owner.
     */
    @Override
    public void checkStaleTombstone(Collection<? extends IracTombstoneInfo> collection) {
        final boolean trace = log.isTraceEnabled();
        if (trace) {
            log.tracef("[IRAC] Checking for stale tombstones from backup owner. %s", collection);
        }
        LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology();
        IntSet staleSegments = IntSets.mutableEmptySet(segmentCount);
        IracTombstoneCleanupCommand cleanupCommand = commandsFactory.buildIracTombstoneCleanupCommand(collection.size());
        for (IracTombstoneInfo reported : collection) {
            IracTombstoneInfo local = tombstoneMap.get(reported.getKey());
            boolean primary = cacheTopology.getSegmentDistribution(reported.getSegment()).isPrimary();
            if (!primary || reported.equals(local)) {
                // Not the primary owner, or the reported tombstone matches the local one.
                continue;
            }
            staleSegments.add(reported.getSegment());
            cleanupCommand.add(reported);
        }
        if (cleanupCommand.isEmpty()) {
            if (trace) {
                log.trace("[IRAC] Nothing to send.");
            }
            return;
        }
        // Union of the write owners of every affected segment; the member count is only a capacity hint.
        Collection<Address> targets = staleSegments.intStream()
                .mapToObj(segment -> getSegmentDistribution(segment).writeOwners())
                .reduce(new HashSet<>(cacheTopology.getMembers().size()), ADD_ALL_TO_SET, MERGE_SETS);
        if (trace) {
            log.tracef("[IRAC] Cleaning up %d tombstones: %s", cleanupCommand.getTombstonesToRemove().size(), cleanupCommand.getTombstonesToRemove());
        }
        rpcManager.sendToMany(targets, cleanupCommand, DeliverOrder.NONE);
    }

    /** Triggers a cleanup round through the IRAC executor. */
    public void startCleanupTombstone() {
        iracExecutor.run();
    }

    /** Runs a cleanup round directly (bypassing the executor) and blocks until it completes. */
    public void runCleanupAndWait() {
        final CompletionStage<Void> round = performCleanup();
        round.toCompletableFuture().join();
    }

    /**
     * @param iracTombstoneInfo the tombstone to look for; must not be {@code null}.
     * @return {@code true} when the tombstone stored for the same key equals the given one.
     */
    public boolean contains(IracTombstoneInfo iracTombstoneInfo) {
        final IracTombstoneInfo stored = tombstoneMap.get(iracTombstoneInfo.getKey());
        return iracTombstoneInfo.equals(stored);
    }

    /**
     * Runs one cleanup round: every stored tombstone is classified and each group is handled according to its
     * {@link Action}. Errors are delayed until all groups were processed. When the round ends, successfully or
     * not, the scheduler is notified so it can schedule the next round.
     *
     * @return a stage completed when the round finishes; already completed when the manager is stopped.
     */
    private CompletionStage<Void> performCleanup() {
        if (stopped) {
            return CompletableFutures.completedNull();
        }
        final boolean trace = log.isTraceEnabled();
        if (trace) {
            log.trace("[IRAC] Starting tombstone cleanup round.");
        }
        scheduler.onTaskStarted(tombstoneMap.size());
        CompletionStage<Void> round = Flowable.fromIterable(tombstoneMap.values())
                .groupBy(this::classifyTombstone)
                .flatMap(group -> {
                    switch (group.getKey()) {
                        case REMOVE_TOMBSTONE:
                            return removeAllTombstones(group);
                        case NOTIFY_PRIMARY_OWNER:
                            return notifyPrimaryOwner(group);
                        case CHECK_REMOTE_SITE:
                            return checkRemoteSite(group);
                        case KEEP_TOMBSTONE:
                        default:
                            // Nothing to do for tombstones that must be kept.
                            return Flowable.empty();
                    }
                }, true, ACTION_COUNT, ACTION_COUNT)
                .lastStage(null);
        if (trace) {
            round = round.whenComplete(TRACE_ROUND_COMPLETED);
        }
        return round.whenComplete(scheduler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @param i the segment.
     * @return the distribution information of the segment in the current cache topology.
     */
    public DistributionInfo getSegmentDistribution(int i) {
        final LocalizedCacheTopology currentTopology = distributionManager.getCacheTopology();
        return currentTopology.getSegmentDistribution(i);
    }

    /**
     * Removes every tombstone emitted by {@code flowable} from the local map, one at a time; failures are
     * collected and reported only after all tombstones were processed.
     */
    private Flowable<Void> removeAllTombstones(Flowable<IracTombstoneInfo> flowable) {
        return flowable.concatMapDelayError(this::removeSingleTombstone);
    }

    /** Removes one tombstone, converting any failure into an error signal instead of throwing. */
    private Flowable<Void> removeSingleTombstone(IracTombstoneInfo tombstone) {
        try {
            removeTombstone(tombstone);
        } catch (Throwable failure) {
            return Flowable.error(failure);
        }
        return Flowable.empty();
    }

    /**
     * Groups the tombstones by segment and, for each segment, sends them in batches of {@code batchSize} to the
     * segment's primary owner. Segments are processed eagerly; errors are delayed until the end.
     */
    private Flowable<Void> notifyPrimaryOwner(Flowable<IracTombstoneInfo> flowable) {
        return flowable
                .groupBy(IracTombstoneInfo::getSegment)
                .concatMapEagerDelayError(
                        bySegment -> bySegment
                                .buffer(batchSize)
                                .concatMapDelayError(batch -> new PrimaryOwnerCheckTask(bySegment.getKey(), batch).check()),
                        true,
                        segmentCount,
                        segmentCount);
    }

    /**
     * Splits the tombstones into batches of {@code batchSize} and runs a {@code CleanupTask} for each batch in
     * sequence; errors are delayed until all batches were processed.
     */
    private Flowable<Void> checkRemoteSite(Flowable<IracTombstoneInfo> flowable) {
        final Flowable<List<IracTombstoneInfo>> batches = flowable.buffer(batchSize);
        return batches.concatMapDelayError(batch -> new CleanupTask(batch).check());
    }

    /**
     * Decides what a cleanup round does with a tombstone, based on the local node's ownership of its segment and
     * on whether the IRAC manager still contains its key.
     *
     * @return {@link Action#REMOVE_TOMBSTONE} when this node is neither write nor read owner;
     *         {@link Action#KEEP_TOMBSTONE} when the IRAC manager contains the key; otherwise
     *         {@link Action#CHECK_REMOTE_SITE} on the primary owner and {@link Action#NOTIFY_PRIMARY_OWNER} on
     *         any other owner.
     */
    private Action classifyTombstone(IracTombstoneInfo iracTombstoneInfo) {
        final DistributionInfo ownership = getSegmentDistribution(iracTombstoneInfo.getSegment());
        if (!ownership.isWriteOwner() && !ownership.isReadOwner()) {
            return Action.REMOVE_TOMBSTONE;
        }
        final boolean primary = ownership.isPrimary();
        if (iracManager.running().containsKey(iracTombstoneInfo.getKey())) {
            return Action.KEEP_TOMBSTONE;
        }
        return primary ? Action.CHECK_REMOTE_SITE : Action.NOTIFY_PRIMARY_OWNER;
    }
}
