package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/kinesis/shaded/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.class */
public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
    private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10;
    private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
    private final ILeaseManager<KinesisClientLease> leaseManager;
    private long initialLeaseTableReadCapacity;
    private long initialLeaseTableWriteCapacity;
    private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class);
    private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector();

    public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> iLeaseManager, String str, long j, long j2, LeaseSelector<KinesisClientLease> leaseSelector) {
        super(iLeaseManager, leaseSelector, str, j, j2);
        this.initialLeaseTableReadCapacity = 10L;
        this.initialLeaseTableWriteCapacity = 10L;
        this.leaseManager = iLeaseManager;
    }

    public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> iLeaseManager, String str, long j, long j2) {
        this(iLeaseManager, str, j, j2, DEFAULT_LEASE_SELECTOR);
    }

    public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> iLeaseManager, LeaseSelector<KinesisClientLease> leaseSelector, String str, long j, long j2, IMetricsFactory iMetricsFactory) {
        super(iLeaseManager, leaseSelector, str, j, j2, iMetricsFactory);
        this.initialLeaseTableReadCapacity = 10L;
        this.initialLeaseTableWriteCapacity = 10L;
        this.leaseManager = iLeaseManager;
    }

    public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> iLeaseManager, LeaseSelector<KinesisClientLease> leaseSelector, String str, long j, long j2, int i, int i2, int i3, IMetricsFactory iMetricsFactory) {
        super(iLeaseManager, leaseSelector, str, j, j2, i, i2, i3, iMetricsFactory);
        this.initialLeaseTableReadCapacity = 10L;
        this.initialLeaseTableWriteCapacity = 10L;
        this.leaseManager = iLeaseManager;
    }

    public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> iLeaseManager, ILeaseTaker<KinesisClientLease> iLeaseTaker, ILeaseRenewer<KinesisClientLease> iLeaseRenewer, long j, long j2, int i, int i2, IMetricsFactory iMetricsFactory) {
        super(iLeaseTaker, iLeaseRenewer, j, j2, i, i2, iMetricsFactory);
        this.initialLeaseTableReadCapacity = 10L;
        this.initialLeaseTableWriteCapacity = 10L;
        this.leaseManager = iLeaseManager;
    }

    public KinesisClientLibLeaseCoordinator withInitialLeaseTableReadCapacity(long j) {
        if (j <= 0) {
            throw new IllegalArgumentException("readCapacity should be >= 1");
        }
        this.initialLeaseTableReadCapacity = j;
        return this;
    }

    public KinesisClientLibLeaseCoordinator withInitialLeaseTableWriteCapacity(long j) {
        if (j <= 0) {
            throw new IllegalArgumentException("writeCapacity should be >= 1");
        }
        this.initialLeaseTableWriteCapacity = j;
        return this;
    }

    boolean setCheckpoint(String str, ExtendedSequenceNumber extendedSequenceNumber, UUID uuid) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        KinesisClientLease currentlyHeldLease = getCurrentlyHeldLease(str);
        if (currentlyHeldLease == null) {
            LOG.info(String.format("Worker %s could not update checkpoint for shard %s because it does not hold the lease", getWorkerIdentifier(), str));
            return false;
        }
        currentlyHeldLease.setCheckpoint(extendedSequenceNumber);
        currentlyHeldLease.setPendingCheckpoint(null);
        currentlyHeldLease.setOwnerSwitchesSinceCheckpoint(0L);
        return updateLease(currentlyHeldLease, uuid);
    }

    @Override // org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint
    public void setCheckpoint(String str, ExtendedSequenceNumber extendedSequenceNumber, String str2) throws KinesisClientLibException {
        try {
            if (setCheckpoint(str, extendedSequenceNumber, UUID.fromString(str2))) {
            } else {
                throw new ShutdownException("Can't update checkpoint - instance doesn't hold the lease for this shard");
            }
        } catch (DependencyException e) {
            throw new KinesisClientLibDependencyException("Unable to save checkpoint for shardId " + str, e);
        } catch (InvalidStateException e2) {
            String str3 = "Unable to save checkpoint for shardId " + str;
            LOG.error(str3, e2);
            throw new org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException(str3, e2);
        } catch (ProvisionedThroughputException e3) {
            throw new ThrottlingException("Got throttled while updating checkpoint.", e3);
        }
    }

    @Override // org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint
    public ExtendedSequenceNumber getCheckpoint(String str) throws KinesisClientLibException {
        try {
            return this.leaseManager.getLease(str).getCheckpoint();
        } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
            String str2 = "Unable to fetch checkpoint for shardId " + str;
            LOG.error(str2, e);
            throw new KinesisClientLibIOException(str2, e);
        }
    }

    boolean prepareCheckpoint(String str, ExtendedSequenceNumber extendedSequenceNumber, UUID uuid) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        KinesisClientLease currentlyHeldLease = getCurrentlyHeldLease(str);
        if (currentlyHeldLease == null) {
            LOG.info(String.format("Worker %s could not prepare checkpoint for shard %s because it does not hold the lease", getWorkerIdentifier(), str));
            return false;
        }
        currentlyHeldLease.setPendingCheckpoint((ExtendedSequenceNumber) Objects.requireNonNull(extendedSequenceNumber, "pendingCheckpoint should not be null"));
        return updateLease(currentlyHeldLease, uuid);
    }

    @Override // org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint
    public void prepareCheckpoint(String str, ExtendedSequenceNumber extendedSequenceNumber, String str2) throws KinesisClientLibException {
        try {
            if (prepareCheckpoint(str, extendedSequenceNumber, UUID.fromString(str2))) {
            } else {
                throw new ShutdownException("Can't prepare checkpoint - instance doesn't hold the lease for this shard");
            }
        } catch (DependencyException e) {
            throw new KinesisClientLibDependencyException("Unable to prepare checkpoint for shardId " + str, e);
        } catch (InvalidStateException e2) {
            String str3 = "Unable to prepare checkpoint for shardId " + str;
            LOG.error(str3, e2);
            throw new org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException(str3, e2);
        } catch (ProvisionedThroughputException e3) {
            throw new ThrottlingException("Got throttled while preparing checkpoint.", e3);
        }
    }

    @Override // org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint
    public Checkpoint getCheckpointObject(String str) throws KinesisClientLibException {
        String str2 = "Unable to fetch checkpoint for shardId " + str;
        try {
            KinesisClientLease lease = this.leaseManager.getLease(str);
            if (lease == null) {
                throw new org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException(str2);
            }
            return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
        } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
            LOG.error(str2, e);
            throw new KinesisClientLibIOException(str2, e);
        }
    }

    public List<ShardInfo> getCurrentAssignments() {
        return convertLeasesToAssignments(getAssignments());
    }

    public static List<ShardInfo> convertLeasesToAssignments(Collection<KinesisClientLease> collection) {
        if (collection == null || collection.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(collection.size());
        Iterator<KinesisClientLease> it2 = collection.iterator();
        while (it2.hasNext()) {
            arrayList.add(convertLeaseToAssignment(it2.next()));
        }
        return arrayList;
    }

    public static ShardInfo convertLeaseToAssignment(KinesisClientLease kinesisClientLease) {
        return new ShardInfo(kinesisClientLease.getLeaseKey(), kinesisClientLease.getConcurrencyToken().toString(), kinesisClientLease.getParentShardIds(), kinesisClientLease.getCheckpoint());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
        if (this.leaseManager.createLeaseTableIfNotExists(Long.valueOf(this.initialLeaseTableReadCapacity), Long.valueOf(this.initialLeaseTableWriteCapacity))) {
            LOG.info(String.format("Created new lease table for coordinator with initial read capacity of %d and write capacity of %d.", Long.valueOf(this.initialLeaseTableReadCapacity), Long.valueOf(this.initialLeaseTableWriteCapacity)));
        }
        if (!this.leaseManager.waitUntilLeaseTableExists(10L, 600L)) {
            throw new DependencyException(new IllegalStateException("Creating table timeout"));
        }
    }

    void runLeaseTaker() throws DependencyException, InvalidStateException {
        super.runTaker();
    }

    void runLeaseRenewer() throws DependencyException, InvalidStateException {
        super.runRenewer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ILeaseManager<KinesisClientLease> getLeaseManager() {
        return this.leaseManager;
    }
}
