/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.io.Serializable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.streaming.state.ReadStateStore;
import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker;
import org.apache.spark.sql.execution.streaming.state.StateStore;
import org.apache.spark.sql.execution.streaming.state.StateStoreConf;
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef;
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider;
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider$;
import org.apache.spark.sql.execution.streaming.state.StateStoreProviderId;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Try$;
import scala.util.control.NonFatal$;

public final class StateStore$
implements Logging {
    public static StateStore$ MODULE$;
    private final int PARTITION_ID_TO_CHECK_SCHEMA;
    @GuardedBy(value="loadedProviders")
    private final HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders;
    @GuardedBy(value="loadedProviders")
    private final HashMap<StateStoreProviderId, Option<Throwable>> schemaValidated;
    @GuardedBy(value="loadedProviders")
    private StateStore.MaintenanceTask maintenanceTask;
    @GuardedBy(value="loadedProviders")
    private StateStoreCoordinatorRef _coordRef;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new StateStore$();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    public int PARTITION_ID_TO_CHECK_SCHEMA() {
        return this.PARTITION_ID_TO_CHECK_SCHEMA;
    }

    private HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders() {
        return this.loadedProviders;
    }

    private HashMap<StateStoreProviderId, Option<Throwable>> schemaValidated() {
        return this.schemaValidated;
    }

    private StateStore.MaintenanceTask maintenanceTask() {
        return this.maintenanceTask;
    }

    private void maintenanceTask_$eq(StateStore.MaintenanceTask x$1) {
        this.maintenanceTask = x$1;
    }

    private StateStoreCoordinatorRef _coordRef() {
        return this._coordRef;
    }

    private void _coordRef_$eq(StateStoreCoordinatorRef x$1) {
        this._coordRef = x$1;
    }

    public ReadStateStore getReadOnly(StateStoreProviderId storeProviderId, StructType keySchema, StructType valueSchema, Option<Object> indexOrdinal, long version, StateStoreConf storeConf, Configuration hadoopConf) {
        Predef$.MODULE$.require(version >= 0L);
        StateStoreProvider storeProvider = this.getStateStoreProvider(storeProviderId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf);
        return storeProvider.getReadStore(version);
    }

    public StateStore get(StateStoreProviderId storeProviderId, StructType keySchema, StructType valueSchema, Option<Object> indexOrdinal, long version, StateStoreConf storeConf, Configuration hadoopConf) {
        Predef$.MODULE$.require(version >= 0L);
        StateStoreProvider storeProvider = this.getStateStoreProvider(storeProviderId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf);
        return storeProvider.getStore(version);
    }

    private StateStoreProvider getStateStoreProvider(StateStoreProviderId storeProviderId, StructType keySchema, StructType valueSchema, Option<Object> indexOrdinal, StateStoreConf storeConf, Configuration hadoopConf) {
        StateStoreProvider stateStoreProvider;
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            Option result;
            this.startMaintenanceIfNeeded(storeConf);
            if (storeProviderId.storeId().partitionId() == this.PARTITION_ID_TO_CHECK_SCHEMA() && (result = (Option)this.schemaValidated().getOrElseUpdate((Object)storeProviderId, (Function0 & Serializable & scala.Serializable)() -> {
                StateSchemaCompatibilityChecker checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf);
                Option ret = (Option)Try$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> checker.check(keySchema, valueSchema)).toEither().fold((Function1 & Serializable & scala.Serializable)x$4 -> new Some(x$4), (Function1 & Serializable & scala.Serializable)x$5 -> None$.MODULE$);
                return storeConf.stateSchemaCheckEnabled() ? ret : None$.MODULE$;
            })).isDefined()) {
                throw (Throwable)result.get();
            }
            StateStoreProvider provider = (StateStoreProvider)this.loadedProviders().getOrElseUpdate((Object)storeProviderId, (Function0 & Serializable & scala.Serializable)() -> StateStoreProvider$.MODULE$.createAndInit(storeProviderId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf));
            this.reportActiveStoreInstance(storeProviderId);
            stateStoreProvider = provider;
        }
        return stateStoreProvider;
    }

    public void unload(StateStoreProviderId storeProviderId) {
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            this.loadedProviders().remove((Object)storeProviderId).foreach((Function1 & Serializable & scala.Serializable)x$6 -> {
                x$6.close();
                return BoxedUnit.UNIT;
            });
        }
    }

    public void unloadAll() {
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            this.loadedProviders().keySet().foreach((Function1 & Serializable & scala.Serializable)key -> {
                StateStore$.MODULE$.unload(key);
                return BoxedUnit.UNIT;
            });
            this.loadedProviders().clear();
        }
    }

    public boolean isLoaded(StateStoreProviderId storeProviderId) {
        boolean bl;
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            bl = this.loadedProviders().contains((Object)storeProviderId);
        }
        return bl;
    }

    public boolean isMaintenanceRunning() {
        boolean bl;
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            bl = this.maintenanceTask() != null && this.maintenanceTask().isRunning();
        }
        return bl;
    }

    public void stop() {
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            this.loadedProviders().keySet().foreach((Function1 & Serializable & scala.Serializable)key -> {
                StateStore$.MODULE$.unload(key);
                return BoxedUnit.UNIT;
            });
            this.loadedProviders().clear();
            this._coordRef_$eq(null);
            if (this.maintenanceTask() != null) {
                this.maintenanceTask().stop();
                this.maintenanceTask_$eq(null);
            }
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "StateStore stopped");
        }
    }

    private void startMaintenanceIfNeeded(StateStoreConf storeConf) {
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            if (SparkEnv$.MODULE$.get() != null && !this.isMaintenanceRunning()) {
                this.maintenanceTask_$eq(new StateStore.MaintenanceTask(storeConf.maintenanceInterval(), (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> MODULE$.doMaintenance(), (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    HashMap<StateStoreProviderId, StateStoreProvider> hashMap = MODULE$.loadedProviders();
                    synchronized (hashMap) {
                        MODULE$.loadedProviders().clear();
                    }
                }));
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "State Store maintenance task started");
            }
        }
    }

    private void doMaintenance() {
        Seq seq;
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Doing maintenance");
        if (SparkEnv$.MODULE$.get() == null) {
            throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores");
        }
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            seq = this.loadedProviders().toSeq();
        }
        seq.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            StateStore$.$anonfun$doMaintenance$2(x0$1);
            return BoxedUnit.UNIT;
        });
    }

    private void reportActiveStoreInstance(StateStoreProviderId storeProviderId) {
        block0: {
            if (SparkEnv$.MODULE$.get() == null) break block0;
            String host = SparkEnv$.MODULE$.get().blockManager().blockManagerId().host();
            String executorId = SparkEnv$.MODULE$.get().blockManager().blockManagerId().executorId();
            this.coordinatorRef().foreach((Function1 & Serializable & scala.Serializable)x$7 -> {
                x$7.reportActiveInstance(storeProviderId, host, executorId);
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Reported that the loaded instance ").append(storeProviderId).append(" is active").toString());
        }
    }

    /*
     * WARNING - void declaration
     */
    private boolean verifyIfStoreInstanceActive(StateStoreProviderId storeProviderId) {
        boolean bl;
        if (SparkEnv$.MODULE$.get() != null) {
            void var3_3;
            String executorId = SparkEnv$.MODULE$.get().blockManager().blockManagerId().executorId();
            boolean verified = BoxesRunTime.unboxToBoolean((Object)this.coordinatorRef().map((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)x$8.verifyIfInstanceActive(storeProviderId, executorId))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> false));
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("Verified whether the loaded instance ").append(storeProviderId).append(" is active: ").append(verified).toString());
            bl = var3_3;
        } else {
            bl = false;
        }
        return bl;
    }

    private Option<StateStoreCoordinatorRef> coordinatorRef() {
        None$ none$;
        HashMap<StateStoreProviderId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            None$ none$2;
            SparkEnv env = SparkEnv$.MODULE$.get();
            if (env != null) {
                boolean isDriver;
                String string = env.executorId();
                String string2 = SparkContext$.MODULE$.DRIVER_IDENTIFIER();
                boolean bl = !(string != null ? !string.equals(string2) : string2 != null) ? true : (isDriver = false);
                if (isDriver || this._coordRef() == null) {
                    this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Getting StateStoreCoordinatorRef");
                    this._coordRef_$eq(StateStoreCoordinatorRef$.MODULE$.forExecutor(env));
                }
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Retrieved reference to StateStoreCoordinator: ").append(MODULE$._coordRef()).toString());
                none$2 = new Some((Object)this._coordRef());
            } else {
                this._coordRef_$eq(null);
                none$2 = None$.MODULE$;
            }
            none$ = none$2;
        }
        return none$;
    }

    public static final /* synthetic */ void $anonfun$doMaintenance$2(Tuple2 x0$1) {
        BoxedUnit boxedUnit;
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            StateStoreProviderId id = (StateStoreProviderId)tuple2._1();
            StateStoreProvider provider = (StateStoreProvider)tuple2._2();
            try {
                if (MODULE$.verifyIfStoreInstanceActive(id)) {
                    provider.doMaintenance();
                    boxedUnit = BoxedUnit.UNIT;
                }
                MODULE$.unload(id);
                MODULE$.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(9).append("Unloaded ").append(provider).toString());
                boxedUnit = BoxedUnit.UNIT;
            }
            catch (Throwable throwable) {
                Throwable throwable2 = throwable;
                Option option = NonFatal$.MODULE$.unapply(throwable2);
                if (!option.isEmpty()) {
                    Throwable e = (Throwable)option.get();
                    MODULE$.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("Error managing ").append(provider).append(", stopping management thread").toString());
                    throw e;
                }
                throw throwable;
            }
        } else {
            throw new MatchError((Object)tuple2);
        }
        BoxedUnit boxedUnit2 = boxedUnit;
    }

    private StateStore$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
        this.PARTITION_ID_TO_CHECK_SCHEMA = 0;
        this.loadedProviders = new HashMap();
        this.schemaValidated = new HashMap();
        this.maintenanceTask = null;
        this._coordRef = null;
    }
}

