/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.model.MulticastDefinition;
import org.apache.camel.model.RouteDefinition;
import org.junit.jupiter.api.Test;

public class MultiCastStreamCachingInSubRouteTest
extends ContextTestSupport {
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder(){

            public void configure() {
                MultiCastStreamCachingInSubRouteTest.this.context.setStreamCaching(Boolean.valueOf(true));
                MultiCastStreamCachingInSubRouteTest.this.context.getStreamCachingStrategy().setEnabled(true);
                MultiCastStreamCachingInSubRouteTest.this.context.getStreamCachingStrategy().setSpoolDirectory(MultiCastStreamCachingInSubRouteTest.this.testDirectory().toFile());
                MultiCastStreamCachingInSubRouteTest.this.context.getStreamCachingStrategy().setSpoolThreshold(1L);
                ((MulticastDefinition)this.from("direct:start").multicast((AggregationStrategy)new InternalAggregationStrategy()).to(new String[]{"direct:a", "direct:b"})).end().to("mock:result");
                ((MulticastDefinition)this.from("direct:startNestedMultiCast").multicast((AggregationStrategy)new InternalAggregationStrategy()).to("direct:start")).end().to("mock:resultNested");
                ((RouteDefinition)this.from("direct:a").process((Processor)new InputProcessorWithStreamCache(1))).to("mock:resulta");
                ((RouteDefinition)this.from("direct:b").process((Processor)new InputProcessorWithStreamCache(2))).to("mock:resultb");
            }
        };
    }

    @Test
    public void testWithAggregationStrategyAndStreamCacheInSubRoute() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedBodiesReceived(new Object[]{"Test Message 1Test Message 2"});
        this.template.sendBody("direct:start", (Object)"<start></start>");
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testNestedMultiCastWithCachedStreamInAggregationStrategy() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:resultNested");
        mock.expectedBodiesReceived(new Object[]{"Test Message 1Test Message 2"});
        this.template.sendBody("direct:startNestedMultiCast", (Object)"<start></start>");
        this.assertMockEndpointsSatisfied();
    }

    public static class InternalAggregationStrategy
    implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            try {
                String oldBody = (String)oldExchange.getIn().getBody(String.class);
                String newBody = (String)newExchange.getIn().getBody(String.class);
                String merged = oldBody + newBody;
                CachedOutputStream cos = new CachedOutputStream(newExchange);
                cos.write(merged.getBytes(StandardCharsets.UTF_8));
                cos.close();
                oldExchange.getIn().setBody((Object)cos.newStreamCache());
                return oldExchange;
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static class InputProcessorWithStreamCache
    implements Processor {
        private final int number;

        public InputProcessorWithStreamCache(int number) {
            this.number = number;
        }

        public void process(Exchange exchange) throws Exception {
            CachedOutputStream cos = new CachedOutputStream(exchange);
            String s = "Test Message " + this.number;
            cos.write(s.getBytes(StandardCharsets.UTF_8));
            cos.close();
            InputStream is = (InputStream)cos.newStreamCache();
            exchange.getMessage().setBody((Object)is);
        }
    }
}

