package org.apache.reef.examples.suspend;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.examples.suspend.Launch;
import org.apache.reef.io.checkpoint.CheckpointID;
import org.apache.reef.io.checkpoint.CheckpointService;
import org.apache.reef.io.checkpoint.fs.FSCheckpointID;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.task.Task;
import org.apache.reef.task.TaskMessage;
import org.apache.reef.task.TaskMessageSource;
import org.apache.reef.task.events.SuspendEvent;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;

@Unit
/* loaded from: input_file:org/apache/reef/examples/suspend/SuspendTestTask.class */
public class SuspendTestTask implements Task, TaskMessageSource {
    private static final Logger LOG = Logger.getLogger(SuspendTestTask.class.getName());
    private final CheckpointService checkpointService;
    private final int numCycles;
    private final int delay;
    private final ObjectSerializableCodec<Integer> codecInt = new ObjectSerializableCodec<>();
    private final ObjectWritableCodec<CheckpointID> codecCheckpoint = new ObjectWritableCodec<>(FSCheckpointID.class);
    private int counter = 0;
    private boolean suspended = false;

    /* loaded from: input_file:org/apache/reef/examples/suspend/SuspendTestTask$SuspendHandler.class */
    public class SuspendHandler implements EventHandler<SuspendEvent> {
        public SuspendHandler() {
        }

        public void onNext(SuspendEvent suspendEvent) {
            synchronized (SuspendTestTask.this) {
                SuspendTestTask.LOG.log(Level.INFO, "Suspend: {0}; counter: {1}", new Object[]{this, Integer.valueOf(SuspendTestTask.this.counter)});
                SuspendTestTask.this.suspended = true;
                SuspendTestTask.this.notify();
            }
        }
    }

    @Inject
    public SuspendTestTask(CheckpointService checkpointService, @Parameter(Launch.NumCycles.class) int i, @Parameter(Launch.Delay.class) int i2) {
        this.checkpointService = checkpointService;
        this.numCycles = i;
        this.delay = i2 * 1000;
    }

    public synchronized byte[] call(byte[] bArr) throws IOException, InterruptedException {
        LOG.log(Level.INFO, "Start: {0} counter: {1}/{2}", new Object[]{this, Integer.valueOf(this.counter), Integer.valueOf(this.numCycles)});
        if (bArr != null && bArr.length > 0) {
            restore(bArr);
        }
        this.suspended = false;
        while (this.counter < this.numCycles && !this.suspended) {
            try {
                LOG.log(Level.INFO, "Run: {0} counter: {1}/{2} sleep: {3}", new Object[]{this, Integer.valueOf(this.counter), Integer.valueOf(this.numCycles), Integer.valueOf(this.delay)});
                wait(this.delay);
            } catch (InterruptedException e) {
                LOG.log(Level.INFO, "{0} interrupted. counter: {1}: {2}", new Object[]{this, Integer.valueOf(this.counter), e});
            }
            this.counter++;
        }
        return this.suspended ? save() : this.codecInt.encode(Integer.valueOf(this.counter));
    }

    public synchronized Optional<TaskMessage> getMessage() {
        LOG.log(Level.INFO, "Message from Task {0} to the Driver: counter: {1}", new Object[]{this, Integer.valueOf(this.counter)});
        return Optional.of(TaskMessage.from(SuspendTestTask.class.getName(), this.codecInt.encode(Integer.valueOf(this.counter))));
    }

    private synchronized byte[] save() throws IOException, InterruptedException {
        CheckpointService.CheckpointWriteChannel create = this.checkpointService.create();
        Throwable th = null;
        try {
            try {
                create.write(ByteBuffer.wrap(this.codecInt.encode(Integer.valueOf(this.counter))));
                byte[] encode = this.codecCheckpoint.encode((ObjectWritableCodec<CheckpointID>) this.checkpointService.commit(create));
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return encode;
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    private synchronized void restore(byte[] bArr) throws IOException, InterruptedException {
        CheckpointID m66decode = this.codecCheckpoint.m66decode(bArr);
        CheckpointService.CheckpointReadChannel open = this.checkpointService.open(m66decode);
        Throwable th = null;
        try {
            try {
                ByteBuffer wrap = ByteBuffer.wrap(this.codecInt.encode(Integer.valueOf(this.counter)));
                open.read(wrap);
                this.counter = ((Integer) this.codecInt.decode(wrap.array())).intValue();
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                this.checkpointService.delete(m66decode);
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }
}
