package org.apache.hadoop.ozone.recon.tasks;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.class */
/**
 * {@link ReconTaskController} implementation that runs the registered
 * {@link ReconOmTask}s on a fixed-size thread pool and records the outcome of
 * each run through {@link ReconTaskStatusDao}.
 *
 * <p>{@link #consumeOMEvents}, {@link #reInitializeTasks}, {@link #start} and
 * {@link #stop} are {@code synchronized} on this instance. {@link #registerTask}
 * and {@link #getRegisteredTasks} are not, and the task maps are plain
 * {@link HashMap}s.
 *
 * <p>{@link #start()} must be called before events are consumed or tasks are
 * re-initialized, because that is where the executor is created.
 */
public class ReconTaskControllerImpl implements ReconTaskController {
    private static final Logger LOG = LoggerFactory.getLogger(ReconTaskControllerImpl.class);

    /** A task is dropped once its reprocess-failure count exceeds this value. */
    private static final int TASK_FAILURE_THRESHOLD = 2;

    /** Created in {@link #start()}; {@code null} until then. */
    private ExecutorService executorService;
    private final int threadCount;
    private final ReconTaskStatusDao reconTaskStatusDao;

    /** Per-task count of failed reprocess attempts, reset to 0 on any successful run. */
    private final Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();

    /** Registered tasks keyed by task name. */
    private final Map<String, ReconOmTask> reconOmTasks = new HashMap<>();

    /**
     * Creates the controller and registers every task in {@code set}.
     *
     * @param ozoneConfiguration configuration the worker thread count is read from
     *                           (defaults to 5 when the key is not set)
     * @param reconTaskStatusDao DAO used to persist per-task status rows
     * @param set                tasks to register up front
     */
    @Inject
    public ReconTaskControllerImpl(OzoneConfiguration ozoneConfiguration, ReconTaskStatusDao reconTaskStatusDao, Set<ReconOmTask> set) {
        this.threadCount = ozoneConfiguration.getInt(ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY, 5);
        this.reconTaskStatusDao = reconTaskStatusDao;
        for (ReconOmTask task : set) {
            registerTask(task);
        }
    }

    /**
     * Registers a task under its task name, resets its failure counter, and
     * inserts an initial status row (timestamps/sequence 0) if none exists yet.
     *
     * @param reconOmTask the task to register; replaces any task with the same name
     */
    @Override
    public void registerTask(ReconOmTask reconOmTask) {
        String taskName = reconOmTask.getTaskName();
        LOG.info("Registered task {} with controller.", taskName);
        this.reconOmTasks.put(taskName, reconOmTask);
        this.taskFailureCounter.put(taskName, new AtomicInteger(0));
        // Only build and insert the initial row when the task has no status yet.
        if (!this.reconTaskStatusDao.existsById(taskName)) {
            this.reconTaskStatusDao.insert(new ReconTaskStatus(taskName, 0L, 0L));
        }
    }

    /**
     * Hands a batch of OM update events to every registered task.
     *
     * <p>The batch goes through up to three stages:
     * <ol>
     *   <li>every registered task processes the batch;</li>
     *   <li>tasks that failed are asked to process the same batch once more;</li>
     *   <li>tasks that failed the retry as well are asked to {@code reprocess}
     *       from {@code oMMetadataManager}; those that still fail have their
     *       failure counter bumped and may be dropped.</li>
     * </ol>
     * An empty batch is a no-op. An {@link ExecutionException} from any task is
     * logged and ends handling of the current batch.
     *
     * @param oMUpdateEventBatch the events to process
     * @param oMMetadataManager  metadata manager handed to tasks that need a reprocess
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    @Override
    public synchronized void consumeOMEvents(OMUpdateEventBatch oMUpdateEventBatch, OMMetadataManager oMMetadataManager) throws InterruptedException {
        if (oMUpdateEventBatch.isEmpty()) {
            return;
        }
        try {
            // Stage 1: run every registered task against the batch.
            List<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
            for (ReconOmTask task : this.reconOmTasks.values()) {
                tasks.add(() -> task.process(oMUpdateEventBatch));
            }
            List<String> failedTasks = processTaskResults(this.executorService.invokeAll(tasks), oMUpdateEventBatch);

            // Stage 2: retry the tasks that failed the first pass.
            List<String> retryFailedTasks = new ArrayList<>();
            if (!failedTasks.isEmpty()) {
                tasks.clear();
                for (String taskName : failedTasks) {
                    ReconOmTask task = this.reconOmTasks.get(taskName);
                    tasks.add(() -> task.process(oMUpdateEventBatch));
                }
                retryFailedTasks = processTaskResults(this.executorService.invokeAll(tasks), oMUpdateEventBatch);
            }

            // Stage 3: reprocess only the tasks that failed the retry too. Tasks that
            // recovered in stage 2 are already up to date and must not be reprocessed.
            if (!retryFailedTasks.isEmpty()) {
                tasks.clear();
                for (String taskName : retryFailedTasks) {
                    ReconOmTask task = this.reconOmTasks.get(taskName);
                    tasks.add(() -> task.reprocess(oMMetadataManager));
                }
                ignoreFailedTasks(processTaskResults(this.executorService.invokeAll(tasks), oMUpdateEventBatch));
            }
        } catch (ExecutionException e) {
            LOG.error("Unexpected error : ", e);
        }
    }

    /**
     * Records a failed reprocess for each named task and unregisters any task
     * whose failure count has exceeded {@link #TASK_FAILURE_THRESHOLD}.
     *
     * @param list names of the tasks whose reprocess step failed
     */
    private void ignoreFailedTasks(List<String> list) {
        for (String taskName : list) {
            LOG.info("Reprocess step failed for task {}.", taskName);
            if (this.taskFailureCounter.get(taskName).incrementAndGet() > TASK_FAILURE_THRESHOLD) {
                LOG.info("Ignoring task since it failed retry and reprocess more than {} times.", TASK_FAILURE_THRESHOLD);
                this.reconOmTasks.remove(taskName);
            }
        }
    }

    /**
     * Runs {@code reprocess} on every registered task and, for each task that
     * reports success, stores the current time and the last sequence number
     * read from {@code reconOMMetadataManager}. Failures are only logged.
     *
     * @param reconOMMetadataManager metadata manager passed to each task
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    @Override
    public synchronized void reInitializeTasks(ReconOMMetadataManager reconOMMetadataManager) throws InterruptedException {
        try {
            List<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
            for (ReconOmTask task : this.reconOmTasks.values()) {
                tasks.add(() -> task.reprocess(reconOMMetadataManager));
            }
            for (Future<Pair<String, Boolean>> future : this.executorService.invokeAll(tasks)) {
                Pair<String, Boolean> result = future.get();
                String taskName = result.getLeft();
                if (result.getRight()) {
                    storeLastCompletedTransaction(taskName, reconOMMetadataManager.getLastSequenceNumberFromDB());
                } else {
                    LOG.info("Init failed for task {}.", taskName);
                }
            }
        } catch (ExecutionException e) {
            LOG.error("Unexpected error : ", e);
        }
    }

    /**
     * Updates the status row of a task with the current wall-clock time and the
     * given sequence number.
     *
     * @param str task name
     * @param j   sequence number to record
     */
    private void storeLastCompletedTransaction(String str, long j) {
        this.reconTaskStatusDao.update(new ReconTaskStatus(str, System.currentTimeMillis(), j));
    }

    /**
     * Returns the registered tasks keyed by task name.
     *
     * @return the controller's internal (live, mutable) task map
     */
    @Override
    public Map<String, ReconOmTask> getRegisteredTasks() {
        return this.reconOmTasks;
    }

    /**
     * @return the DAO this controller uses to persist task status
     */
    @Override
    public ReconTaskStatusDao getReconTaskStatusDao() {
        return this.reconTaskStatusDao;
    }

    /** Creates the fixed-size thread pool that task runs are submitted to. */
    @Override
    public synchronized void start() {
        LOG.info("Starting Recon Task Controller.");
        this.executorService = Executors.newFixedThreadPool(this.threadCount);
    }

    /** Shuts the thread pool down immediately, if it was ever started. */
    @Override
    public synchronized void stop() {
        LOG.info("Stopping Recon Task Controller.");
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    /**
     * Waits for every task result. For a success the task's failure counter is
     * reset and the batch's last sequence number is stored; for a failure the
     * task name is collected.
     *
     * @param list               futures returned by the executor, one per task
     * @param oMUpdateEventBatch the batch the tasks were run against
     * @return names of the tasks that reported failure, in result order
     * @throws ExecutionException   if a task terminated with an exception
     * @throws InterruptedException if interrupted while waiting for a result
     */
    private List<String> processTaskResults(List<Future<Pair<String, Boolean>>> list, OMUpdateEventBatch oMUpdateEventBatch) throws ExecutionException, InterruptedException {
        List<String> failedTasks = new ArrayList<>();
        for (Future<Pair<String, Boolean>> future : list) {
            // Resolve the future once and reuse the result.
            Pair<String, Boolean> result = future.get();
            String taskName = result.getLeft();
            if (result.getRight()) {
                this.taskFailureCounter.get(taskName).set(0);
                storeLastCompletedTransaction(taskName, oMUpdateEventBatch.getLastSequenceNumber());
            } else {
                LOG.info("Failed task : {}", taskName);
                failedTasks.add(taskName);
            }
        }
        return failedTasks;
    }
}
