/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointAware;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.cache.DefaultProducerCache;
import org.apache.camel.support.cache.ProducerServicePool;
import org.apache.camel.util.function.ThrowingFunction;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;

@DisabledOnOs(architectures={"s390x"}, disabledReason="This test does not run reliably on s390x (see CAMEL-21438)")
public class DefaultProducerCacheTest
extends ContextTestSupport {
    private final AtomicInteger producerCounter = new AtomicInteger();
    private final AtomicInteger stopCounter = new AtomicInteger();
    private final AtomicInteger shutdownCounter = new AtomicInteger();
    private MyComponent component;

    @Test
    public void testCacheProducerAcquireAndRelease() {
        DefaultProducerCache cache = new DefaultProducerCache((Object)this, (CamelContext)this.context, 0);
        cache.start();
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        for (int i = 0; i < 1003; ++i) {
            Endpoint e = this.context.getEndpoint("direct:queue:" + i);
            AsyncProducer p = cache.acquireProducer(e);
            cache.releaseProducer(e, p);
        }
        Awaitility.await().atMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            cache.cleanUp();
            Assertions.assertEquals((int)1000, (int)cache.size(), (String)"Size should be 1000");
        });
        cache.stop();
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
    }

    @Test
    public void testCacheStopExpired() {
        DefaultProducerCache cache = new DefaultProducerCache((Object)this, (CamelContext)this.context, 5);
        cache.start();
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        for (int i = 0; i < 8; ++i) {
            MyEndpoint e = this.newEndpoint(true, i);
            e.setCamelContext((CamelContext)this.context);
            AsyncProducer p = cache.acquireProducer((Endpoint)e);
            cache.releaseProducer((Endpoint)e, p);
        }
        Awaitility.await().atMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            cache.cleanUp();
            Assertions.assertEquals((int)5, (int)cache.size(), (String)"Size should be 5");
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals((int)3, (int)this.stopCounter.get()));
        cache.stop();
        Assertions.assertEquals((int)8, (int)this.stopCounter.get());
    }

    @Test
    public void testExtendedStatistics() {
        DefaultProducerCache cache = new DefaultProducerCache((Object)this, (CamelContext)this.context, 5);
        cache.setExtendedStatistics(true);
        cache.start();
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        MyEndpoint e = this.newEndpoint(true, 1);
        AsyncProducer p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        e = this.newEndpoint(true, 1);
        p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        e = this.newEndpoint(true, 2);
        p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        e = this.newEndpoint(true, 2);
        p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        e = this.newEndpoint(true, 2);
        p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        e = this.newEndpoint(true, 3);
        p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        e = this.newEndpoint(true, 4);
        p = cache.acquireProducer((Endpoint)e);
        cache.releaseProducer((Endpoint)e, p);
        Assertions.assertEquals((int)4, (int)cache.size(), (String)"Size should be 4");
        EndpointUtilizationStatistics stats = cache.getEndpointUtilizationStatistics();
        Assertions.assertEquals((int)4, (int)stats.size());
        Map recent = stats.getStatistics();
        Assertions.assertEquals((long)2L, (long)((Long)recent.get("my://1")));
        Assertions.assertEquals((long)3L, (long)((Long)recent.get("my://2")));
        Assertions.assertEquals((long)1L, (long)((Long)recent.get("my://3")));
        Assertions.assertEquals((long)1L, (long)((Long)recent.get("my://4")));
        Assertions.assertNull(recent.get("my://5"));
        cache.stop();
    }

    @Test
    public void testCacheEvictWhileInUse() {
        this.producerCounter.set(0);
        MyProducerCache cache = new MyProducerCache(this, (CamelContext)this.context, 2);
        cache.start();
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        MyEndpoint e = this.newEndpoint(false, 1);
        e.setCamelContext((CamelContext)this.context);
        AsyncProducer p1 = cache.acquireProducer((Endpoint)e);
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        AsyncProducer p2 = cache.acquireProducer((Endpoint)e);
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        cache.releaseProducer((Endpoint)e, p2);
        cache.releaseProducer((Endpoint)e, p1);
        Assertions.assertEquals((int)2, (int)cache.size(), (String)"Size should be 2");
        Assertions.assertEquals((int)0, (int)this.stopCounter.get());
        p1 = cache.acquireProducer((Endpoint)e);
        p2 = cache.acquireProducer((Endpoint)e);
        AsyncProducer p3 = cache.acquireProducer((Endpoint)e);
        Assertions.assertEquals((int)0, (int)cache.size(), (String)"Size should be 0");
        Assertions.assertEquals((int)0, (int)this.stopCounter.get());
        cache.forceEvict(p2);
        Assertions.assertEquals((int)0, (int)this.stopCounter.get());
        cache.releaseProducer((Endpoint)e, p3);
        cache.releaseProducer((Endpoint)e, p2);
        Assertions.assertEquals((int)1, (int)this.stopCounter.get());
        cache.stop();
        Awaitility.await().atMost(3L, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals((int)3, (int)this.stopCounter.get()));
    }

    @Test
    public void testAcquireProducerConcurrency() throws InterruptedException, ExecutionException {
        int i;
        DefaultProducerCache cache = new DefaultProducerCache((Object)this, (CamelContext)this.context, 0);
        cache.start();
        ArrayList<Endpoint> endpoints = new ArrayList<Endpoint>();
        for (int i2 = 0; i2 < 3; ++i2) {
            Endpoint e = this.context.getEndpoint("direct:queue:" + i2);
            AsyncProducer p = cache.acquireProducer(e);
            endpoints.add(e);
        }
        Assertions.assertEquals((int)3, (int)cache.size());
        ExecutorService ex = Executors.newFixedThreadPool(16);
        ArrayList<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>();
        for (i = 0; i < 500; ++i) {
            int index = i % 3;
            callables.add(() -> this.isEqualTask(cache, endpoints, index));
        }
        for (i = 1; i <= 100; ++i) {
            this.log.info("Iteration: {}", (Object)i);
            List results = ex.invokeAll(callables);
            for (Future future : results) {
                Assertions.assertEquals((Object)true, future.get());
            }
        }
    }

    private boolean isEqualTask(DefaultProducerCache cache, List<Endpoint> endpoints, int index) {
        AsyncProducer producer = cache.acquireProducer(endpoints.get(index));
        boolean isEqual = producer.getEndpoint().getEndpointUri().equalsIgnoreCase(endpoints.get(index).getEndpointUri());
        if (!isEqual) {
            this.log.info("Endpoint uri to acquire: {}, returned producer (uri): {}", (Object)endpoints.get(index).getEndpointUri(), (Object)producer.getEndpoint().getEndpointUri());
        }
        return isEqual;
    }

    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.component = new MyComponent((CamelContext)this.context);
    }

    protected MyEndpoint newEndpoint(boolean isSingleton, int number) {
        return new MyEndpoint(this.component, isSingleton, number);
    }

    private final class MyEndpoint
    extends DefaultEndpoint {
        private final boolean isSingleton;

        private MyEndpoint(MyComponent component, boolean isSingleton, int number) {
            super("my://" + number, (Component)component);
            this.isSingleton = isSingleton;
        }

        public Producer createProducer() {
            return new MyProducer((Endpoint)this);
        }

        public Consumer createConsumer(Processor processor) {
            return null;
        }

        public boolean isSingleton() {
            return this.isSingleton;
        }
    }

    private static class MyProducerCache
    extends DefaultProducerCache {
        private MyServicePool myServicePool;

        public MyProducerCache(Object source, CamelContext camelContext, int cacheSize) {
            super(source, camelContext, cacheSize);
        }

        protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) {
            this.myServicePool = new MyServicePool((ThrowingFunction<Endpoint, AsyncProducer, Exception>)((ThrowingFunction)Endpoint::createAsyncProducer), EndpointAware::getEndpoint, cacheSize);
            return this.myServicePool;
        }

        public void forceEvict(AsyncProducer producer) {
            this.myServicePool.onEvict(producer);
        }
    }

    private static final class MyComponent
    extends DefaultComponent {
        public MyComponent(CamelContext context) {
            super(context);
        }

        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) {
            throw new UnsupportedOperationException();
        }
    }

    private final class MyProducer
    extends DefaultProducer {
        private final int id;

        MyProducer(Endpoint endpoint) {
            super(endpoint);
            this.id = DefaultProducerCacheTest.this.producerCounter.incrementAndGet();
        }

        public void process(Exchange exchange) {
        }

        protected void doStop() {
            DefaultProducerCacheTest.this.stopCounter.incrementAndGet();
        }

        protected void doShutdown() {
            DefaultProducerCacheTest.this.shutdownCounter.incrementAndGet();
        }

        public String toString() {
            return "MyProducer[" + this.id + "]";
        }
    }

    private static class MyServicePool
    extends ProducerServicePool {
        public MyServicePool(ThrowingFunction<Endpoint, AsyncProducer, Exception> creator, Function<AsyncProducer, Endpoint> getEndpoint, int capacity) {
            super(creator, getEndpoint, capacity);
        }

        protected void onEvict(AsyncProducer asyncProducer) {
            super.onEvict((Service)asyncProducer);
        }
    }
}

