package org.apache.reef.examples.data.loading;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.io.data.loading.api.DataLoadingService;
import org.apache.reef.poison.PoisonedConfiguration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;

/**
 * Driver-side event handlers for the line counting data loading example.
 *
 * <p>For every context that the {@link DataLoadingService} reports as a data-loaded
 * context, a "LineCountCtxt-N" context is submitted on top of it; for every active
 * context whose id starts with "LineCountCtxt", a {@link LineCountingTask} is
 * submitted. The values returned by completed tasks are summed up, and the total is
 * logged once the counter of outstanding tasks reaches zero.
 */
@DriverSide
@Unit
public class LineCounter {
    private static final Logger LOG = Logger.getLogger(LineCounter.class.getName());

    /** Source of the numeric suffixes used for both context ids and task ids. */
    private final AtomicInteger ctrlCtxIds = new AtomicInteger();

    /** Running sum of the line counts reported by completed tasks. */
    private final AtomicInteger lineCnt = new AtomicInteger();

    /**
     * Number of task completions still expected. Despite its name it counts down:
     * it starts at the number of partitions and is decremented per completed task.
     */
    private final AtomicInteger completedDataTasks = new AtomicInteger();

    private final DataLoadingService dataLoadingService;

    /**
     * Creates the line counter and initializes the outstanding-task counter with the
     * number of partitions reported by the data loading service.
     *
     * @param dataLoadingService service used to recognize data-loaded contexts and to
     *                           obtain the number of partitions.
     */
    @Inject
    public LineCounter(DataLoadingService dataLoadingService) {
        this.dataLoadingService = dataLoadingService;
        this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions());
    }

    /**
     * Handles {@link ActiveContext} events: stacks a line counting context on top of
     * data-loaded contexts, submits the counting task to line counting contexts, and
     * closes every other context.
     */
    public class ContextActiveHandler implements EventHandler<ActiveContext> {

        /**
         * Reacts to a context becoming active.
         *
         * @param activeContext the context that became active.
         * @throws RuntimeException if the task configuration cannot be built.
         */
        @Override
        public void onNext(ActiveContext activeContext) {
            final String contextId = activeContext.getId();
            LOG.log(Level.FINER, "Context active: {0}", contextId);

            if (dataLoadingService.isDataLoadedContext(activeContext)) {
                final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement();
                LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}",
                        new Object[]{lcContextId, contextId});

                // The new context is deliberately "poisoned" so that it may crash.
                // NOTE(review): the units/semantics of "0.4" and "1" are defined by
                // PoisonedConfiguration, which is not visible here -- confirm there.
                activeContext.submitContext(Tang.Factory.getTang()
                        .newConfigurationBuilder(
                                PoisonedConfiguration.CONTEXT_CONF
                                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4")
                                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
                                        .build(),
                                ContextConfiguration.CONF
                                        .set(ContextConfiguration.IDENTIFIER, lcContextId)
                                        .build())
                        .build());
                return;
            }

            if (!contextId.startsWith("LineCountCtxt")) {
                // Neither a data-loaded nor a line counting context: nothing to run on it.
                LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId);
                activeContext.close();
                return;
            }

            // Task ids draw from the same counter as the context ids.
            final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement();
            LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}",
                    new Object[]{taskId, contextId});
            try {
                activeContext.submitTask(TaskConfiguration.CONF
                        .set(TaskConfiguration.IDENTIFIER, taskId)
                        .set(TaskConfiguration.TASK, LineCountingTask.class)
                        .build());
            } catch (BindException e) {
                LOG.log(Level.SEVERE, "Configuration error in " + contextId, e);
                throw new RuntimeException("Configuration error in " + contextId, e);
            }
        }
    }

    /**
     * Handles {@link CompletedTask} events: adds the task's reported line count to the
     * running total, logs the total once no more completions are expected, and closes
     * the context the task ran on.
     */
    public class TaskCompletedHandler implements EventHandler<CompletedTask> {

        /**
         * Processes the result of a completed task.
         *
         * <p>A task that returns no value, or a value that is not a decimal integer,
         * contributes 0 lines and is reported with a warning. It must not abort the
         * handler: the outstanding-task counter still has to be decremented and the
         * context still has to be closed.
         *
         * @param completedTask the task that completed.
         */
        @Override
        public void onNext(CompletedTask completedTask) {
            final String taskId = completedTask.getId();
            LOG.log(Level.FINEST, "Completed Task: {0}", taskId);

            final byte[] retBytes = completedTask.get();
            final String retStr =
                    retBytes == null ? "No RetVal" : new String(retBytes, StandardCharsets.UTF_8);
            LOG.log(Level.FINE, "Line count from {0} : {1}", new Object[]{taskId, retStr});

            if (retBytes == null) {
                // Previously the "No RetVal" placeholder was fed to Integer.parseInt,
                // which threw and skipped the bookkeeping and context release below.
                LOG.log(Level.WARNING, "Task {0} returned no value; counting 0 lines for it", taskId);
            } else {
                try {
                    // trim() tolerates surrounding whitespace such as a trailing newline.
                    lineCnt.addAndGet(Integer.parseInt(retStr.trim()));
                } catch (NumberFormatException e) {
                    LOG.log(Level.WARNING,
                            "Task " + taskId + " returned a non-numeric line count: " + retStr, e);
                }
            }

            if (completedDataTasks.decrementAndGet() <= 0) {
                LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get());
            }

            LOG.log(Level.FINEST, "Releasing Context: {0}", taskId);
            completedTask.getActiveContext().close();
        }
    }
}
