package org.apache.camel.processor.aggregate.hazelcast;

import com.hazelcast.config.Config;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.transaction.TransactionContext;
import com.hazelcast.transaction.TransactionOptions;
import com.hazelcast.transaction.TransactionalMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.support.DefaultExchangeHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.class */
public class ReplicatedHazelcastAggregationRepository extends HazelcastAggregationRepository {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicatedHazelcastAggregationRepository.class.getName());
    protected Map<String, DefaultExchangeHolder> replicatedCache;
    protected Map<String, DefaultExchangeHolder> replicatedPersistedCache;

    public ReplicatedHazelcastAggregationRepository(String str) {
        super(str);
    }

    public ReplicatedHazelcastAggregationRepository(String str, String str2) {
        super(str, str2);
    }

    public ReplicatedHazelcastAggregationRepository(String str, boolean z) {
        super(str, z);
    }

    public ReplicatedHazelcastAggregationRepository(String str, String str2, boolean z) {
        super(str, str2, z);
    }

    public ReplicatedHazelcastAggregationRepository(String str, HazelcastInstance hazelcastInstance) {
        super(str, hazelcastInstance);
    }

    public ReplicatedHazelcastAggregationRepository(String str, String str2, HazelcastInstance hazelcastInstance) {
        super(str, str2, hazelcastInstance);
    }

    public ReplicatedHazelcastAggregationRepository(String str, boolean z, HazelcastInstance hazelcastInstance) {
        super(str, z, hazelcastInstance);
    }

    public ReplicatedHazelcastAggregationRepository(String str, String str2, boolean z, HazelcastInstance hazelcastInstance) {
        super(str, str2, z, hazelcastInstance);
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public Exchange add(CamelContext camelContext, String str, Exchange exchange, Exchange exchange2) throws OptimisticLockingAggregationRepository.OptimisticLockingException {
        if (!this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", exchange2.getExchangeId(), str);
        if (exchange == null) {
            DefaultExchangeHolder putIfAbsent = this.replicatedCache.putIfAbsent(str, DefaultExchangeHolder.marshal(exchange2, true, this.allowSerializedHeaders));
            if (putIfAbsent != null) {
                Exchange unmarshallExchange = unmarshallExchange(camelContext, putIfAbsent);
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", str, unmarshallExchange != null ? unmarshallExchange.getExchangeId() : "<null>");
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        } else {
            if (!this.replicatedCache.replace(str, DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders), DefaultExchangeHolder.marshal(exchange2, true, this.allowSerializedHeaders))) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", str);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        }
        LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", exchange2.getExchangeId(), str);
        return exchange;
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public Exchange add(CamelContext camelContext, String str, Exchange exchange) {
        if (this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
        FencedLock lock = this.hzInstance.getCPSubsystem().getLock(this.mapName);
        try {
            lock.lock();
            Exchange unmarshallExchange = unmarshallExchange(camelContext, this.replicatedCache.put(str, DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders)));
            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
            lock.unlock();
            return unmarshallExchange;
        } catch (Throwable th) {
            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
            lock.unlock();
            throw th;
        }
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public Set<String> scan(CamelContext camelContext) {
        if (!this.useRecovery) {
            LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", camelContext.getName(), this.mapName);
            return Collections.emptySet();
        }
        LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
        Set<String> unmodifiableSet = Collections.unmodifiableSet(this.replicatedPersistedCache.keySet());
        LOG.trace("Found {} keys for exchanges to recover in {} context", Integer.valueOf(unmodifiableSet.size()), camelContext.getName());
        return unmodifiableSet;
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public Exchange recover(CamelContext camelContext, String str) {
        LOG.trace("Recovering an Exchange with ID {}.", str);
        if (this.useRecovery) {
            return unmarshallExchange(camelContext, this.replicatedPersistedCache.get(str));
        }
        return null;
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public Exchange get(CamelContext camelContext, String str) {
        return unmarshallExchange(camelContext, this.replicatedCache.get(str));
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public boolean containsKey(Object obj) {
        if (this.replicatedCache != null) {
            return this.replicatedCache.containsKey(obj);
        }
        return false;
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public void remove(CamelContext camelContext, String str, Exchange exchange) {
        DefaultExchangeHolder marshal = DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders);
        if (this.optimistic) {
            LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), str);
            if (!this.replicatedCache.remove(str, marshal)) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", str);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
            LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), str);
            if (this.useRecovery) {
                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), str);
                this.replicatedPersistedCache.put(exchange.getExchangeId(), marshal);
                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), str);
                return;
            }
            return;
        }
        if (!this.useRecovery) {
            this.replicatedCache.remove(str);
            return;
        }
        LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
        TransactionOptions transactionOptions = new TransactionOptions();
        transactionOptions.setTransactionType(TransactionOptions.TransactionType.ONE_PHASE);
        TransactionContext newTransactionContext = this.hzInstance.newTransactionContext(transactionOptions);
        try {
            newTransactionContext.beginTransaction();
            TransactionalMap map = newTransactionContext.getMap(this.mapName);
            TransactionalMap map2 = newTransactionContext.getMap(this.persistenceMapName);
            DefaultExchangeHolder defaultExchangeHolder = (DefaultExchangeHolder) map.remove(str);
            LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), str);
            map2.put(exchange.getExchangeId(), defaultExchangeHolder);
            newTransactionContext.commitTransaction();
            LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
            LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), str);
        } catch (Throwable th) {
            newTransactionContext.rollbackTransaction();
            String format = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.", newTransactionContext.getTxnId(), str, exchange.getExchangeId());
            LOG.warn(format, th);
            throw new RuntimeCamelException(format, th);
        }
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public void confirm(CamelContext camelContext, String str) {
        LOG.trace("Confirming an exchange with ID {}.", str);
        if (this.useRecovery) {
            this.replicatedPersistedCache.remove(str);
        }
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    public Set<String> getKeys() {
        return Collections.unmodifiableSet(this.replicatedCache.keySet());
    }

    @Override // org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository
    protected void doStart() throws Exception {
        if (this.maximumRedeliveries < 0) {
            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
        }
        if (this.recoveryInterval < 0) {
            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
        }
        StringHelper.notEmpty(this.mapName, "repositoryName");
        if (this.useLocalHzInstance) {
            Config build = new XmlConfigBuilder().build();
            build.setProperty("hazelcast.version.check.enabled", "false");
            this.hzInstance = Hazelcast.newHazelcastInstance(build);
        } else {
            ObjectHelper.notNull(this.hzInstance, "hzInstanse");
        }
        this.replicatedCache = this.hzInstance.getReplicatedMap(this.mapName);
        if (this.useRecovery) {
            this.replicatedPersistedCache = this.hzInstance.getReplicatedMap(this.persistenceMapName);
        }
    }
}
