/*
 * Decompiled with CFR 0.152.
 */
package com.github.euler.common;

import akka.actor.typed.Behavior;
import akka.actor.typed.PostStop;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.javadsl.ReceiveBuilder;
import akka.japi.function.Function;
import com.github.euler.common.Batch;
import com.github.euler.common.BatchListener;
import com.github.euler.common.BatchState;
import com.github.euler.core.Flush;
import com.github.euler.core.JobTaskFailed;
import com.github.euler.core.JobTaskFinished;
import com.github.euler.core.JobTaskToProcess;
import com.github.euler.core.ProcessingContext;
import com.github.euler.core.TaskCommand;
import java.io.Serializable;
import java.net.URI;

/**
 * Actor behavior that feeds {@link JobTaskToProcess} messages into a {@link Batch}
 * and reports each item's outcome back to the sender of the original message.
 *
 * <p>The behavior registers itself as the {@link BatchListener} passed to the batch,
 * so the batch signals completion or failure of an item through
 * {@link #finished(URI, ProcessingContext)} and {@link #failed(URI, ProcessingContext)}.
 * Pending messages are tracked in a {@link BatchState} so that the reply can be sent
 * to the {@code replyTo} of the message that introduced the item.
 *
 * <p>NOTE(review): the listener callbacks touch {@code state} directly. If a
 * {@code Batch} implementation invokes them from a thread other than the actor's,
 * that access would not be protected by the actor model — confirm against the
 * {@code Batch} implementations in use.
 */
public class BatchExecution extends AbstractBehavior<TaskCommand> implements BatchListener {
    private final Batch batch;
    private final BatchState state;

    /**
     * Creates the behavior for the given batch.
     *
     * @param batch the batch that will receive every task and flush request
     * @return a behavior that constructs a new {@code BatchExecution} on setup
     */
    public static Behavior<TaskCommand> create(Batch batch) {
        // akka.japi.function.Function is already Serializable, so no intersection cast is needed.
        return Behaviors.setup(context -> new BatchExecution(context, batch));
    }

    private BatchExecution(ActorContext<TaskCommand> context, Batch batch) {
        super(context);
        this.batch = batch;
        this.state = new BatchState();
    }

    /**
     * Wires the handlers: tasks to process, flush requests and the {@link PostStop} signal.
     *
     * @return the receive definition for this behavior
     */
    @Override
    public Receive<TaskCommand> createReceive() {
        ReceiveBuilder<TaskCommand> builder = newReceiveBuilder();
        return builder
                .onMessage(JobTaskToProcess.class, this::onJobTaskToProcess)
                .onMessage(Flush.class, this::onFlush)
                .onSignal(PostStop.class, this::onPostStop)
                .build();
    }

    /** Records the message in the state, then hands it to the batch with this actor as listener. */
    private Behavior<TaskCommand> onJobTaskToProcess(JobTaskToProcess msg) {
        // Register before processing so a callback issued during process() can find the message.
        state.onMessage(msg);
        batch.process(msg, this);
        return this;
    }

    /** Forwards the flush request to the batch with this actor as listener. */
    private Behavior<TaskCommand> onFlush(Flush msg) {
        batch.flush(msg, this);
        return this;
    }

    /** Calls {@code finish()} on the batch when the actor is stopped. */
    private Behavior<TaskCommand> onPostStop(PostStop signal) {
        batch.finish();
        return this;
    }

    /**
     * Replies with a {@link JobTaskFinished} to the sender of the message tracked for the item.
     *
     * @param itemURI the item the batch finished processing
     * @param ctx     the processing context to include in the reply
     */
    @Override
    public void finished(URI itemURI, ProcessingContext ctx) {
        // NOTE(review): assumes state.finished(...) returns non-null for every URI reported
        // by the batch; a null here would throw a NullPointerException — confirm in BatchState.
        JobTaskToProcess msg = state.finished(itemURI);
        msg.replyTo.tell(new JobTaskFinished(msg, ctx));
    }

    /**
     * Replies with a {@link JobTaskFailed} to the sender of the message tracked for the item.
     *
     * @param itemURI the item the batch failed to process
     * @param ctx     the processing context to include in the reply
     */
    @Override
    public void failed(URI itemURI, ProcessingContext ctx) {
        // The item is released from the state through the same call used on success.
        // NOTE(review): same non-null assumption on state.finished(...) as in finished().
        JobTaskToProcess msg = state.finished(itemURI);
        msg.replyTo.tell(new JobTaskFailed(msg, ctx));
    }
}

