package org.apache.geode.management.internal.cli.functions;

import java.io.IOException;
import java.io.Serializable;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.AllConnectionsInUseException;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.DefaultEntryEventFactory;
import org.apache.geode.internal.cache.DestroyedEntry;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.NonTXEntry;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.internal.functions.CliFunctionResult;
import org.apache.geode.management.internal.i18n.CliStrings;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.class */
public class WanCopyRegionFunctionDelegate implements Serializable {
    private static final int MAX_BATCH_SEND_RETRIES = 1;
    private static final int WAIT_BEFORE_COPY_MS = 500;
    private int batchId;
    private final Clock clock;
    private final ThreadSleeper threadSleeper;
    private final EventCreator eventCreator;
    private long functionStartTimestamp;
    private final int waitBeforeCopyMs;
    private static final Logger logger = LogService.getLogger();
    public static final String WAN_COPY_REGION__MSG__NO__CONNECTION__POOL = "No connection pool available to receiver";
    public static final String WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE = "Command not supported at remote site.";
    public static final String WAN_COPY_REGION__MSG__NO__CONNECTION = "No connection available to receiver after having copied {0} entries";
    public static final String WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED = "Error ({0}) in operation after having copied {1} entries";
    public static final String WAN_COPY_REGION__MSG__COPIED__ENTRIES = "Entries copied: {0}";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ConnectionState.class */
    public static class ConnectionState {
        private volatile Connection connection = null;
        private volatile PoolImpl senderPool = null;

        ConnectionState() {
        }

        public Connection getConnection() {
            return this.connection;
        }

        public PoolImpl getSenderPool() {
            return this.senderPool;
        }

        public Optional<CliFunctionResult> connectIfNeeded(String str, GatewaySender gatewaySender) {
            if (this.senderPool == null) {
                this.senderPool = ((AbstractGatewaySender) gatewaySender).getProxy();
                if (this.senderPool == null) {
                    return Optional.of(new CliFunctionResult(str, CliFunctionResult.StatusState.ERROR, WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
                }
                this.connection = this.senderPool.acquireConnection();
                if (this.connection.getWanSiteVersion() < KnownVersion.GEODE_1_15_0.ordinal()) {
                    return Optional.of(new CliFunctionResult(str, CliFunctionResult.StatusState.ERROR, WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
                }
            }
            return Optional.empty();
        }

        public Optional<CliFunctionResult> reconnect(String str, int i, int i2, Exception exc) {
            close();
            if (i >= WanCopyRegionFunctionDelegate.MAX_BATCH_SEND_RETRIES) {
                return Optional.of(new CliFunctionResult(str, CliFunctionResult.StatusState.ERROR, CliStrings.format(WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, new Object[]{"Connection error", Integer.valueOf(i2)})));
            }
            WanCopyRegionFunctionDelegate.logger.error("Exception {} in sendBatch. Retrying", exc.getClass().getName());
            try {
                this.connection = this.senderPool.acquireConnection();
                return Optional.empty();
            } catch (NoAvailableServersException | AllConnectionsInUseException e) {
                return Optional.of(new CliFunctionResult(str, CliFunctionResult.StatusState.ERROR, CliStrings.format(WanCopyRegionFunctionDelegate.WAN_COPY_REGION__MSG__NO__CONNECTION, Integer.valueOf(i2))));
            }
        }

        public void close() {
            if (this.senderPool != null && this.connection != null) {
                try {
                    this.connection.close(false);
                } catch (Exception e) {
                    WanCopyRegionFunctionDelegate.logger.error("Error closing the connection used to wan-copy region entries");
                }
                this.senderPool.returnConnection(this.connection);
            }
            this.connection = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$EventCreator.class */
    public interface EventCreator extends Serializable {
        GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache internalCache, InternalRegion internalRegion, GatewaySender gatewaySender, Region.Entry<?, ?> entry, long j);
    }

    /* loaded from: input_file:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$EventCreatorImpl.class */
    static class EventCreatorImpl implements EventCreator {
        EventCreatorImpl() {
        }

        @Override // org.apache.geode.management.internal.cli.functions.WanCopyRegionFunctionDelegate.EventCreator
        @VisibleForTesting
        public GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache internalCache, InternalRegion internalRegion, GatewaySender gatewaySender, Region.Entry<?, ?> entry, long j) {
            EntryEventImpl createEventForPartitionedRegion = internalRegion instanceof PartitionedRegion ? createEventForPartitionedRegion(gatewaySender, internalCache, internalRegion, entry, j) : createEventForReplicatedRegion(internalCache, internalRegion, entry, j);
            if (createEventForPartitionedRegion == null) {
                return null;
            }
            try {
                return new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS, createEventForPartitionedRegion, (Object) null, GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE);
            } catch (IOException e) {
                WanCopyRegionFunctionDelegate.logger.error("Error when creating event in wan-copy: {}", e.getMessage());
                return null;
            }
        }

        private EntryEventImpl createEventForReplicatedRegion(InternalCache internalCache, InternalRegion internalRegion, Region.Entry<?, ?> entry, long j) {
            return createEvent(internalCache, internalRegion, entry, j);
        }

        private EntryEventImpl createEventForPartitionedRegion(GatewaySender gatewaySender, InternalCache internalCache, InternalRegion internalRegion, Region.Entry<?, ?> entry, long j) {
            EntryEventImpl createEvent = createEvent(internalCache, internalRegion, entry, j);
            if (createEvent == null) {
                return null;
            }
            BucketRegion localBucketById = createEvent.getRegion().getDataStore().getLocalBucketById(Integer.valueOf(createEvent.getKeyInfo().getBucketId()));
            if (localBucketById != null) {
                localBucketById.handleWANEvent(createEvent);
            }
            return createEvent;
        }

        private EntryEventImpl createEvent(InternalCache internalCache, InternalRegion internalRegion, Region.Entry<?, ?> entry, long j) {
            if (entry instanceof DestroyedEntry) {
                return null;
            }
            try {
                if (mustDiscardEntry(entry, j, internalRegion)) {
                    return null;
                }
                EntryEventImpl create = new DefaultEntryEventFactory().create(internalRegion, Operation.UPDATE, entry.getKey(), entry.getValue(), (Object) null, false, internalCache.getInternalDistributedSystem().getDistributedMember(), false);
                if (internalRegion.getAttributes().getConcurrencyChecksEnabled()) {
                    if (entry instanceof NonTXEntry) {
                        create.setVersionTag(((NonTXEntry) entry).getRegionEntry().getVersionStamp().asVersionTag());
                    } else {
                        create.setVersionTag(((EntrySnapshot) entry).getVersionTag());
                    }
                }
                create.setNewEventId(internalCache.getInternalDistributedSystem());
                return create;
            } catch (EntryDestroyedException e) {
                return null;
            }
        }

        private boolean mustDiscardEntry(Region.Entry<?, ?> entry, long j, InternalRegion internalRegion) {
            if (entry instanceof DestroyedEntry) {
                return true;
            }
            if (internalRegion.getAttributes().getConcurrencyChecksEnabled()) {
                return (entry instanceof NonTXEntry ? ((NonTXEntry) entry).getRegionEntry().getVersionStamp().getVersionTimeStamp() : ((EntrySnapshot) entry).getVersionTag().getVersionTimeStamp()) > j;
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ThreadSleeper.class */
    public interface ThreadSleeper extends Serializable {
        void sleep(long j) throws InterruptedException;
    }

    /* loaded from: input_file:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ThreadSleeperImpl.class */
    static class ThreadSleeperImpl implements ThreadSleeper {
        ThreadSleeperImpl() {
        }

        @Override // org.apache.geode.management.internal.cli.functions.WanCopyRegionFunctionDelegate.ThreadSleeper
        public void sleep(long j) throws InterruptedException {
            Thread.sleep(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WanCopyRegionFunctionDelegate() {
        this(Clock.systemDefaultZone(), new ThreadSleeperImpl(), new EventCreatorImpl(), WAIT_BEFORE_COPY_MS);
    }

    @VisibleForTesting
    WanCopyRegionFunctionDelegate(Clock clock, ThreadSleeper threadSleeper, EventCreator eventCreator, int i) {
        this.batchId = 0;
        this.functionStartTimestamp = 0L;
        this.clock = clock;
        this.threadSleeper = threadSleeper;
        this.eventCreator = eventCreator;
        this.waitBeforeCopyMs = i;
    }

    public CliFunctionResult wanCopyRegion(InternalCache internalCache, String str, Region<?, ?> region, GatewaySender gatewaySender, long j, int i) throws InterruptedException {
        this.functionStartTimestamp = ((InternalRegion) region).getCache().cacheTimeMillis();
        Thread.sleep(this.waitBeforeCopyMs);
        ConnectionState connectionState = new ConnectionState();
        int i2 = 0;
        Iterator<?> it = getEntries(region, gatewaySender).iterator();
        long millis = this.clock.millis();
        while (it.hasNext()) {
            try {
                List<GatewayQueueEvent<?, ?>> createBatch = createBatch((InternalRegion) region, gatewaySender, i, internalCache, it);
                if (createBatch.size() != 0) {
                    Optional<CliFunctionResult> connectIfNeeded = connectionState.connectIfNeeded(str, gatewaySender);
                    if (connectIfNeeded.isPresent()) {
                        CliFunctionResult cliFunctionResult = connectIfNeeded.get();
                        connectionState.close();
                        return cliFunctionResult;
                    }
                    Optional<CliFunctionResult> sendBatch = sendBatch(str, gatewaySender, createBatch, connectionState, i2);
                    if (sendBatch.isPresent()) {
                        CliFunctionResult cliFunctionResult2 = sendBatch.get();
                        connectionState.close();
                        return cliFunctionResult2;
                    }
                    i2 += createBatch.size();
                    doPostSendBatchActions(millis, i2, j);
                }
            } finally {
                connectionState.close();
            }
        }
        return region.isDestroyed() ? new CliFunctionResult(str, CliFunctionResult.StatusState.ERROR, CliStrings.format(WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, new Object[]{"Region destroyed", Integer.valueOf(i2)})) : new CliFunctionResult(str, CliFunctionResult.StatusState.OK, CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, Integer.valueOf(i2)));
    }

    private Optional<CliFunctionResult> sendBatch(String str, GatewaySender gatewaySender, List<GatewayQueueEvent<?, ?>> list, ConnectionState connectionState, int i) {
        Optional<CliFunctionResult> reconnect;
        GatewaySenderEventDispatcher dispatcher = ((AbstractGatewaySender) gatewaySender).getEventProcessor().getDispatcher();
        int i2 = 0;
        do {
            try {
                dispatcher.sendBatch(list, connectionState.getConnection(), connectionState.getSenderPool(), getAndIncrementBatchId(), true);
                return Optional.empty();
            } catch (ConnectionDestroyedException | ServerConnectivityException e) {
                int i3 = i2;
                i2 += MAX_BATCH_SEND_RETRIES;
                reconnect = connectionState.reconnect(str, i3, i, e);
            } catch (BatchException70 e2) {
                return Optional.of(new CliFunctionResult(str, CliFunctionResult.StatusState.ERROR, CliStrings.format(WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED, new Object[]{((BatchException70) e2.getExceptions().get(0)).getCause(), Integer.valueOf(i)})));
            }
        } while (!reconnect.isPresent());
        return reconnect;
    }

    private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion internalRegion, GatewaySender gatewaySender, int i, InternalCache internalCache, Iterator<?> it) {
        int i2 = 0;
        ArrayList arrayList = new ArrayList();
        while (it.hasNext() && i2 < i) {
            GatewayQueueEvent<?, ?> createGatewaySenderEvent = this.eventCreator.createGatewaySenderEvent(internalCache, internalRegion, gatewaySender, (Region.Entry) it.next(), this.functionStartTimestamp);
            if (createGatewaySenderEvent != null) {
                arrayList.add(createGatewaySenderEvent);
                i2 += MAX_BATCH_SEND_RETRIES;
            }
        }
        return arrayList;
    }

    private Collection<?> getEntries(Region<?, ?> region, GatewaySender gatewaySender) {
        return ((region instanceof PartitionedRegion) && gatewaySender.isParallel()) ? PartitionRegionHelper.getLocalPrimaryData(region).entrySet() : region.entrySet();
    }

    @VisibleForTesting
    void doPostSendBatchActions(long j, int i, long j2) throws InterruptedException {
        long timeToSleep = getTimeToSleep(j, i, j2);
        if (timeToSleep > 0) {
            logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate", getClass().getSimpleName(), Long.valueOf(timeToSleep));
            this.threadSleeper.sleep(timeToSleep);
        } else if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException();
        }
    }

    private int getAndIncrementBatchId() {
        if (this.batchId + MAX_BATCH_SEND_RETRIES == Integer.MAX_VALUE) {
            this.batchId = 0;
        }
        int i = this.batchId;
        this.batchId = i + MAX_BATCH_SEND_RETRIES;
        return i;
    }

    @VisibleForTesting
    long getTimeToSleep(long j, int i, long j2) {
        if (j2 == 0) {
            return 0L;
        }
        long millis = this.clock.millis() - j;
        if (millis == 0 || (i * 1000.0d) / millis > j2) {
            return ((i * 1000) / j2) - millis;
        }
        return 0L;
    }
}
