/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.ExpressionNode;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IdempotentConsumerConcurrentTest
extends ContextTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumerConcurrentTest.class);
    protected Endpoint startEndpoint;
    protected MockEndpoint resultEndpoint;

    @Override
    public boolean isUseRouteBuilder() {
        return false;
    }

    @Test
    public void testDuplicateMessagesAreFilteredOut() throws Exception {
        this.context.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() {
                this.from("direct:start").idempotentConsumer((Expression)this.header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository((int)200)).to("mock:result");
            }
        });
        this.context.start();
        this.resultEndpoint.expectedBodiesReceived(new Object[]{"one", "two", "three"});
        this.sendMessage("1", "one");
        this.sendMessage("2", "two");
        this.sendMessage("1", "one");
        this.sendMessage("2", "two");
        this.sendMessage("1", "one");
        this.sendMessage("3", "three");
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception {
        this.context.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() {
                this.errorHandler((ErrorHandlerFactory)this.deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0L).logStackTrace(false));
                ((ExpressionNode)this.from("direct:start").idempotentConsumer((Expression)this.header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository((int)200)).process(new Processor(){

                    public void process(Exchange exchange) {
                        String id = (String)exchange.getIn().getHeader("messageId", String.class);
                        if (id.equals("2")) {
                            throw new IllegalArgumentException("Damm I cannot handle id 2");
                        }
                    }
                })).to("mock:result");
            }
        });
        this.context.start();
        this.getMockEndpoint("mock:error").expectedMessageCount(2);
        this.resultEndpoint.expectedBodiesReceived(new Object[]{"one", "three"});
        this.sendMessage("1", "one");
        this.sendMessage("2", "two");
        this.sendMessage("1", "one");
        this.sendMessage("2", "two");
        this.sendMessage("1", "one");
        this.sendMessage("3", "three");
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testFailedExchangesNotAdded() throws Exception {
        this.context.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() {
                ((ExpressionNode)this.from("direct:start").idempotentConsumer((Expression)this.header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository((int)200)).process(new Processor(){

                    public void process(Exchange exchange) {
                        String id = (String)exchange.getIn().getHeader("messageId", String.class);
                        if (id.equals("2")) {
                            throw new IllegalArgumentException("Damm I cannot handle id 2");
                        }
                    }
                })).to("mock:result");
            }
        });
        this.context.start();
        this.resultEndpoint.expectedBodiesReceived(new Object[]{"one", "three"});
        this.sendMessage("1", "one");
        this.sendMessage("2", "two");
        this.sendMessage("1", "one");
        this.sendMessage("2", "two");
        this.sendMessage("1", "one");
        this.sendMessage("3", "three");
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testThreadedIdempotentConsumer() throws Exception {
        int i;
        int loopCount = 100;
        int threadCount = 10;
        this.context.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() {
                this.from("direct:start").idempotentConsumer((Expression)this.header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository((int)200)).delay(1L).to("mock:result");
            }
        });
        this.context.start();
        this.resultEndpoint.reset();
        this.resultEndpoint.expectedMessageCount(100);
        final boolean[] failedFlag = new boolean[]{false};
        Thread[] threads = new Thread[10];
        for (i = 0; i < 10; ++i) {
            int threadIndex = i;
            threads[threadIndex] = new Thread(){

                @Override
                public void run() {
                    try {
                        for (int j = 0; j < 100; ++j) {
                            IdempotentConsumerConcurrentTest.this.sendMessage(String.valueOf(j), "multithreadedTest" + j);
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Failed to send message: {}", (Object)e.getMessage(), (Object)e);
                        failedFlag[0] = true;
                    }
                }
            };
            threads[i].start();
        }
        for (i = 0; i < 10; ++i) {
            threads[i].join();
        }
        Assertions.assertFalse((boolean)failedFlag[0], (String)"At least one thread threw an exception");
        this.assertMockEndpointsSatisfied();
    }

    protected void sendMessage(final Object messageId, final Object body) {
        this.template.send(this.startEndpoint, new Processor(){

            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setBody(body);
                in.setHeader("messageId", messageId);
            }
        });
    }

    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.startEndpoint = this.resolveMandatoryEndpoint("direct:start");
        this.resultEndpoint = this.getMockEndpoint("mock:result");
    }
}

