package gobblin.runtime;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.metastore.StateStore;
import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.util.ClassAliasResolver;
import gobblin.util.io.GsonInterfaceAdapter;
import gobblin.writer.WatermarkStorage;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/StateStoreBasedWatermarkStorage.class */
/**
 * A {@link WatermarkStorage} that persists {@link CheckpointableWatermark}s in a {@link StateStore}.
 *
 * <p>The state store implementation is resolved from the alias configured under
 * {@link #WATERMARK_STORAGE_TYPE_KEY} (default {@link #WATERMARK_STORAGE_TYPE_DEFAULT}). All watermarks
 * of one job live in a single store whose name is {@code "streamingWatermarks:"} followed by the job name.
 */
public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
    private static final Logger log = LoggerFactory.getLogger(StateStoreBasedWatermarkStorage.class);
    private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);

    /** Property holding the alias of the {@link StateStore.Factory} to instantiate. */
    public static final String WATERMARK_STORAGE_TYPE_KEY = "streaming.watermarkStateStore.type";
    /** Alias used when {@link #WATERMARK_STORAGE_TYPE_KEY} is not set. */
    public static final String WATERMARK_STORAGE_TYPE_DEFAULT = "zk";
    /** Properties starting with this prefix are re-exposed to the state store with the prefix removed. */
    public static final String WATERMARK_STORAGE_CONFIG_PREFIX = "streaming.watermarkStateStore.config.";

    /** Prepended to the job name to form the store name. */
    private static final String WATERMARK_STORAGE_PREFIX = "streamingWatermarks:";
    /** Root directory used when the configuration does not define one. */
    private static final String DEFAULT_STATE_STORE_ROOT_DIR = "/streamingWatermarks";

    public final StateStore<CheckpointableWatermarkState> _stateStore;
    private final String _storeName;

    /**
     * Builds the {@link Config} handed to the state store factory.
     *
     * <p>Every property whose key starts with {@link #WATERMARK_STORAGE_CONFIG_PREFIX} is additionally
     * published under the key with that prefix stripped; the prefixed original is kept as well. If the
     * resulting config has no {@link ConfigurationKeys#STATE_STORE_ROOT_DIR_KEY}, a default root
     * directory of {@code /streamingWatermarks} is supplied.
     *
     * @param state job state supplying the properties
     * @return the configuration for the state store
     */
    Config getStateStoreConfig(State state) {
        Properties source = state.getProperties();
        // Work on a private copy so the stripped keys are never written into the object returned by
        // the State. Whether getProperties() hands back a live view is not visible from this class.
        Properties effective = new Properties();
        effective.putAll(source);
        for (String key : source.stringPropertyNames()) {
            if (key.startsWith(WATERMARK_STORAGE_CONFIG_PREFIX)) {
                // getProperty (rather than get + cast) also resolves keys that only exist in defaults.
                effective.setProperty(key.substring(WATERMARK_STORAGE_CONFIG_PREFIX.length()), source.getProperty(key));
            }
        }

        Config config = ConfigFactory.parseProperties(effective);
        if (!config.hasPath(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY)) {
            config = config.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
                    ConfigValueFactory.fromAnyRef(DEFAULT_STATE_STORE_ROOT_DIR));
        }
        return config;
    }

    /**
     * Creates the storage, instantiating the configured state store.
     *
     * @param state job state; must be non-null and carry a non-empty {@link ConfigurationKeys#JOB_NAME_KEY}
     * @throws IllegalArgumentException if {@code state} is null or the job name is missing or empty
     * @throws RuntimeException if the state store factory class cannot be resolved or instantiated
     */
    public StateStoreBasedWatermarkStorage(State state) {
        Preconditions.checkArgument(state != null, "state must not be null");
        String jobName = state.getProp(ConfigurationKeys.JOB_NAME_KEY);
        // Check for null explicitly: calling isEmpty() on a missing job name would raise a bare
        // NullPointerException instead of the intended IllegalArgumentException.
        Preconditions.checkArgument(jobName != null && !jobName.isEmpty(),
                "Job name (" + ConfigurationKeys.JOB_NAME_KEY + ") must be set and non-empty");

        String storageType = state.getProp(WATERMARK_STORAGE_TYPE_KEY, WATERMARK_STORAGE_TYPE_DEFAULT);
        try {
            StateStore.Factory factory =
                    (StateStore.Factory) new ClassAliasResolver(StateStore.Factory.class).resolveClass(storageType).newInstance();
            Config stateStoreConfig = getStateStoreConfig(state);
            this._stateStore = factory.createStateStore(stateStoreConfig, CheckpointableWatermarkState.class);
            this._storeName = WATERMARK_STORAGE_PREFIX + jobName;
            log.info("State Store directory configured as : {}", stateStoreConfig.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY));
            log.info("Configured the StateStoreBasedWatermarkStorage with storeName: {}", this._storeName);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not resolve watermark state store type: " + storageType, e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Could not access watermark state store factory for type: " + storageType, e);
        } catch (InstantiationException e) {
            throw new RuntimeException("Could not instantiate watermark state store factory for type: " + storageType, e);
        }
    }

    /**
     * Writes each watermark to the state store, one entry per watermark, using the watermark's
     * source as the table name.
     *
     * @param iterable the watermarks to persist
     * @throws IOException if the state store fails to write an entry
     */
    @Override // gobblin.writer.WatermarkStorage
    public void commitWatermarks(Iterable<CheckpointableWatermark> iterable) throws IOException {
        for (CheckpointableWatermark watermark : iterable) {
            CheckpointableWatermarkState watermarkState = new CheckpointableWatermarkState(watermark, GSON);
            this._stateStore.put(this._storeName, watermark.getSource(), watermarkState);
        }
    }

    /**
     * Looks up the committed watermark for each requested source.
     *
     * <p>Sources with no stored state are omitted from the result. A warning is logged when
     * nothing at all was found.
     *
     * @param cls concrete watermark class to deserialize into
     * @param iterable the source names to look up
     * @return a map from source name to its deserialized watermark; empty if none were found
     * @throws IOException if the state store fails to read an entry
     */
    @Override // gobblin.writer.WatermarkStorage
    public Map<String, CheckpointableWatermark> getCommittedWatermarks(Class<? extends CheckpointableWatermark> cls, Iterable<String> iterable) throws IOException {
        Map<String, CheckpointableWatermark> committed = new HashMap<String, CheckpointableWatermark>();
        for (String source : iterable) {
            // The source name is used as both table name and state id.
            CheckpointableWatermarkState stored = this._stateStore.get(this._storeName, source, source);
            if (stored != null) {
                // Presumably CheckpointableWatermarkState keeps the JSON under a property named after
                // the source; its constructor is not visible here.
                CheckpointableWatermark watermark = GSON.fromJson(stored.getProp(source), cls);
                committed.put(source, watermark);
            }
        }
        if (committed.isEmpty()) {
            log.warn("Didn't find any committed watermarks");
        }
        return committed;
    }

    /**
     * Returns every watermark state stored under this job's store name.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed from
     * package-private; the visibility found in this file is kept as-is.
     *
     * @return all states found in the store
     * @throws IOException if the state store fails to read
     */
    public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
        return this._stateStore.getAll(this._storeName);
    }
}
