/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.aggregator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.builder.ValueBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests that an {@link AggregateProcessor} configured with timeout-based completion still
 * completes its pending groups after being stopped and started again.
 *
 * <p>Route building is disabled ({@link #isUseRouteBuilder()} returns {@code false}); every test
 * wires the aggregator by hand to the {@code mock:result} endpoint, feeds it exchanges correlated
 * by the {@code id} header, stops it before anything has been delivered, restarts it, and then
 * asserts that the aggregated bodies arrive marked as completed by {@code timeout}.
 */
public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestSupport {

    /** URI of the mock endpoint that receives the aggregated exchanges. */
    private static final String RESULT_URI = "mock:result";

    /** Header whose value is used as the aggregation correlation key. */
    private static final String CORRELATION_HEADER = "id";

    /** Header carrying the per-exchange completion timeout for the expression-based tests. */
    private static final String TIMEOUT_HEADER = "myTimeout";

    /** Interval handed to {@code setCompletionTimeoutCheckerInterval} in every test. */
    private static final long CHECKER_INTERVAL = 10L;

    /** Executor handed to the aggregator; a fresh single-thread executor is created per test. */
    private ExecutorService executorService;

    /**
     * Disables the route builder of the base class; the tests drive the processor directly.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isUseRouteBuilder() {
        return false;
    }

    /**
     * Runs the base-class setup and then creates the executor used by the aggregator under test.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.executorService = Executors.newSingleThreadExecutor();
    }

    /**
     * One correlation group ("A" and "B") with a fixed completion timeout of 250; after the
     * stop/start cycle exactly one aggregated exchange "A+B" is expected.
     *
     * @throws Exception if processing fails or the mock expectations are not satisfied
     */
    @Test
    public void testAggregateProcessorTimeoutRestart() throws Exception {
        MockEndpoint mock = expectTimeoutCompleted("A+B");

        AggregateProcessor ap = newAggregator();
        ap.setCompletionTimeout(250L);

        runRestartScenario(ap, mock, 1,
                newExchange("A", 123),
                newExchange("B", 123));
    }

    /**
     * Same scenario as {@link #testAggregateProcessorTimeoutRestart()}, but the timeout is taken
     * from the {@code myTimeout} header of the exchanges instead of a fixed value.
     *
     * @throws Exception if processing fails or the mock expectations are not satisfied
     */
    @Test
    public void testAggregateProcessorTimeoutExpressionRestart() throws Exception {
        MockEndpoint mock = expectTimeoutCompleted("A+B");

        AggregateProcessor ap = newAggregator();
        ap.setCompletionTimeoutExpression(header(TIMEOUT_HEADER));

        runRestartScenario(ap, mock, 1,
                newExchange("A", 123, 250),
                newExchange("B", 123, 250));
    }

    /**
     * Two correlation groups with different header-supplied timeouts: group 456 ("C", "D") uses
     * 250 and group 123 ("A", "B") uses 300, and the mock expects "C+D" before "A+B".
     *
     * @throws Exception if processing fails or the mock expectations are not satisfied
     */
    @Test
    public void testAggregateProcessorTwoTimeoutExpressionRestart() throws Exception {
        MockEndpoint mock = expectTimeoutCompleted("C+D", "A+B");

        AggregateProcessor ap = newAggregator();
        ap.setCompletionTimeoutExpression(header(TIMEOUT_HEADER));

        runRestartScenario(ap, mock, 2,
                newExchange("A", 123, 300),
                newExchange("B", 123, 300),
                newExchange("C", 456, 250),
                newExchange("D", 456, 250));
    }

    /**
     * Registers the expectations shared by all tests on the result mock: the given bodies, in
     * order, each carrying the {@code CamelAggregatedCompletedBy=timeout} exchange property.
     */
    private MockEndpoint expectTimeoutCompleted(Object... bodies) {
        MockEndpoint mock = getMockEndpoint(RESULT_URI);
        mock.expectedBodiesReceived(bodies);
        mock.expectedPropertyReceived("CamelAggregatedCompletedBy", "timeout");
        return mock;
    }

    /**
     * Builds an unstarted aggregator that correlates on the {@code id} header, combines bodies
     * with {@link BodyInAggregatingStrategy} and sends completed groups to the result mock.
     */
    private AggregateProcessor newAggregator() {
        AsyncProcessor done = new SendProcessor(context.getEndpoint(RESULT_URI));
        Expression correlation = header(CORRELATION_HEADER);
        AggregationStrategy strategy = new BodyInAggregatingStrategy();
        // NOTE(review): the trailing `true` is presumably the shutdownExecutorService flag, in
        // which case shutting down the aggregator also releases executorService - confirm
        // against the AggregateProcessor constructor.
        return new AggregateProcessor(context, done, correlation, strategy, executorService, true);
    }

    /** Creates an exchange with the given body and correlation id and no timeout header. */
    private Exchange newExchange(String body, int correlationId) {
        Exchange exchange = new DefaultExchange(context);
        exchange.getIn().setBody(body);
        exchange.getIn().setHeader(CORRELATION_HEADER, correlationId);
        return exchange;
    }

    /** Creates an exchange with the given body, correlation id and {@code myTimeout} header. */
    private Exchange newExchange(String body, int correlationId, int timeout) {
        Exchange exchange = newExchange(body, correlationId);
        exchange.getIn().setHeader(TIMEOUT_HEADER, timeout);
        return exchange;
    }

    /**
     * Starts the aggregator, feeds it the exchanges, stops and restarts it, and verifies that
     * nothing was delivered before the restart and {@code expectedCount} exchanges afterwards.
     *
     * <p>The aggregator is shut down in a {@code finally} block so that a failed assertion or a
     * processing error does not leave it (and whatever it has scheduled) running for later tests.
     */
    private void runRestartScenario(AggregateProcessor ap, MockEndpoint mock, int expectedCount,
                                    Exchange... exchanges) throws Exception {
        try {
            ap.setCompletionTimeoutCheckerInterval(CHECKER_INTERVAL);
            ap.start();

            for (Exchange exchange : exchanges) {
                ap.process(exchange);
            }

            // stop() rather than shutdown(): the pending groups must still be there on restart
            ap.stop();
            Assertions.assertEquals(0, mock.getReceivedCounter());

            ap.start();
            assertMockEndpointsSatisfied();
            Assertions.assertEquals(expectedCount, mock.getReceivedCounter());
        } finally {
            ap.shutdown();
        }
    }
}

