package net.nmoncho.helenus.flink.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import java.util.List;
import java.util.concurrent.Callable;
import net.nmoncho.helenus.flink.source.CassandraSource;
import net.nmoncho.helenus.flink.source.CassandraSplit;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: CassandraEnumeratorState.scala */
/* loaded from: input_file:net/nmoncho/helenus/flink/source/CassandraEnumeratorState$$anon$2.class */
public final class CassandraEnumeratorState$$anon$2 implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
    private CassandraEnumeratorState state;
    private final CqlSession session;
    private final SplitEnumeratorContext enumeratorContext$1;
    private final Function1 bstmt$1;
    private final long maxSplitMemorySize$1;

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        super.handleSourceEvent(i, sourceEvent);
    }

    private CassandraEnumeratorState state() {
        return this.state;
    }

    private void state_$eq(CassandraEnumeratorState cassandraEnumeratorState) {
        this.state = cassandraEnumeratorState;
    }

    private CqlSession session() {
        return this.session;
    }

    public void start() {
        this.enumeratorContext$1.callAsync(new Callable<CassandraEnumeratorState>(this) { // from class: net.nmoncho.helenus.flink.source.CassandraEnumeratorState$$anon$2$$anon$3
            private final /* synthetic */ CassandraEnumeratorState$$anon$2 $outer;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public CassandraEnumeratorState call() {
                return this.$outer.net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$anon$$prepareSplits();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }, (cassandraEnumeratorState, th) -> {
            if (th != null) {
                CassandraEnumeratorState$.MODULE$.net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$log().error("Failed to prepare splits", th);
            } else {
                CassandraEnumeratorState$.MODULE$.net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$log().debug("Initialized CassandraEnumeratorState: {}", new Object[]{cassandraEnumeratorState});
                this.state_$eq(cassandraEnumeratorState);
            }
        });
    }

    public CassandraEnumeratorState net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$anon$$prepareSplits() {
        int currentParallelism = this.enumeratorContext$1.currentParallelism();
        CassandraSplit.CassandraPartitioner apply = CassandraSplit$CassandraPartitioner$.MODULE$.apply(session());
        Tuple2<String, String> extractKeyspaceTable = package$.MODULE$.extractKeyspaceTable(session(), ((BoundStatement) this.bstmt$1.apply(session())).getPreparedStatement().getQuery());
        if (extractKeyspaceTable == null) {
            throw new MatchError(extractKeyspaceTable);
        }
        Tuple2 tuple2 = new Tuple2((String) extractKeyspaceTable._1(), (String) extractKeyspaceTable._2());
        return new CassandraSplit.Generator(apply, session(), (String) tuple2._1(), (String) tuple2._2(), currentParallelism, this.maxSplitMemorySize$1).prepareSplit();
    }

    public void handleSplitRequest(int i, String str) {
        checkReaderRegistered(i);
        Tuple2<Option<CassandraSplit>, CassandraEnumeratorState> nextSplit = state().nextSplit();
        if (nextSplit == null) {
            throw new MatchError(nextSplit);
        }
        Tuple2 tuple2 = new Tuple2((Option) nextSplit._1(), (CassandraEnumeratorState) nextSplit._2());
        Some some = (Option) tuple2._1();
        CassandraEnumeratorState cassandraEnumeratorState = (CassandraEnumeratorState) tuple2._2();
        if (some instanceof Some) {
            CassandraSplit cassandraSplit = (CassandraSplit) some.value();
            CassandraEnumeratorState$.MODULE$.net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$log().info("Assigning splits to reader {}", BoxesRunTime.boxToInteger(i));
            this.enumeratorContext$1.assignSplit(cassandraSplit, i);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            CassandraEnumeratorState$.MODULE$.net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$log().info("No split assigned to reader {} because the enumerator has no unassigned split left. Sending NoMoreSplitsEvent to reader", BoxesRunTime.boxToInteger(i));
            this.enumeratorContext$1.signalNoMoreSplits(i);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        state_$eq(cassandraEnumeratorState);
    }

    private void checkReaderRegistered(int i) {
        if (!this.enumeratorContext$1.registeredReaders().containsKey(BoxesRunTime.boxToInteger(i))) {
            throw new IllegalStateException(new StringBuilder(47).append("Reader ").append(i).append(" is not registered to source coordinator").toString());
        }
    }

    public void addSplitsBack(List<CassandraSplit> list, int i) {
        state().addSplitsBack(list);
    }

    public void addReader(int i) {
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public CassandraEnumeratorState m12snapshotState(long j) {
        return state();
    }

    public void close() {
        try {
            session().close();
        } catch (Throwable th) {
            CassandraEnumeratorState$.MODULE$.net$nmoncho$helenus$flink$source$CassandraEnumeratorState$$log().error("Error while closing session.", th);
        }
    }

    public CassandraEnumeratorState$$anon$2(Option option, CassandraSource.Config config, SplitEnumeratorContext splitEnumeratorContext, Function1 function1, long j) {
        this.enumeratorContext$1 = splitEnumeratorContext;
        this.bstmt$1 = function1;
        this.maxSplitMemorySize$1 = j;
        this.state = (CassandraEnumeratorState) option.getOrElse(() -> {
            return CassandraEnumeratorState$.MODULE$.empty();
        });
        this.session = config.session();
    }
}
