package org.apache.flink.connector.file.table;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.file.src.util.Utils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/connector/file/table/LimitableBulkFormatTest.class */
public class LimitableBulkFormatTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private File file;

    /* loaded from: input_file:org/apache/flink/connector/file/table/LimitableBulkFormatTest$FailedFormat.class */
    private static class FailedFormat extends SimpleStreamFormat<String> {
        private FailedFormat() {
        }

        /* renamed from: createReader, reason: merged with bridge method [inline-methods] */
        public FailedReader m29createReader(Configuration configuration, FSDataInputStream fSDataInputStream) throws IOException {
            return new FailedReader();
        }

        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/table/LimitableBulkFormatTest$FailedReader.class */
    public static final class FailedReader implements StreamFormat.Reader<String> {
        private FailedReader() {
        }

        @Nullable
        /* renamed from: read, reason: merged with bridge method [inline-methods] */
        public String m30read() throws IOException {
            throw new RuntimeException();
        }

        public void close() throws IOException {
        }
    }

    @Before
    public void prepare() throws IOException {
        this.file = TEMP_FOLDER.newFile();
        this.file.createNewFile();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10000; i++) {
            sb.append(i).append("\n");
        }
        FileUtils.writeFileUtf8(this.file, sb.toString());
    }

    @Test
    public void test() throws IOException {
        BulkFormat.Reader createReader = LimitableBulkFormat.create(new StreamFormatAdapter(new TextLineInputFormat()), 22L).createReader(new Configuration(), new FileSourceSplit("id", new Path(this.file.toURI()), 0L, this.file.length(), this.file.lastModified(), this.file.length()));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Utils.forEachRemaining(createReader, str -> {
            atomicInteger.incrementAndGet();
        });
        Assert.assertEquals(22L, atomicInteger.get());
    }

    @Test
    public void testLimitOverBatches() throws IOException {
        Long l = 2048L;
        Configuration configuration = new Configuration();
        configuration.set(StreamFormat.FETCH_IO_SIZE, MemorySize.parse("4k"));
        BulkFormat.Reader createReader = LimitableBulkFormat.create(new StreamFormatAdapter(new TextLineInputFormat()), l).createReader(configuration, new FileSourceSplit("id", new Path(this.file.toURI()), 0L, this.file.length(), this.file.lastModified(), this.file.length()));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Utils.forEachRemaining(createReader, str -> {
            atomicInteger.incrementAndGet();
        });
        Assert.assertEquals(l.intValue(), atomicInteger.get());
    }

    @Test
    public void testSwallowExceptionWhenLimited() throws IOException {
        LimitableBulkFormat create = LimitableBulkFormat.create(new StreamFormatAdapter(new FailedFormat()), 1000L);
        BulkFormat.Reader createReader = create.createReader(new Configuration(), new FileSourceSplit("id", new Path(this.file.toURI()), 0L, this.file.length()));
        create.globalNumberRead().set(1000 + 1);
        createReader.readBatch();
    }
}
