package org.apache.jena.riot.system;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.graph.Graph;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFFormat;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.RiotNotFoundException;
import org.apache.jena.sparql.graph.GraphFactory;
import org.apache.jena.sparql.util.IsoMatcher;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/jena/riot/system/TestAsyncParser.class */
public class TestAsyncParser {
    private static String DIR = "testing/RIOT/Parser/";

    /* loaded from: input_file:org/apache/jena/riot/system/TestAsyncParser$FailingOutputStream.class */
    private static class FailingOutputStream extends OutputStream {
        public static final String ERROR_MSG = "Mocked IO Error";

        private FailingOutputStream() {
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            throw new IOException(ERROR_MSG);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/jena/riot/system/TestAsyncParser$RepeatingReadableByteChannel.class */
    public static class RepeatingReadableByteChannel implements ReadableByteChannel {
        protected byte[] data;
        protected boolean isOpen;
        protected long pos;

        public RepeatingReadableByteChannel(byte[] bArr) {
            this(bArr, 0L);
        }

        public RepeatingReadableByteChannel(byte[] bArr, long j) {
            Objects.requireNonNull(bArr);
            if (bArr.length == 0) {
                throw new RuntimeException("Provided data array must have at least 1 item");
            }
            this.data = bArr;
            this.pos = j;
            this.isOpen = true;
        }

        @Override // java.nio.channels.Channel
        public boolean isOpen() {
            return this.isOpen;
        }

        @Override // java.nio.channels.Channel, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.isOpen = false;
        }

        @Override // java.nio.channels.ReadableByteChannel
        public int read(ByteBuffer byteBuffer) throws IOException {
            int length = this.data.length;
            int i = (int) (this.pos % length);
            int min = Math.min(byteBuffer.remaining(), length - i);
            byteBuffer.put(this.data, i, min);
            this.pos += min;
            return min;
        }

        public long position() {
            return this.pos;
        }

        public void position(long j) {
            this.pos = j;
        }
    }

    @Test
    public void async_parse_1() {
        test(DIR + "empty.ttl");
    }

    @Test
    public void async_parse_2() {
        test(DIR + "data.ttl");
    }

    @Test(expected = RiotException.class)
    public void async_parse_3() {
        test(DIR + "bad-data.ttl");
    }

    @Test(expected = RiotNotFoundException.class)
    public void async_parse_4() {
        test(DIR + "no-suchfile.ttl");
    }

    @Test
    public void async_iterator1() {
        Assert.assertFalse(AsyncParser.asyncParseTriples(DIR + "empty.ttl").hasNext());
    }

    @Test
    public void async_iterator2() {
        Assert.assertTrue(AsyncParser.asyncParseTriples(DIR + "data.ttl").hasNext());
    }

    @Test
    public void sources_1() {
        AsyncParser.asyncParseSources(List.of(RDFParser.fromString("_:a <p> <o>.", Lang.TTL), RDFParser.fromString("_:a <p> <o>.", Lang.TTL)), StreamRDFLib.graph(GraphFactory.createDefaultGraph()));
        Assert.assertEquals(2L, r0.size());
    }

    @Test
    public void sources_2() {
        AsyncParser.asyncParseSources(List.of(), StreamRDFLib.graph(GraphFactory.createDefaultGraph()));
        Assert.assertEquals(0L, r0.size());
    }

    private static void test(String str) {
        Graph createDefaultGraph = GraphFactory.createDefaultGraph();
        Graph createDefaultGraph2 = GraphFactory.createDefaultGraph();
        AsyncParser.asyncParse(str, StreamRDFLib.graph(createDefaultGraph2));
        RDFParser.source(str).parse(createDefaultGraph);
        Assert.assertEquals(createDefaultGraph.size(), createDefaultGraph2.size());
        Assert.assertTrue(IsoMatcher.isomorphic(createDefaultGraph, createDefaultGraph2));
    }

    @Test
    public void failingSink() {
        int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
        for (int i = 0; i < 20; i++) {
            try {
                InputStream openInfiniteNtStream = openInfiniteNtStream();
                try {
                    FailingOutputStream failingOutputStream = new FailingOutputStream();
                    try {
                        StreamRDF writerStream = StreamRDFWriter.getWriterStream(failingOutputStream, RDFFormat.NT);
                        writerStream.start();
                        AsyncParser.of(openInfiniteNtStream, Lang.NT, (String) null).setChunkSize(10).setQueueSize(3).asyncParseSources(writerStream);
                        writerStream.finish();
                        failingOutputStream.close();
                        if (openInfiniteNtStream != null) {
                            openInfiniteNtStream.close();
                        }
                        throw new RuntimeException("Parsing unexpectedly succeeded");
                    } catch (Throwable th) {
                        try {
                            failingOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (Exception e) {
                Assert.assertNotNull("Unexpected exception: " + e, e.getCause());
                Assert.assertEquals(FailingOutputStream.ERROR_MSG, e.getCause().getMessage());
            }
        }
        int abs = Math.abs(ManagementFactory.getThreadMXBean().getThreadCount() - threadCount);
        Assert.assertTrue("Cancelling RDF parsing resulted in too many dangling threads (" + abs + ")", abs <= 5);
    }

    @Test
    public void repeatedParsingCancellation_1() throws Exception {
        int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
        for (int i = 0; i < 20; i++) {
            IteratorCloseable iteratorCloseable = null;
            try {
                iteratorCloseable = AsyncParser.of(DIR + "data.ttl").setDaemonMode(false).asyncParseTriples();
                iteratorCloseable.hasNext();
                if (iteratorCloseable != null) {
                    iteratorCloseable.close();
                }
            } catch (Throwable th) {
                if (iteratorCloseable != null) {
                    iteratorCloseable.close();
                }
                throw th;
            }
        }
        int abs = Math.abs(ManagementFactory.getThreadMXBean().getThreadCount() - threadCount);
        Assert.assertTrue("Cancelling RDF parsing resulted in too many dangling threads (" + abs + ")", abs <= 5);
    }

    @Test
    public void lowLatencyParse() {
        RepeatingReadableByteChannel openInfiniteNtChannel = openInfiniteNtChannel();
        Stream limit = AsyncParser.of(Channels.newInputStream(openInfiniteNtChannel), Lang.TURTLE, (String) null).setChunkSize(100).streamTriples().limit(10L);
        try {
            Assert.assertEquals(10L, limit.count());
            if (limit != null) {
                limit.close();
            }
            long position = openInfiniteNtChannel.position();
            Assert.assertTrue("Too many bytes consumed from input stream (" + position + ")", position < 120000);
        } catch (Throwable th) {
            if (limit != null) {
                try {
                    limit.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testNoEventsAreLost() {
        StringBuilder sb = new StringBuilder();
        sb.append("PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>");
        for (int i = 0; i < 4; i++) {
            sb.append("<urn:example:s> rdfs:label \"item # " + i + "\" .\n");
        }
        Stream streamElements = AsyncParser.of(new ByteArrayInputStream(sb.toString().getBytes()), Lang.TURTLE, (String) null).streamElements();
        try {
            List list = (List) streamElements.collect(Collectors.toList());
            if (streamElements != null) {
                streamElements.close();
            }
            Assert.assertEquals(list.size(), 5);
            String sb2 = sb.append("parse error").toString();
            for (int i2 = 1; i2 < 10; i2++) {
                streamElements = AsyncParser.of(new ByteArrayInputStream(sb2.getBytes()), Lang.TURTLE, (String) null).setChunkSize(i2).streamElements();
                try {
                    List list2 = (List) streamElements.collect(Collectors.toList());
                    Assert.assertEquals(5 + 1, list2.size());
                    Assert.assertEquals(list, list2.subList(0, 5));
                    Assert.assertTrue(((EltStreamRDF) list2.get(list2.size() - 1)).isException());
                    if (streamElements != null) {
                        streamElements.close();
                    }
                } finally {
                }
            }
        } finally {
        }
    }

    @Test
    public void testPrematureDispatch() {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        String str = "Forced abort";
        Stream limit = AsyncParser.of(openInfiniteNtStream(), Lang.TURTLE, (String) null).setPrematureDispatch(eltStreamRDF -> {
            if (atomicInteger.getAndIncrement() < 10) {
                return true;
            }
            throw new RuntimeException(str);
        }).streamElements().limit(1000L);
        try {
            List list = (List) limit.collect(Collectors.toList());
            if (limit != null) {
                limit.close();
            }
            Assert.assertEquals(10L, list.size());
            for (int i = 0; i < 10; i++) {
                boolean z = i + 1 == 10;
                EltStreamRDF eltStreamRDF2 = (EltStreamRDF) list.get(i);
                if (z) {
                    Assert.assertTrue("Last element expected to be an exception", eltStreamRDF2.isException());
                    Assert.assertEquals("Forced abort", eltStreamRDF2.exception().getMessage());
                } else {
                    Assert.assertTrue("Non-last element expected to be a triple", eltStreamRDF2.isTriple());
                }
            }
        } catch (Throwable th) {
            if (limit != null) {
                try {
                    limit.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static InputStream openInfiniteNtStream() {
        return Channels.newInputStream(openInfiniteNtChannel());
    }

    private static RepeatingReadableByteChannel openInfiniteNtChannel() {
        return new RepeatingReadableByteChannel("<urn:example:s> <urn:example:p> <urn:example:o> .\n".getBytes(StandardCharsets.UTF_8));
    }
}
