package org.apache.druid.data.input.impl.prefetch;

import com.amazonaws.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.io.CountingOutputStream;
import io.netty.util.SuppressForbidden;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.search.AutoStrategy;
import org.hamcrest.CoreMatchers;
import org.hyperic.sigar.win32.Pdh;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest.class */
public class PrefetchableTextFilesFirehoseFactoryTest {
    private static long FILE_SIZE = -1;
    private static final StringInputRowParser PARSER = new StringInputRowParser(new CSVParseSpec(new TimestampSpec(TimestampSpec.DEFAULT_COLUMN, AutoStrategy.NAME, null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TimestampSpec.DEFAULT_COLUMN, "a", "b"))), StringUtils.COMMA_SEPARATOR, Arrays.asList(TimestampSpec.DEFAULT_COLUMN, "a", "b"), false, 0), StandardCharsets.UTF_8.name());

    @ClassRule
    public static TemporaryFolder tempDir = new TemporaryFolder();
    private static File TEST_DIR;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest$TestPrefetchableTextFilesFirehoseFactory.class */
    static class TestPrefetchableTextFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<File> {
        private final long sleepMillis;
        private final File baseDir;
        private int numOpenExceptions;
        private int maxConnectionResets;
        private int readCount;
        private int numConnectionResets;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest$TestPrefetchableTextFilesFirehoseFactory$TestInputStream.class */
        public class TestInputStream extends InputStream {
            private static final int NUM_READ_COUNTS_BEFORE_ERROR = 10;
            private final InputStream delegate;
            private final int maxConnectionResets;

            TestInputStream(InputStream inputStream, int i) {
                this.delegate = inputStream;
                this.maxConnectionResets = i;
            }

            @Override // java.io.InputStream
            public int read() throws IOException {
                if (TestPrefetchableTextFilesFirehoseFactory.access$008(TestPrefetchableTextFilesFirehoseFactory.this) % 10 != 0 || TestPrefetchableTextFilesFirehoseFactory.access$108(TestPrefetchableTextFilesFirehoseFactory.this) >= this.maxConnectionResets) {
                    return this.delegate.read();
                }
                throw new SocketException("Test Connection reset");
            }

            @Override // java.io.InputStream
            public int read(byte[] bArr, int i, int i2) throws IOException {
                if (TestPrefetchableTextFilesFirehoseFactory.access$008(TestPrefetchableTextFilesFirehoseFactory.this) % 10 != 0 || TestPrefetchableTextFilesFirehoseFactory.access$108(TestPrefetchableTextFilesFirehoseFactory.this) >= this.maxConnectionResets) {
                    return this.delegate.read(bArr, i, i2);
                }
                throw new SocketException("Test Connection reset");
            }
        }

        static TestPrefetchableTextFilesFirehoseFactory with(File file, long j, long j2) {
            return new TestPrefetchableTextFilesFirehoseFactory(file, 1024L, j, j2, 60000L, 3, 0, 0, 0L);
        }

        static TestPrefetchableTextFilesFirehoseFactory of(File file) {
            return new TestPrefetchableTextFilesFirehoseFactory(file, 1024L, Pdh.PERF_TYPE_TEXT, Pdh.PERF_TYPE_TEXT, 3, 0, 0, 0L);
        }

        static TestPrefetchableTextFilesFirehoseFactory withOpenExceptions(File file, int i) {
            return new TestPrefetchableTextFilesFirehoseFactory(file, 1024L, Pdh.PERF_TYPE_TEXT, Pdh.PERF_TYPE_TEXT, 3, i, 0, 0L);
        }

        static TestPrefetchableTextFilesFirehoseFactory withConnectionResets(File file, long j, long j2, int i) {
            return new TestPrefetchableTextFilesFirehoseFactory(file, j2 / 2, j, j2, 3, 0, i, 0L);
        }

        static TestPrefetchableTextFilesFirehoseFactory withSleepMillis(File file, long j) {
            return new TestPrefetchableTextFilesFirehoseFactory(file, 1024L, Pdh.PERF_TYPE_TEXT, Pdh.PERF_TYPE_TEXT, 100L, 3, 0, 0, j);
        }

        private static long computeTimeout(int i) {
            return (long) Math.min(60000.0d, 1000.0d * Math.pow(2.0d, i - 1) * 2.0d);
        }

        TestPrefetchableTextFilesFirehoseFactory(File file, long j, long j2, long j3, int i, int i2, int i3, long j4) {
            this(file, j, j2, j3, computeTimeout(i), i, i2, i3, j4);
        }

        TestPrefetchableTextFilesFirehoseFactory(File file, long j, long j2, long j3, long j4, int i, int i2, int i3, long j5) {
            super(Long.valueOf(j2), Long.valueOf(j3), Long.valueOf(j), Long.valueOf(j4), Integer.valueOf(i));
            this.numOpenExceptions = i2;
            this.maxConnectionResets = i3;
            this.sleepMillis = j5;
            this.baseDir = file;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory
        public Collection<File> initObjects() {
            return FileUtils.listFiles(((File) Preconditions.checkNotNull(this.baseDir)).getAbsoluteFile(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory
        public InputStream openObjectStream(File file) throws IOException {
            if (this.numOpenExceptions > 0) {
                this.numOpenExceptions--;
                throw new IOException("Exception for retry test");
            }
            if (this.sleepMillis > 0) {
                try {
                    Thread.sleep(this.sleepMillis);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            return this.maxConnectionResets > 0 ? new TestInputStream(FileUtils.openInputStream(file), this.maxConnectionResets) : FileUtils.openInputStream(file);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory
        public InputStream wrapObjectStream(File file, InputStream inputStream) {
            return inputStream;
        }

        @Override // org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory
        protected Predicate<Throwable> getRetryCondition() {
            return th -> {
                return th instanceof IOException;
            };
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory
        public InputStream openObjectStream(File file, long j) throws IOException {
            if (this.numOpenExceptions > 0) {
                this.numOpenExceptions--;
                throw new IOException("Exception for retry test");
            }
            if (this.sleepMillis > 0) {
                try {
                    Thread.sleep(this.sleepMillis);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            FileInputStream openInputStream = FileUtils.openInputStream(file);
            openInputStream.skip(j);
            return this.maxConnectionResets > 0 ? new TestInputStream(openInputStream, this.maxConnectionResets) : openInputStream;
        }

        @Override // org.apache.druid.data.input.FiniteFirehoseFactory
        public FiniteFirehoseFactory<StringInputRowParser, File> withSplit(InputSplit<File> inputSplit) {
            throw new UnsupportedOperationException();
        }

        static /* synthetic */ int access$008(TestPrefetchableTextFilesFirehoseFactory testPrefetchableTextFilesFirehoseFactory) {
            int i = testPrefetchableTextFilesFirehoseFactory.readCount;
            testPrefetchableTextFilesFirehoseFactory.readCount = i + 1;
            return i;
        }

        static /* synthetic */ int access$108(TestPrefetchableTextFilesFirehoseFactory testPrefetchableTextFilesFirehoseFactory) {
            int i = testPrefetchableTextFilesFirehoseFactory.numConnectionResets;
            testPrefetchableTextFilesFirehoseFactory.numConnectionResets = i + 1;
            return i;
        }
    }

    @BeforeClass
    public static void setup() throws IOException {
        NullHandling.initializeForTests();
        TEST_DIR = tempDir.newFolder();
        for (int i = 0; i < 100; i++) {
            CountingOutputStream countingOutputStream = new CountingOutputStream(Files.newOutputStream(new File(TEST_DIR, "test_" + i).toPath(), new OpenOption[0]));
            Throwable th = null;
            try {
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(countingOutputStream, StandardCharsets.UTF_8));
                Throwable th2 = null;
                for (int i2 = 0; i2 < 100; i2++) {
                    try {
                        try {
                            bufferedWriter.write(org.apache.druid.java.util.common.StringUtils.format("%d,%03d,%03d\n", Integer.valueOf(20171220 + i), Integer.valueOf(i), Integer.valueOf(i2)));
                        } finally {
                        }
                    } finally {
                    }
                }
                bufferedWriter.flush();
                if (FILE_SIZE == -1) {
                    FILE_SIZE = countingOutputStream.getCount();
                } else {
                    Assert.assertEquals(FILE_SIZE, countingOutputStream.getCount());
                }
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                if (countingOutputStream != null) {
                    if (0 != 0) {
                        try {
                            countingOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        countingOutputStream.close();
                    }
                }
            } catch (Throwable th5) {
                if (countingOutputStream != null) {
                    if (0 != 0) {
                        try {
                            countingOutputStream.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        countingOutputStream.close();
                    }
                }
                throw th5;
            }
        }
    }

    private static void assertResult(List<Row> list) {
        Assert.assertEquals(QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE, list.size());
        list.sort((row, row2) -> {
            int compareTo = row.getTimestamp().compareTo((ReadableInstant) row2.getTimestamp());
            if (compareTo != 0) {
                return compareTo;
            }
            int compareTo2 = Integer.valueOf(row.getDimension("a").get(0)).compareTo(Integer.valueOf(row2.getDimension("a").get(0)));
            return compareTo2 != 0 ? compareTo2 : Integer.valueOf(row.getDimension("b").get(0)).compareTo(Integer.valueOf(row2.getDimension("b").get(0)));
        });
        for (int i = 0; i < 100; i++) {
            for (int i2 = 0; i2 < 100; i2++) {
                Assert.assertEquals(DateTimes.utc(20171220 + i), list.get((i * 100) + i2).getTimestamp());
                Assert.assertEquals(i, Integer.valueOf(r0.getDimension("a").get(0)).intValue());
                Assert.assertEquals(i2, Integer.valueOf(r0.getDimension("b").get(0)).intValue());
            }
        }
    }

    private static void assertNumRemainingCacheFiles(File file, int i) {
        Assert.assertNotNull(file.list());
        Assert.assertEquals(i, r0.length);
    }

    @SuppressForbidden(reason = "Files#createTempDirectory")
    private static File createFirehoseTmpDir(String str) throws IOException {
        return Files.createTempDirectory(tempDir.getRoot().toPath(), str, new FileAttribute[0]).toFile();
    }

    @Test
    public void testWithoutCacheAndFetch() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory with = TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 0L, 0L);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithoutCacheAndFetch");
        Firehose connect = with.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } catch (Throwable th2) {
                    if (connect != null) {
                        if (th != null) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        Assert.assertEquals(0L, with.getCacheManager().getTotalCachedBytes());
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 0);
    }

    @Test
    public void testWithoutCacheAndFetchAgainstConnectionReset() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory withConnectionResets = TestPrefetchableTextFilesFirehoseFactory.withConnectionResets(TEST_DIR, 0L, 0L, 2);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithoutCacheAndFetch");
        Firehose connect = withConnectionResets.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } finally {
                }
            } catch (Throwable th2) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th2;
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        Assert.assertEquals(0L, withConnectionResets.getCacheManager().getTotalCachedBytes());
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 0);
    }

    @Test
    public void testWithoutCache() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory with = TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 0L, Pdh.PERF_TYPE_TEXT);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithoutCache");
        Firehose connect = with.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } finally {
                }
            } catch (Throwable th2) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th2;
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        Assert.assertEquals(0L, with.getCacheManager().getTotalCachedBytes());
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 0);
    }

    @Test
    public void testWithZeroFetchCapacity() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory with = TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, Pdh.PERF_TYPE_TEXT, 0L);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithZeroFetchCapacity");
        Firehose connect = with.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } finally {
                }
            } catch (Throwable th2) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th2;
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 2);
    }

    @Test
    public void testWithCacheAndFetch() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory of = TestPrefetchableTextFilesFirehoseFactory.of(TEST_DIR);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithCacheAndFetch");
        Firehose connect = of.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } catch (Throwable th2) {
                    if (connect != null) {
                        if (th != null) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 2);
    }

    @Test
    @Ignore("See issue #12638")
    public void testWithLargeCacheAndSmallFetch() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory with = TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, Pdh.PERF_TYPE_TEXT, 1024L);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithLargeCacheAndSmallFetch");
        Firehose connect = with.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } finally {
                }
            } catch (Throwable th2) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th2;
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 2);
    }

    @Test
    public void testWithSmallCacheAndLargeFetch() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory with = TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 1024L, Pdh.PERF_TYPE_TEXT);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testWithSmallCacheAndLargeFetch");
        Firehose connect = with.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } finally {
                }
            } catch (Throwable th2) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th2;
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 1);
    }

    @Test
    public void testRetry() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory withOpenExceptions = TestPrefetchableTextFilesFirehoseFactory.withOpenExceptions(TEST_DIR, 1);
        ArrayList arrayList = new ArrayList();
        File createFirehoseTmpDir = createFirehoseTmpDir("testRetry");
        Firehose connect = withOpenExceptions.connect(PARSER, createFirehoseTmpDir);
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    arrayList.add(connect.nextRow());
                } catch (Throwable th2) {
                    if (connect != null) {
                        if (th != null) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        }
        if (connect != null) {
            if (0 != 0) {
                try {
                    connect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                connect.close();
            }
        }
        assertResult(arrayList);
        assertNumRemainingCacheFiles(createFirehoseTmpDir, 2);
    }

    @Test
    public void testMaxRetry() throws IOException {
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class));
        this.expectedException.expectMessage("Exception for retry test");
        Firehose connect = TestPrefetchableTextFilesFirehoseFactory.withOpenExceptions(TEST_DIR, 5).connect(PARSER, createFirehoseTmpDir("testMaxRetry"));
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    connect.nextRow();
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th3;
            }
        }
        if (connect != null) {
            if (0 == 0) {
                connect.close();
                return;
            }
            try {
                connect.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    @Test
    public void testTimeout() throws IOException {
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
        Firehose connect = TestPrefetchableTextFilesFirehoseFactory.withSleepMillis(TEST_DIR, 1000L).connect(PARSER, createFirehoseTmpDir("testTimeout"));
        Throwable th = null;
        while (connect.hasMore()) {
            try {
                try {
                    connect.nextRow();
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (connect != null) {
                    if (th != null) {
                        try {
                            connect.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th3;
            }
        }
        if (connect != null) {
            if (0 == 0) {
                connect.close();
                return;
            }
            try {
                connect.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    @Test
    @Ignore("See issue #12638")
    public void testReconnectWithCacheAndPrefetch() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory of = TestPrefetchableTextFilesFirehoseFactory.of(TEST_DIR);
        File createFirehoseTmpDir = createFirehoseTmpDir("testReconnectWithCacheAndPrefetch");
        for (int i = 0; i < 5; i++) {
            ArrayList arrayList = new ArrayList();
            Firehose connect = of.connect(PARSER, createFirehoseTmpDir);
            Throwable th = null;
            if (i > 0) {
                try {
                    try {
                        Assert.assertEquals(FILE_SIZE * 2, of.getCacheManager().getTotalCachedBytes());
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (connect != null) {
                        if (th != null) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    throw th2;
                }
            }
            while (connect.hasMore()) {
                arrayList.add(connect.nextRow());
            }
            if (connect != null) {
                if (0 != 0) {
                    try {
                        connect.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    connect.close();
                }
            }
            assertResult(arrayList);
            assertNumRemainingCacheFiles(createFirehoseTmpDir, 2);
        }
    }

    @Test
    public void testReconnectWithCache() throws IOException {
        TestPrefetchableTextFilesFirehoseFactory with = TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, Pdh.PERF_TYPE_TEXT, 0L);
        File createFirehoseTmpDir = createFirehoseTmpDir("testReconnectWithCache");
        for (int i = 0; i < 5; i++) {
            ArrayList arrayList = new ArrayList();
            Firehose connect = with.connect(PARSER, createFirehoseTmpDir);
            Throwable th = null;
            if (i > 0) {
                try {
                    try {
                        Assert.assertEquals(FILE_SIZE * 2, with.getCacheManager().getTotalCachedBytes());
                    } catch (Throwable th2) {
                        if (connect != null) {
                            if (th != null) {
                                try {
                                    connect.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                connect.close();
                            }
                        }
                        throw th2;
                    }
                } finally {
                }
            }
            while (connect.hasMore()) {
                arrayList.add(connect.nextRow());
            }
            if (connect != null) {
                if (0 != 0) {
                    try {
                        connect.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    connect.close();
                }
            }
            assertResult(arrayList);
            assertNumRemainingCacheFiles(createFirehoseTmpDir, 2);
        }
    }
}
