/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.AbstractClusteringClient;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class AsyncClusteringService
extends HoodieAsyncService {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
    private final int maxConcurrentClustering;
    private transient AbstractClusteringClient clusteringClient;

    public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
        this(writeClient, false);
    }

    public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
        super(runInDaemonMode);
        this.clusteringClient = this.createClusteringClient(writeClient);
        this.maxConcurrentClustering = 1;
    }

    protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient var1);

    @Override
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService executor = Executors.newFixedThreadPool(this.maxConcurrentClustering, r -> {
            Thread t = new Thread(r, "async_clustering_thread");
            t.setDaemon(this.isRunInDaemonMode());
            return t;
        });
        return Pair.of(CompletableFuture.allOf((CompletableFuture[])IntStream.range(0, this.maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                while (!this.isShutdownRequested()) {
                    HoodieInstant instant = this.fetchNextAsyncServiceInstant();
                    if (null == instant) continue;
                    LOG.info((Object)("Starting clustering for instant " + instant));
                    this.clusteringClient.cluster(instant);
                    LOG.info((Object)("Finished clustering for instant " + instant));
                }
                LOG.info((Object)"Clustering executor shutting down properly");
            }
            catch (InterruptedException ie) {
                LOG.warn((Object)"Clustering executor got interrupted exception! Stopping", (Throwable)ie);
            }
            catch (IOException e) {
                LOG.error((Object)"Clustering executor failed", (Throwable)e);
                throw new HoodieIOException(e.getMessage(), e);
            }
            return true;
        }, executor)).toArray(CompletableFuture[]::new)), (Object)executor);
    }

    public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
        this.clusteringClient.updateWriteClient(writeClient);
    }
}

