package fluflu.queue;

import fluflu.Letter;
import fluflu.queue.Client;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import monix.eval.Callback;
import monix.eval.Task;
import monix.eval.Task$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.collection.immutable.List;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

/* compiled from: Client.scala */
/* loaded from: input_file:fluflu/queue/Client$ClientImpl$Consumer$.class */
public class Client$ClientImpl$Consumer$ implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean running;
    private final Function1<Function0<Either<Throwable, byte[]>>, Task<BoxedUnit>> write;
    private final /* synthetic */ Client.ClientImpl $outer;

    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.scheduler.schedule(this, this.$outer.fluflu$queue$Client$ClientImpl$$delay.toNanos(), TimeUnit.NANOSECONDS);
        }
    }

    private synchronized void consume() {
        if (this.$outer.logger().underlying().isTraceEnabled()) {
            this.$outer.logger().underlying().trace("Start emitting.");
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        long nanoTime = System.nanoTime();
        Task$.MODULE$.gatherUnordered(package$.MODULE$.Iterator().continually(() -> {
            return this.$outer.fluflu$queue$Client$ClientImpl$$msgQueue.poll();
        }).takeWhile(function0 -> {
            return BoxesRunTime.boxToBoolean($anonfun$consume$2(function0));
        }).map(this.write).take(Client$.MODULE$.ChunkSize())).runAsync(new Callback<List<BoxedUnit>>(this) { // from class: fluflu.queue.Client$ClientImpl$Consumer$$anon$1
            private final /* synthetic */ Client$ClientImpl$Consumer$ $outer;

            public void onError(Throwable th) {
                if (!this.$outer.fluflu$queue$Client$ClientImpl$Consumer$$$outer().logger().underlying().isErrorEnabled()) {
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    this.$outer.fluflu$queue$Client$ClientImpl$Consumer$$$outer().logger().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"A exception occurs while: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{th.getMessage()})), th);
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
            }

            public void onSuccess(List<BoxedUnit> list) {
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }, this.$outer.fluflu$queue$Client$ClientImpl$$taskScheduler);
        if (!this.$outer.logger().underlying().isTraceEnabled()) {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            this.$outer.logger().underlying().trace(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"A emitting spend ", " ms."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))})));
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.$outer.fluflu$queue$Client$ClientImpl$$msgQueue.isEmpty()) {
            this.running.set(false);
            return;
        }
        consume();
        this.running.set(false);
        if (this.scheduler.isShutdown()) {
            return;
        }
        start();
    }

    public void close() {
        fluflu.package$.MODULE$.awaitTermination(this.scheduler, this.$outer.fluflu$queue$Client$ClientImpl$$terminationDelay);
        consume();
        this.$outer.fluflu$queue$Client$ClientImpl$$messenger.close();
    }

    public /* synthetic */ Client.ClientImpl fluflu$queue$Client$ClientImpl$Consumer$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ boolean $anonfun$consume$2(Function0 function0) {
        return function0 != null;
    }

    public Client$ClientImpl$Consumer$(Client.ClientImpl clientImpl) {
        if (clientImpl == null) {
            throw null;
        }
        this.$outer = clientImpl;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.running = new AtomicBoolean(false);
        this.write = function0 -> {
            Task write;
            Left left = (Either) function0.apply();
            if (left instanceof Left) {
                write = Task$.MODULE$.raiseError((Throwable) left.value());
            } else {
                if (!(left instanceof Right)) {
                    throw new MatchError(left);
                }
                write = this.$outer.fluflu$queue$Client$ClientImpl$$messenger.write(((Letter) ((Right) left).value()).message());
            }
            return write;
        };
    }
}
