package org.apache.samza.table.descriptors;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.utils.SerdeUtils;

/* loaded from: input_file:org/apache/samza/table/descriptors/LocalTableDescriptor.class */
/**
 * Base descriptor for tables whose data is kept in a local store.
 *
 * <p>On top of what {@link BaseTableDescriptor} provides, this class collects the
 * side-input and changelog settings of the table and emits them as
 * {@code stores.<tableId>.*} configuration entries in {@link #toConfig(Config)}.
 *
 * <p>The {@code withXxx} methods mutate this descriptor and return it typed as {@code D}
 * so that calls can be chained on the concrete subclass.
 *
 * @param <K> the type of the key in this table
 * @param <V> the type of the value in this table
 * @param <D> the concrete descriptor type returned by the fluent {@code withXxx} methods
 */
public abstract class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>> extends BaseTableDescriptor<K, V, D> {
    /** Pattern that a side-input or changelog stream name must match in full to be accepted. */
    public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");

    /** Key/value serde supplied at construction time. */
    protected final KVSerde<K, V> serde;

    /** Whether changelog configuration is emitted by {@link #toConfig(Config)}. */
    protected boolean enableChangelog;

    /** Explicit or derived changelog stream name; {@code null} until set or derived. */
    protected String changelogStream;

    /** Changelog replication factor; {@code null} when not set. */
    protected Integer changelogReplicationFactor;

    /** Names of the side-input streams; {@code null} when not set. */
    protected List<String> sideInputs;

    /** Processor associated with the side inputs; {@code null} when not set. */
    protected SideInputsProcessor sideInputsProcessor;

    /**
     * Creates a descriptor for the given table.
     *
     * @param str the table id, passed to the superclass constructor
     * @param kVSerde the serde for the table's keys and values
     */
    public LocalTableDescriptor(String str, KVSerde<K, V> kVSerde) {
        super(str);
        this.serde = kVSerde;
    }

    /**
     * Sets the side-input streams of this table.
     *
     * <p>Calling this method also disables the changelog and clears any changelog stream
     * name and replication factor that were set earlier. The list is stored by reference,
     * not copied.
     *
     * @param list the side-input stream names
     * @return this descriptor
     */
    @SuppressWarnings("unchecked") // safe as long as subclasses bind D to their own type
    public D withSideInputs(List<String> list) {
        this.sideInputs = list;
        // Setting side inputs resets every changelog-related setting.
        this.enableChangelog = false;
        this.changelogStream = null;
        this.changelogReplicationFactor = null;
        return (D) this;
    }

    /**
     * Sets the processor for the side inputs of this table.
     *
     * @param sideInputsProcessor the side-inputs processor
     * @return this descriptor
     */
    @SuppressWarnings("unchecked") // safe as long as subclasses bind D to their own type
    public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
        this.sideInputsProcessor = sideInputsProcessor;
        return (D) this;
    }

    /**
     * Enables the changelog for this table.
     *
     * @return this descriptor
     */
    @SuppressWarnings("unchecked") // safe as long as subclasses bind D to their own type
    public D withChangelogEnabled() {
        this.enableChangelog = true;
        return (D) this;
    }

    /**
     * Sets the changelog stream name and enables the changelog.
     *
     * @param str the changelog stream name
     * @return this descriptor
     */
    @SuppressWarnings("unchecked") // safe as long as subclasses bind D to their own type
    public D withChangelogStream(String str) {
        this.enableChangelog = true;
        this.changelogStream = str;
        return (D) this;
    }

    /**
     * Sets the changelog replication factor and enables the changelog.
     *
     * @param i the replication factor
     * @return this descriptor
     */
    @SuppressWarnings("unchecked") // safe as long as subclasses bind D to their own type
    public D withChangelogReplicationFactor(int i) {
        this.enableChangelog = true;
        this.changelogReplicationFactor = i;
        return (D) this;
    }

    /**
     * Builds the configuration of this table: the entries produced by the superclass plus
     * the {@code stores.<tableId>.*} entries for side inputs and changelog.
     *
     * <p>When the changelog is enabled and no stream name has been set, a name of the form
     * {@code <job.name>-<job.id>-table-<tableId>} is derived from {@code config} and stored
     * in {@link #changelogStream}, so this method can modify the descriptor.
     *
     * @param config the job configuration; {@code job.name} and {@code job.id} are read from
     *     it when a changelog stream name has to be derived
     * @return an unmodifiable map with the table configuration
     * @throws IllegalStateException if a side-input stream name or the changelog stream name
     *     does not match {@link #SYSTEM_STREAM_NAME_PATTERN}
     * @throws NullPointerException if a changelog stream name has to be derived and
     *     {@code job.name} or {@code job.id} is missing from {@code config}
     */
    @Override // org.apache.samza.table.descriptors.BaseTableDescriptor, org.apache.samza.table.descriptors.TableDescriptor
    public Map<String, String> toConfig(Config config) {
        Map<String, String> tableConfig = new HashMap<>(super.toConfig(config));

        if (sideInputs != null && !sideInputs.isEmpty()) {
            for (String sideInput : sideInputs) {
                Preconditions.checkState(isValidSystemStreamName(sideInput),
                        String.format("Side input stream %s doesn't confirm to pattern %s", sideInput, SYSTEM_STREAM_NAME_PATTERN));
            }
            addStoreConfig("side.inputs", String.join(",", sideInputs), tableConfig);
            addStoreConfig("side.inputs.processor.serialized.instance",
                    SerdeUtils.serialize("Side Inputs Processor", sideInputsProcessor), tableConfig);
        }

        if (enableChangelog) {
            if (StringUtils.isEmpty(changelogStream)) {
                // No explicit name given: derive one from the job identity and remember it.
                String jobName = config.get("job.name");
                Preconditions.checkNotNull(jobName, "job.name not found in job config");
                String jobId = config.get("job.id");
                Preconditions.checkNotNull(jobId, "job.id not found in job config");
                changelogStream = String.format("%s-%s-table-%s", jobName, jobId, tableId);
            }
            Preconditions.checkState(isValidSystemStreamName(changelogStream),
                    String.format("Changelog stream %s doesn't confirm to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
            addStoreConfig("changelog", changelogStream, tableConfig);
            if (changelogReplicationFactor != null) {
                addStoreConfig("changelog.replication.factor", changelogReplicationFactor.toString(), tableConfig);
            }
        }

        return Collections.unmodifiableMap(tableConfig);
    }

    /**
     * Returns the side-input stream names.
     *
     * @return the list passed to {@link #withSideInputs(List)}, or {@code null} if none was set
     */
    public List<String> getSideInputs() {
        return this.sideInputs;
    }

    /**
     * Returns the serde of this table.
     *
     * @return the key/value serde supplied at construction time
     */
    public KVSerde<K, V> getSerde() {
        return this.serde;
    }

    /**
     * Checks that the side-input and changelog settings are consistent.
     *
     * <p>If either the side inputs or the side-inputs processor is set, both must be set and
     * the side-input list must be non-empty. If the changelog is not enabled, neither a
     * changelog stream name nor a replication factor may be set.
     *
     * @throws IllegalArgumentException if the side-input configuration is incomplete
     * @throws IllegalStateException if changelog settings are present while the changelog is
     *     disabled
     */
    @Override // org.apache.samza.table.descriptors.BaseTableDescriptor
    protected void validate() {
        if (sideInputs != null || sideInputsProcessor != null) {
            boolean sideInputsComplete =
                    sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null;
            Preconditions.checkArgument(sideInputsComplete,
                    String.format("Invalid side input configuration for table: %s. Both side inputs and the processor must be provided", tableId));
        }

        if (!enableChangelog) {
            Preconditions.checkState(changelogStream == null,
                    String.format("Invalid changelog configuration for table: %s. Changelog must be enabled, when changelog stream name is provided", tableId));
            Preconditions.checkState(changelogReplicationFactor == null,
                    String.format("Invalid changelog configuration for table: %s. Changelog must be enabled, when changelog replication factor is provided", tableId));
        }
    }

    /**
     * Puts a store-scoped entry into {@code map} under the key
     * {@code stores.<tableId>.<str>}.
     *
     * @param str the key suffix that follows the {@code stores.<tableId>.} prefix
     * @param str2 the value to store
     * @param map the map to add the entry to
     */
    protected void addStoreConfig(String str, String str2, Map<String, String> map) {
        map.put(String.format("stores.%s.%s", this.tableId, str), str2);
    }

    /**
     * Tells whether {@code str} is non-blank and matches
     * {@link #SYSTEM_STREAM_NAME_PATTERN} in full.
     */
    private boolean isValidSystemStreamName(String str) {
        return StringUtils.isNotBlank(str) && SYSTEM_STREAM_NAME_PATTERN.matcher(str).matches();
    }
}
