package org.apache.samza.checkpoint.azure;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.TableBatchOperation;
import com.microsoft.azure.storage.table.TableQuery;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import org.apache.samza.AzureClient;
import org.apache.samza.AzureException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.config.AzureConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/checkpoint/azure/AzureCheckpointManager.class */
public class AzureCheckpointManager implements CheckpointManager {
    private static final Logger LOG = LoggerFactory.getLogger(AzureCheckpointManager.class.getName());
    private static final String PARTITION_KEY = "PartitionKey";
    public static final String REGEX_INVALID_KEY = ".*[#?/\\\\].*";
    public static final String REGEX_TABLE_NAME = "[^A-Za-z0-9]";
    public static final int MAX_WRITE_BATCH_SIZE = 100;
    public static final String SYSTEM_PROP_NAME = "system";
    public static final String STREAM_PROP_NAME = "stream";
    public static final String PARTITION_PROP_NAME = "partition";
    private final String jobTableName;
    private final String storageConnectionString;
    private final AzureClient azureClient;
    private CloudTable cloudTable;
    private final Set<TaskName> taskNames = new HashSet();
    private final JsonSerdeV2<Map<String, String>> jsonSerde = new JsonSerdeV2<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AzureCheckpointManager(AzureConfig azureConfig, Optional<String> optional) {
        if (!optional.isPresent()) {
            throw new AzureException("Jobs must have a name to use Azure Checkpoint Manager");
        }
        this.jobTableName = optional.get().replaceAll(REGEX_TABLE_NAME, "");
        this.storageConnectionString = azureConfig.getAzureConnectionString();
        this.azureClient = new AzureClient(this.storageConnectionString);
    }

    public void start() {
        try {
            this.cloudTable = this.azureClient.getTableClient().getTableReference(this.jobTableName);
            this.cloudTable.createIfNotExists();
        } catch (StorageException e) {
            LOG.error("Azure Storage failed when creating checkpoint table", e);
            throw new AzureException((Throwable) e);
        } catch (URISyntaxException e2) {
            LOG.error("Connection string {} specifies an invalid URI while creating checkpoint table.", this.storageConnectionString);
            throw new AzureException(e2);
        }
    }

    public void register(TaskName taskName) {
        this.taskNames.add(taskName);
    }

    public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
        Preconditions.checkArgument(checkpoint instanceof CheckpointV1, "Only CheckpointV1 could be written to Azure");
        if (!this.taskNames.contains(taskName)) {
            throw new SamzaException("writing checkpoint of unregistered task");
        }
        TableBatchOperation tableBatchOperation = new TableBatchOperation();
        Iterator it = checkpoint.getOffsets().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            SystemStreamPartition systemStreamPartition = (SystemStreamPartition) entry.getKey();
            String str = (String) entry.getValue();
            String taskName2 = taskName.toString();
            checkValidKey(taskName2, "Taskname");
            String serializeSystemStreamPartition = serializeSystemStreamPartition(systemStreamPartition);
            checkValidKey(serializeSystemStreamPartition, "SystemStreamPartition");
            tableBatchOperation.insertOrReplace(new TaskCheckpointEntity(taskName2, serializeSystemStreamPartition, str));
            if (tableBatchOperation.size() >= 100 || !it.hasNext()) {
                try {
                    this.cloudTable.execute(tableBatchOperation);
                    tableBatchOperation.clear();
                } catch (StorageException e) {
                    LOG.error("Executing batch failed for task: {}", taskName);
                    throw new AzureException((Throwable) e);
                }
            }
        }
    }

    private void checkValidKey(String str, String str2) {
        if (str == null || str.matches(REGEX_INVALID_KEY)) {
            throw new AzureException(String.format("Cannot insert to Azure Checkpoint Manager; %s %s contains invalid characters [*, /, \\\\, ?]", str2, str));
        }
    }

    private String serializeSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
        HashMap hashMap = new HashMap();
        hashMap.put(SYSTEM_PROP_NAME, systemStreamPartition.getSystem());
        hashMap.put(STREAM_PROP_NAME, systemStreamPartition.getStream());
        hashMap.put(PARTITION_PROP_NAME, String.valueOf(systemStreamPartition.getPartition().getPartitionId()));
        return new String(this.jsonSerde.toBytes(hashMap));
    }

    private SystemStreamPartition deserializeSystemStreamPartition(String str) {
        Map map = (Map) this.jsonSerde.fromBytes(str.getBytes());
        return new SystemStreamPartition((String) map.get(SYSTEM_PROP_NAME), (String) map.get(STREAM_PROP_NAME), new Partition(Integer.parseInt((String) map.get(PARTITION_PROP_NAME))));
    }

    public Checkpoint readLastCheckpoint(TaskName taskName) {
        if (!this.taskNames.contains(taskName)) {
            throw new SamzaException("reading checkpoint of unregistered/unwritten task");
        }
        TableQuery where = TableQuery.from(TaskCheckpointEntity.class).where(TableQuery.generateFilterCondition(PARTITION_KEY, "eq", taskName.toString()));
        ImmutableMap.Builder builder = ImmutableMap.builder();
        try {
            for (TaskCheckpointEntity taskCheckpointEntity : this.cloudTable.execute(where)) {
                builder.put(deserializeSystemStreamPartition(taskCheckpointEntity.getRowKey()), taskCheckpointEntity.getOffset());
            }
            LOG.debug("Received checkpoint state for taskName=%s", taskName);
            return new CheckpointV1(builder.build());
        } catch (NoSuchElementException e) {
            LOG.warn("No checkpoints found found for registered taskName={}", taskName);
            return null;
        }
    }

    public void stop() {
    }

    public void clearCheckpoints() {
        LOG.debug("Clearing all checkpoints in Azure table");
        Iterator<TaskName> it = this.taskNames.iterator();
        while (it.hasNext()) {
            deleteEntities(this.cloudTable.execute(TableQuery.from(TaskCheckpointEntity.class).where(TableQuery.generateFilterCondition(PARTITION_KEY, "eq", it.next().toString()))).iterator());
        }
    }

    private void deleteEntities(Iterator<TaskCheckpointEntity> it) {
        TableBatchOperation tableBatchOperation = new TableBatchOperation();
        while (it.hasNext()) {
            tableBatchOperation.delete(it.next());
            if (tableBatchOperation.size() >= 100 || !it.hasNext()) {
                try {
                    this.cloudTable.execute(tableBatchOperation);
                    tableBatchOperation.clear();
                } catch (StorageException e) {
                    LOG.error("Executing batch failed for deleting checkpoints");
                    throw new AzureException((Throwable) e);
                }
            }
        }
    }
}
