package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests reading an Iceberg table through Spark Structured Streaming.
 *
 * <p>Every test works against a table created in {@link #setupTable()} with columns
 * {@code (id INT, data STRING)} partitioned by {@code bucket(3, id)}. Streams started through the
 * {@code startStream} helpers write into the in-memory sink named {@link #MEMORY_TABLE}, which
 * {@link #rowsAvailable(StreamingQuery)} reads back for assertions.
 */
@RunWith(Parameterized.class)
public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
    /** Query name of the in-memory sink that streamed rows are collected into. */
    private static final String MEMORY_TABLE = "_stream_view_mem";

    /** Three batches of records; each inner list is appended as its own write. */
    private static final List<List<SimpleRecord>> TEST_DATA_MULTIPLE_SNAPSHOTS =
        Lists.newArrayList(
            Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")),
            Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five")),
            Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")));

    /**
     * Groups of batches used by the checkpoint test: each outer element is written between two runs
     * of the stream, and each inner list is appended as its own write.
     */
    private static final List<List<List<SimpleRecord>>> TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS =
        Lists.newArrayList(
            Lists.newArrayList(
                Lists.newArrayList(
                    new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")),
                Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five"))),
            Lists.newArrayList(
                Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")),
                Lists.newArrayList(new SimpleRecord(8, "eight"), new SimpleRecord(9, "nine"))),
            Lists.newArrayList(
                Lists.newArrayList(
                    new SimpleRecord(10, "ten"),
                    new SimpleRecord(11, "eleven"),
                    new SimpleRecord(12, "twelve")),
                Lists.newArrayList(new SimpleRecord(13, "thirteen"), new SimpleRecord(14, "fourteen")),
                Lists.newArrayList(new SimpleRecord(15, "fifteen"), new SimpleRecord(16, "sixteen"))));

    /** Table under test, reloaded from the validation catalog before every test. */
    private Table table;

    /** Number of micro-batches observed by {@link #microBatchCount(Map)}; reset before every test. */
    private final AtomicInteger microBatches = new AtomicInteger();

    /**
     * Creates the test for one parameterized catalog configuration; the arguments are passed
     * straight to {@link SparkCatalogTestBase}.
     */
    public TestStructuredStreamingRead3(
            String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    /** Creates the bucketed test table, loads it and resets the micro-batch counter. */
    @Before
    public void setupTable() {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg PARTITIONED BY (bucket(3, id))", this.tableName);
        this.table = this.validationCatalog.loadTable(this.tableIdent);
        this.microBatches.set(0);
    }

    /** Stops every streaming query that is still active in the shared Spark session. */
    @After
    public void stopStreams() throws TimeoutException {
        for (StreamingQuery activeQuery : spark.streams().active()) {
            activeQuery.stop();
        }
    }

    /** Drops the test table if it exists. */
    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", this.tableName);
    }

    /** A stream started after several appends returns the records of all of them. */
    @Test
    public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        StreamingQuery query = startStream();
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** With {@code streaming-max-files-per-micro-batch=1} the test data is read in 6 micro-batches. */
    @Test
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        Assert.assertEquals(6, microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "1")));
    }

    /** With {@code streaming-max-files-per-micro-batch=2} the test data is read in 3 micro-batches. */
    @Test
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        Assert.assertEquals(3, microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "2")));
    }

    /**
     * With {@code streaming-max-rows-per-micro-batch=1} a single micro-batch is observed and the
     * memory sink holds only the first record of the first append.
     */
    @Test
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        Assert.assertEquals(1, microBatchCount(ImmutableMap.of("streaming-max-rows-per-micro-batch", "1")));

        StreamingQuery query = startStream("streaming-max-rows-per-micro-batch", "1");
        List<SimpleRecord> expected = Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0));
        Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(expected);
    }

    /** With {@code streaming-max-rows-per-micro-batch=4} the test data is read in 2 micro-batches. */
    @Test
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        Assert.assertEquals(2, microBatchCount(ImmutableMap.of("streaming-max-rows-per-micro-batch", "4")));
    }

    /** Records appended after the stream has started are picked up by it. */
    @Test
    public void testReadStreamOnIcebergThenAddData() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;

        StreamingQuery query = startStream();
        appendDataAsMultipleSnapshots(expected);

        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * Starting from a timestamp one millisecond after the current snapshot skips that snapshot's
     * records and returns only the records appended afterwards.
     */
    @Test
    public void testReadingStreamFromTimestamp() throws Exception {
        List<SimpleRecord> dataBeforeTimestamp =
            Lists.newArrayList(
                new SimpleRecord(-2, "minustwo"),
                new SimpleRecord(-1, "minusone"),
                new SimpleRecord(0, "zero"));
        appendData(dataBeforeTimestamp);

        this.table.refresh();
        long streamStartTimestamp = this.table.currentSnapshot().timestampMillis() + 1;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));
        Assertions.assertThat(rowsAvailable(query)).isEmpty();

        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * A stream that starts from a timestamp 10 seconds in the future returns nothing for appends
     * made before that time and returns the records of an append made after it.
     */
    @Test
    public void testReadingStreamFromFutureTimetsamp() throws Exception {
        long futureTimestamp = System.currentTimeMillis() + 10000;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(futureTimestamp));
        Assertions.assertThat(rowsAvailable(query)).isEmpty();

        List<SimpleRecord> data =
            Lists.newArrayList(
                new SimpleRecord(-2, "minustwo"),
                new SimpleRecord(-1, "minusone"),
                new SimpleRecord(0, "zero"));

        // Appends made before the start timestamp must not show up in the stream.
        IntStream.range(0, 3)
            .forEach(
                attempt -> {
                    appendData(data);
                    Assertions.assertThat(rowsAvailable(query)).isEmpty();
                });

        waitUntilAfter(futureTimestamp);

        appendData(data);
        Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data);
    }

    /**
     * Like the future-timestamp test, but the table already holds a snapshot when the stream is
     * started from a timestamp 2 seconds in the future.
     */
    @Test
    public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
        List<SimpleRecord> dataBeforeTimestamp =
            Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
        appendData(dataBeforeTimestamp);

        long streamStartTimestamp = System.currentTimeMillis() + 2000;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));
        Assert.assertEquals(Collections.emptyList(), rowsAvailable(query));

        waitUntilAfter(streamStartTimestamp);

        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * Starting from exactly the timestamp of an existing snapshot includes that snapshot's records
     * as well as the records appended afterwards.
     */
    @Test
    public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;

        appendData(expected.get(0));
        this.table.refresh();
        long streamStartTimestamp = this.table.currentSnapshot().timestampMillis();

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));

        for (List<SimpleRecord> batch : expected.subList(1, expected.size())) {
            appendData(batch);
        }

        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * Starting from the timestamp of a snapshot that has since been expired returns only the
     * records of the later appends.
     */
    @Test
    public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws TimeoutException {
        List<SimpleRecord> firstSnapshotRecords = Lists.newArrayList(new SimpleRecord(1, "one"));
        List<SimpleRecord> secondSnapshotRecords = Lists.newArrayList(new SimpleRecord(2, "two"));
        List<SimpleRecord> thirdSnapshotRecords = Lists.newArrayList(new SimpleRecord(3, "three"));

        List<SimpleRecord> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(secondSnapshotRecords);
        expectedRecords.addAll(thirdSnapshotRecords);

        appendData(firstSnapshotRecords);
        this.table.refresh();
        long firstSnapshotId = this.table.currentSnapshot().snapshotId();
        long firstSnapshotTime = this.table.currentSnapshot().timestampMillis();

        appendData(secondSnapshotRecords);
        appendData(thirdSnapshotRecords);

        this.table.expireSnapshots().expireSnapshotId(firstSnapshotId).commit();

        StreamingQuery query = startStream("stream-from-timestamp", String.valueOf(firstSnapshotTime));
        Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(expectedRecords);
    }

    /**
     * Restarts a parquet-sink stream from the same checkpoint location after every group of appends
     * and checks that the sink output holds exactly the records written so far.
     */
    @Test
    public void testResumingStreamReadFromCheckpoint() throws Exception {
        File writerCheckpointFolder = this.temp.newFolder("writer-checkpoint-folder");
        File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
        File output = this.temp.newFolder();

        DataStreamWriter<?> querySource =
            spark
                .readStream()
                .format("iceberg")
                .load(this.tableName)
                .writeStream()
                .option("checkpointLocation", writerCheckpoint.toString())
                .format("parquet")
                .queryName("checkpoint_test")
                .option("path", output.getPath());

        // First run against the empty table.
        StreamingQuery startQuery = querySource.start();
        startQuery.processAllAvailable();
        startQuery.stop();

        List<SimpleRecord> expected = Lists.newArrayList();
        for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
            appendDataAsMultipleSnapshots(expectedCheckpoint);
            Iterables.addAll(expected, Iterables.concat(expectedCheckpoint));

            // Re-using the writer re-uses the checkpoint location and the output path.
            StreamingQuery restartedQuery = querySource.start();
            restartedQuery.processAllAvailable();
            restartedQuery.stop();

            List<SimpleRecord> actual =
                spark.read().load(output.getPath()).as(Encoders.bean(SimpleRecord.class)).collectAsList();
            Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
        }
    }

    /**
     * Restarting a stream from a checkpoint fails when the snapshot processed by the previous run
     * has been expired in the meantime.
     */
    @Test
    public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
        File writerCheckpointFolder = this.temp.newFolder("writer-checkpoint-folder");
        File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
        File output = this.temp.newFolder();

        DataStreamWriter<?> querySource =
            spark
                .readStream()
                .format("iceberg")
                .load(this.tableName)
                .writeStream()
                .option("checkpointLocation", writerCheckpoint.toString())
                .format("parquet")
                .queryName("checkpoint_test")
                .option("path", output.getPath());

        List<SimpleRecord> firstSnapshotRecords = Lists.newArrayList(new SimpleRecord(1, "one"));
        List<SimpleRecord> secondSnapshotRecords = Lists.newArrayList(new SimpleRecord(2, "two"));

        StreamingQuery startQuery = querySource.start();

        appendData(firstSnapshotRecords);
        this.table.refresh();
        long expiredSnapshotId = this.table.currentSnapshot().snapshotId();

        startQuery.processAllAvailable();
        startQuery.stop();

        appendData(secondSnapshotRecords);

        this.table.expireSnapshots().expireSnapshotId(expiredSnapshotId).commit();

        StreamingQuery restartedQuery = querySource.start();
        AssertionsForClassTypes.assertThatThrownBy(restartedQuery::processAllAvailable)
            .hasCauseInstanceOf(IllegalStateException.class)
            .hasMessageContaining(
                String.format(
                    "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
                    expiredSnapshotId));
    }

    /** A stream returns all records when the table mixes parquet, orc and avro data files. */
    @Test
    public void testParquetOrcAvroDataInOneTable() throws Exception {
        List<SimpleRecord> parquetFileRecords =
            Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
        List<SimpleRecord> orcFileRecords =
            Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five"));
        List<SimpleRecord> avroFileRecords =
            Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven"));

        appendData(parquetFileRecords);
        appendData(orcFileRecords, "orc");
        appendData(avroFileRecords, "avro");

        StreamingQuery query = startStream();
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(
                Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
    }

    /** A stream over a table without any data returns no rows. */
    @Test
    public void testReadStreamFromEmptyTable() throws Exception {
        StreamingQuery query = startStream();

        Assert.assertEquals(Collections.emptyList(), rowsAvailable(query));
    }

    /**
     * After upgrading the table to format version 2 and committing a row delta that produces an
     * {@code overwrite} snapshot, processing the stream fails with an
     * {@link IllegalStateException} cause.
     */
    @Test
    public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
        // Upgrade the table metadata to format version 2 before adding delete files.
        TableOperations ops = this.table.operations();
        TableMetadata meta = ops.current();
        ops.commit(meta, meta.upgradeToFormatVersion(2));

        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        Schema deleteRowSchema = this.table.schema().select("data");
        Record dataDelete = GenericRecord.create(deleteRowSchema);
        List<Record> dataDeletes = Lists.newArrayList(dataDelete.copy("data", "one"));

        this.table
            .newRowDelta()
            .addDeletes(
                FileHelpers.writeDeleteFile(
                    this.table,
                    Files.localOutput(this.temp.newFile()),
                    TestHelpers.Row.of(0),
                    dataDeletes,
                    deleteRowSchema))
            .commit();

        // Check pre-condition: the row delta produced an overwrite snapshot.
        Assert.assertEquals("overwrite", this.table.currentSnapshot().operation());

        StreamingQuery query = startStream();

        AssertHelpers.assertThrowsCause(
            "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
            IllegalStateException.class,
            "Cannot process overwrite snapshot",
            () -> query.processAllAvailable());
    }

    /** A {@code replace} snapshot created by rewriting manifests does not change the streamed rows. */
    @Test
    public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        // Cluster every data file under the same key to force a manifest rewrite.
        this.table.rewriteManifests().clusterBy(dataFile -> 1).commit();

        // Check pre-condition: the rewrite produced a replace snapshot.
        Assert.assertEquals("replace", this.table.currentSnapshot().operation());

        StreamingQuery query = startStream();
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** Processing a {@code delete} snapshot fails with an {@link IllegalStateException} cause. */
    @Test
    public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
        // Re-partition by identity(id) before writing, then delete by row filter on id.
        this.table.updateSpec().removeField("id_bucket").addField(Expressions.ref("id")).commit();

        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        this.table.newDelete().deleteFromRowFilter(Expressions.equal("id", 4)).commit();

        // Check pre-condition: the latest snapshot is a delete.
        Assert.assertEquals("delete", this.table.currentSnapshot().operation());

        StreamingQuery query = startStream();

        AssertHelpers.assertThrowsCause(
            "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
            IllegalStateException.class,
            "Cannot process delete snapshot",
            () -> query.processAllAvailable());
    }

    /**
     * With {@code streaming-skip-delete-snapshots=true} a {@code delete} snapshot does not fail the
     * stream and all appended records are returned.
     */
    @Test
    public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
        this.table.updateSpec().removeField("id_bucket").addField(Expressions.ref("id")).commit();

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        this.table.newDelete().deleteFromRowFilter(Expressions.equal("id", 4)).commit();

        // Check pre-condition: the latest snapshot is a delete.
        Assert.assertEquals("delete", this.table.currentSnapshot().operation());

        StreamingQuery query = startStream("streaming-skip-delete-snapshots", "true");
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
    }

    /**
     * With {@code streaming-skip-overwrite-snapshots=true} an {@code overwrite} snapshot does not
     * fail the stream and all appended records are returned.
     */
    @Test
    public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws Exception {
        this.table.updateSpec().removeField("id_bucket").addField(Expressions.ref("id")).commit();

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        this.table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();

        // Check pre-condition: the latest snapshot is an overwrite.
        Assert.assertEquals("overwrite", this.table.currentSnapshot().operation());

        StreamingQuery query = startStream("streaming-skip-overwrite-snapshots", "true");
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
    }

    /**
     * Appends every inner list with a separate write, in iteration order.
     *
     * @param data batches of records; one append is issued per batch
     */
    private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
        for (List<SimpleRecord> batch : data) {
            appendData(batch);
        }
    }

    /**
     * Appends the records to the test table using the {@code parquet} write format.
     *
     * @param data records to append
     */
    private void appendData(List<SimpleRecord> data) {
        appendData(data, "parquet");
    }

    /**
     * Appends the records to the test table with a single Spark write.
     *
     * @param data records to append
     * @param format value passed as the {@code write-format} write option
     */
    private void appendData(List<SimpleRecord> data, String format) {
        spark
            .createDataFrame(data, SimpleRecord.class)
            .select("id", "data")
            .write()
            .format("iceberg")
            .option("write-format", format)
            .mode("append")
            .save(this.tableName);
    }

    /**
     * Starts a stream from the test table into the in-memory sink named {@link #MEMORY_TABLE}.
     *
     * @param options options applied to both the stream reader and the stream writer
     * @return the started query
     */
    private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
        return spark
            .readStream()
            .options(options)
            .format("iceberg")
            .load(this.tableName)
            .writeStream()
            .options(options)
            .format("memory")
            .queryName(MEMORY_TABLE)
            .outputMode(OutputMode.Append())
            .start();
    }

    /** Starts a stream into the in-memory sink without any extra options. */
    private StreamingQuery startStream() throws TimeoutException {
        return startStream(Collections.emptyMap());
    }

    /**
     * Starts a stream with the given option and, in addition, with
     * {@code streaming-max-files-per-micro-batch} set to {@code 1}.
     *
     * @param key name of the option to set
     * @param value value of the option to set
     * @return the started query
     */
    private StreamingQuery startStream(String key, String value) throws TimeoutException {
        return startStream(ImmutableMap.of(key, value, "streaming-max-files-per-micro-batch", "1"));
    }

    /**
     * Runs a stream over the test table until all available data is processed, counting one for
     * every micro-batch handed to {@code foreachBatch}, then stops all active streams.
     *
     * @param options options applied to both the stream reader and the stream writer
     * @return the value of the micro-batch counter after the stream was stopped
     */
    private int microBatchCount(Map<String, String> options) throws TimeoutException {
        // Keep the block-bodied (void) lambda: it binds to Spark's Java VoidFunction2 overload.
        spark
            .readStream()
            .options(options)
            .format("iceberg")
            .load(this.tableName)
            .writeStream()
            .options(options)
            .foreachBatch(
                (dataset, batchId) -> {
                    this.microBatches.getAndIncrement();
                })
            .start()
            .processAllAvailable();

        stopStreams();
        return this.microBatches.get();
    }

    /**
     * Blocks until the query has processed all available data, then reads the in-memory sink.
     *
     * @param query a query writing into the sink named {@link #MEMORY_TABLE}
     * @return every row currently held by the in-memory sink
     */
    private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
        query.processAllAvailable();
        return spark
            .sql("select * from " + MEMORY_TABLE)
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
    }
}
