package org.apache.beam.runners.core.construction;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.java.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.core.construction.java.repackaged.com.google.common.collect.Lists;
import org.apache.beam.runners.core.construction.java.repackaged.com.google.common.collect.Sets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.class */
public class UnboundedReadFromBoundedSourceTest {

    /** JUnit rule providing scratch files; the unsplittable-source tests create their input file through it. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /** JUnit rule through which a test declares the exception type it expects (initially expects none). */
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /** Test pipeline rule; the runner-backed test applies transforms to it and calls {@code run()}. */
    @Rule
    public TestPipeline p = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest$UnsplittableSource.class */
    private static class UnsplittableSource extends FileBasedSource<Byte> {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest$UnsplittableSource$UnsplittableReader.class */
        public static class UnsplittableReader extends FileBasedSource.FileBasedReader<Byte> {
            ByteBuffer buff;
            Byte current;
            long offset;
            ReadableByteChannel channel;

            public UnsplittableReader(UnsplittableSource unsplittableSource) {
                super(unsplittableSource);
                this.buff = ByteBuffer.allocate(1);
                this.offset = unsplittableSource.getStartOffset() - 1;
            }

            /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
            public Byte m16getCurrent() throws NoSuchElementException {
                return this.current;
            }

            public boolean allowsDynamicSplitting() {
                return false;
            }

            protected boolean isAtSplitPoint() {
                return true;
            }

            protected void startReading(ReadableByteChannel readableByteChannel) throws IOException {
                this.channel = readableByteChannel;
            }

            protected boolean readNextRecord() throws IOException {
                this.buff.clear();
                if (this.channel.read(this.buff) != 1) {
                    return false;
                }
                this.current = Byte.valueOf(this.buff.get(0));
                this.offset++;
                return true;
            }

            protected long getCurrentOffset() {
                return this.offset;
            }
        }

        public UnsplittableSource(String str, long j) {
            super(ValueProvider.StaticValueProvider.of(str), j);
        }

        public UnsplittableSource(MatchResult.Metadata metadata, long j, long j2, long j3) {
            super(metadata, j, j2, j3);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: createForSubrangeOfFile, reason: merged with bridge method [inline-methods] */
        public UnsplittableSource m15createForSubrangeOfFile(MatchResult.Metadata metadata, long j, long j2) {
            return new UnsplittableSource(metadata, getMinBundleSize(), j, j2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: createSingleFileReader, reason: merged with bridge method [inline-methods] */
        public UnsplittableReader m14createSingleFileReader(PipelineOptions pipelineOptions) {
            return new UnsplittableReader(this);
        }

        public Coder<Byte> getDefaultOutputCoder() {
            return SerializableCoder.of(Byte.class);
        }
    }

    /** A checkpoint holding no residual elements and no residual source must survive an encode/decode round trip. */
    @Test
    public void testCheckpointCoderNulls() throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder<String> coder =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder<>(StringUtf8Coder.of());
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<String> emptyCheckpoint =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<>(null, null);

        byte[] encoded = CoderUtils.encodeToByteArray(coder, emptyCheckpoint);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<String> decoded =
                CoderUtils.decodeFromByteArray(coder, encoded);

        Assert.assertNull(decoded.getResidualElements());
        Assert.assertNull(decoded.getResidualSource());
    }

    /** The checkpoint coder built around the global-window coder must pass the coder serializability property check. */
    @Test
    public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder<GlobalWindow> coder =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder<>(GlobalWindow.Coder.INSTANCE);
        CoderProperties.coderSerializable(coder);
    }

    /**
     * Reads a 100-element counting source through the adapter in a pipeline and asserts the
     * element count, the distinct-element count, the minimum (0) and the maximum (99).
     */
    @Test
    @Category({NeedsRunner.class})
    public void testBoundedToUnboundedSourceAdapter() throws Exception {
        final long numElements = 100L;
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Long> adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(CountingSource.upTo(numElements));
        PCollection<Long> output = this.p.apply(Read.from(adapter).withMaxNumRecords(numElements));

        PCollection<Long> total = output.apply("Count", Count.<Long>globally());
        PAssert.thatSingleton(total).isEqualTo(numElements);

        PCollection<Long> uniqueTotal =
                output.apply(Distinct.<Long>create()).apply("UniqueCount", Count.<Long>globally());
        PAssert.thatSingleton(uniqueTotal).isEqualTo(numElements);

        PAssert.thatSingleton(output.apply("Min", Min.<Long>globally())).isEqualTo(0L);
        PAssert.thatSingleton(output.apply("Max", Max.<Long>globally())).isEqualTo(numElements - 1);

        this.p.run();
    }

    /** Runs the periodic-checkpoint scenario against a counting source of the values 0..99. */
    @Test
    public void testCountingSourceToUnboundedCheckpoint() throws Exception {
        final long numElements = 100L;
        BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
        List<Long> expected = Lists.newArrayList();
        for (long value = 0; value < numElements; value++) {
            expected.add(value);
        }
        testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
    }

    /** Runs the periodic-checkpoint scenario against a 100-byte file read through the unsplittable source. */
    @Test
    public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
        File inputFile = this.tmpFolder.newFile("test-input.gz");
        byte[] contents = generateInput(100);
        writeFile(inputFile, contents);

        UnsplittableSource source = new UnsplittableSource(inputFile.getPath(), 1L);

        // The expected records are the file's bytes, one element per byte.
        List<Byte> expected = Lists.newArrayList();
        for (int idx = 0; idx < contents.length; idx++) {
            expected.add(contents[idx]);
        }
        testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
    }

    /**
     * Drains {@code boundedSource} through the adapter's reader, taking and finalizing a
     * checkpoint after every ninth element, then verifies the final checkpoint has no residual
     * elements and that the elements read match {@code list} in size and as a set.
     *
     * @param boundedSource source to wrap in the adapter
     * @param list elements the source is expected to produce
     */
    private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(BoundedSource<T> boundedSource, List<T> list) throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<T> adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(boundedSource);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<T>.Reader reader =
                adapter.createReader(PipelineOptionsFactory.create(), null);

        List<T> actual = Lists.newArrayList();
        boolean hasNext = reader.start();
        while (hasNext) {
            actual.add(reader.getCurrent());
            // Checkpoint every 9 elements.
            if (actual.size() % 9 == 0) {
                UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<T> periodic =
                        reader.getCheckpointMark();
                periodic.finalizeCheckpoint();
            }
            hasNext = reader.advance();
        }

        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<T> finalMark =
                reader.getCheckpointMark();
        List<?> residual = finalMark.getResidualElements();
        Assert.assertTrue(residual == null || finalMark.getResidualElements().isEmpty());
        Assert.assertEquals(list.size(), actual.size());
        Assert.assertEquals(Sets.newHashSet(list), Sets.newHashSet(actual));
    }

    /** Runs the checkpoint-and-restart scenario against a counting source of the values 0..99. */
    @Test
    public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
        final long numElements = 100L;
        BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
        List<Long> expected = Lists.newArrayList();
        for (long value = 0; value < numElements; value++) {
            expected.add(value);
        }
        testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
    }

    /** Runs the checkpoint-and-restart scenario against a 1000-byte file read through the unsplittable source. */
    @Test
    public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
        File inputFile = this.tmpFolder.newFile("test-input.gz");
        byte[] contents = generateInput(1000);
        writeFile(inputFile, contents);

        UnsplittableSource source = new UnsplittableSource(inputFile.getPath(), 1L);

        // The expected records are the file's bytes, one element per byte.
        List<Byte> expected = Lists.newArrayList();
        for (int idx = 0; idx < contents.length; idx++) {
            expected.add(contents[idx]);
        }
        testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
    }

    /**
     * Drains {@code boundedSource} through the adapter, and after every ninth element takes a
     * checkpoint, round-trips it through the adapter's checkpoint coder, closes the reader,
     * finalizes the original checkpoint and resumes from a new reader created from the decoded
     * checkpoint. Finally verifies the last checkpoint has no residual elements and that the
     * elements read match {@code list} in size and as a set.
     *
     * @param boundedSource source to wrap in the adapter
     * @param list elements the source is expected to produce
     */
    private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(BoundedSource<T> boundedSource, List<T> list) throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<T> adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(boundedSource);
        PipelineOptions options = PipelineOptionsFactory.create();
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<T>.Reader reader =
                adapter.createReader(options, null);

        List<T> actual = Lists.newArrayList();
        boolean hasNext = reader.start();
        while (hasNext) {
            actual.add(reader.getCurrent());
            if (actual.size() % 9 != 0) {
                hasNext = reader.advance();
                continue;
            }

            // Every 9 elements: checkpoint, serialize/deserialize it, and restart from the copy.
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<T> mark =
                    reader.getCheckpointMark();
            Coder<UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<T>> markCoder =
                    adapter.getCheckpointMarkCoder();
            byte[] encodedMark = CoderUtils.encodeToByteArray(markCoder, mark);
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<T> decodedMark =
                    CoderUtils.decodeFromByteArray(markCoder, encodedMark);

            reader.close();
            mark.finalizeCheckpoint();

            reader = adapter.createReader(options, decodedMark);
            hasNext = reader.start();
        }

        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<T> finalMark =
                reader.getCheckpointMark();
        List<?> residual = finalMark.getResidualElements();
        Assert.assertTrue(residual == null || finalMark.getResidualElements().isEmpty());
        Assert.assertEquals(list.size(), actual.size());
        Assert.assertEquals(Sets.newHashSet(list), Sets.newHashSet(actual));
    }

    /** Asking a freshly created reader for its current element, without starting it, must raise {@link NoSuchElementException}. */
    @Test
    public void testReadBeforeStart() throws Exception {
        this.thrown.expect(NoSuchElementException.class);

        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Long> adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(CountingSource.upTo(100L));
        PipelineOptions options = PipelineOptionsFactory.create();
        adapter.createReader(options, null).getCurrent();
    }

    /**
     * A reader restored from a checkpoint that carries one residual element must still raise
     * {@link NoSuchElementException} if its current element is requested before it is started.
     */
    @Test
    public void testReadFromCheckpointBeforeStart() throws Exception {
        this.thrown.expect(NoSuchElementException.class);

        BoundedSource<Long> countingSource = CountingSource.upTo(100L);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Long> adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(countingSource);
        PipelineOptions options = PipelineOptionsFactory.create();

        List<TimestampedValue<Long>> residualElements =
                ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Long> checkpoint =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<>(residualElements, countingSource);

        adapter.createReader(options, checkpoint).getCurrent();
    }

    /**
     * Produces {@code i} pseudo-random bytes from a fixed seed, so every call with the same
     * length yields the same content.
     *
     * @param i number of bytes to generate
     * @return a new array of length {@code i}
     */
    private static byte[] generateInput(int i) {
        final Random seeded = new Random(285930L);
        final byte[] payload = new byte[i];
        seeded.nextBytes(payload);
        return payload;
    }

    /**
     * Writes {@code bArr} to {@code file}, replacing any existing content.
     *
     * <p>The stream is managed by try-with-resources, so it is closed exactly once on both the
     * success and the failure path, and a failure while closing is attached as a suppressed
     * exception to the primary one.
     *
     * @param file destination file
     * @param bArr bytes to write
     * @throws IOException if the file cannot be opened, written or closed
     */
    private static void writeFile(File file, byte[] bArr) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(bArr);
        }
    }
}
