package net.nmoncho.helenus.flink.sink;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import net.nmoncho.helenus.api.cql.ScalaPreparedStatement;
import net.nmoncho.helenus.flink.sink.CassandraSink;
import org.apache.flink.api.common.io.OutputFormatBase;
import org.apache.flink.api.common.io.SinkUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import scala.Function1;
import scala.runtime.BoxedUnit;

/* compiled from: package.scala */
/* loaded from: input_file:net/nmoncho/helenus/flink/sink/package$.class */
public final class package$ {
    public static final package$ MODULE$ = new package$();

    public <T, Out> SinkFunction<T> asSinkFunction(final Function1<CqlSession, ScalaPreparedStatement<T, Out>> function1, final CassandraSink.Config config) {
        return new RichSinkFunction<T>(config, function1) { // from class: net.nmoncho.helenus.flink.sink.package$$anon$1
            private final Semaphore semaphore;
            private CqlSession session;
            private ScalaPreparedStatement<T, Out> pstmt;
            private final CassandraSink.Config config$1;
            private final Function1 pstmtBuilder$1;

            private Semaphore semaphore() {
                return this.semaphore;
            }

            private CqlSession session() {
                return this.session;
            }

            private void session_$eq(CqlSession cqlSession) {
                this.session = cqlSession;
            }

            private ScalaPreparedStatement<T, Out> pstmt() {
                return this.pstmt;
            }

            private void pstmt_$eq(ScalaPreparedStatement<T, Out> scalaPreparedStatement) {
                this.pstmt = scalaPreparedStatement;
            }

            public void invoke(T t) {
                tryAcquire(1);
                session().executeAsync((Statement) pstmt().tupled().apply(t)).whenComplete((asyncResultSet, th) -> {
                    if (th != null) {
                        CassandraSink$.MODULE$.log().error("Error while sending value.", th);
                        this.config$1.failureHandler().apply(th);
                    } else {
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    }
                    this.semaphore().release();
                });
            }

            public void open(Configuration configuration) {
                super.open(configuration);
                session_$eq(this.config$1.session());
                pstmt_$eq((ScalaPreparedStatement) this.pstmtBuilder$1.apply(session()));
            }

            public void close() {
                flush();
                session().close();
            }

            private void tryAcquire(int i) {
                SinkUtils.tryAcquire(i, this.config$1.maxConcurrentRequests(), this.config$1.maxConcurrentRequestsTimeout(), semaphore());
            }

            private void flush() {
                tryAcquire(this.config$1.maxConcurrentRequests());
                semaphore().release(this.config$1.maxConcurrentRequests());
            }

            {
                this.config$1 = config;
                this.pstmtBuilder$1 = function1;
                this.semaphore = new Semaphore(config.maxConcurrentRequests());
            }
        };
    }

    public <T, Out> OutputFormatBase<T, BoxedUnit> asOutputFormat(final Function1<CqlSession, ScalaPreparedStatement<T, Out>> function1, final CassandraSink.Config config) {
        return new OutputFormatBase<T, BoxedUnit>(config, function1) { // from class: net.nmoncho.helenus.flink.sink.package$$anon$2
            private CqlSession session;
            private ScalaPreparedStatement<T, Out> pstmt;
            private final CassandraSink.Config config$2;
            private final Function1 pstmtBuilder$2;

            private CqlSession session() {
                return this.session;
            }

            private void session_$eq(CqlSession cqlSession) {
                this.session = cqlSession;
            }

            private ScalaPreparedStatement<T, Out> pstmt() {
                return this.pstmt;
            }

            private void pstmt_$eq(ScalaPreparedStatement<T, Out> scalaPreparedStatement) {
                this.pstmt = scalaPreparedStatement;
            }

            public CompletionStage<BoxedUnit> send(T t) {
                return session().executeAsync((Statement) pstmt().tupled().apply(t)).thenApply(asyncResultSet -> {
                    $anonfun$send$1(asyncResultSet);
                    return BoxedUnit.UNIT;
                }).whenComplete((boxedUnit, th) -> {
                    if (th != null) {
                        CassandraSink$.MODULE$.log().error("Error while sending value.", th);
                        this.config$2.failureHandler().apply(th);
                    }
                });
            }

            public void configure(Configuration configuration) {
            }

            public void postOpen() {
                super.postOpen();
                session_$eq(this.config$2.session());
                pstmt_$eq((ScalaPreparedStatement) this.pstmtBuilder$2.apply(session()));
            }

            public void postClose() {
                super.postClose();
                session().close();
            }

            public static final /* synthetic */ void $anonfun$send$1(AsyncResultSet asyncResultSet) {
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(config.maxConcurrentRequests(), config.maxConcurrentRequestsTimeout());
                this.config$2 = config;
                this.pstmtBuilder$2 = function1;
            }
        };
    }

    private package$() {
    }
}
