package org.apache.beam.sdk.io.clickhouse;

import java.sql.SQLException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.class */
public class AtomicInsertTest extends BaseClickHouseTest {

    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    private static final int MIN_ATTEMPTS = 2;
    private static final int MAX_ATTEMPTS = 20;

    /* loaded from: input_file:org/apache/beam/sdk/io/clickhouse/AtomicInsertTest$RangeBundle.class */
    private static class RangeBundle extends PTransform<PBegin, PCollection<Row>> {
        private final int size;

        private RangeBundle(int i) {
            this.size = i;
        }

        static RangeBundle of(int i) {
            return new RangeBundle(i);
        }

        public PCollection<Row> expand(PBegin pBegin) {
            Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("f0", Schema.FieldType.INT64)});
            return pBegin.getPipeline().apply(Create.of((Iterable) IntStream.range(0, this.size).mapToObj(i -> {
                return Row.withSchema(of).addValue(Long.valueOf(i)).build();
            }).collect(Collectors.toList()), new Iterable[0]).withCoder(IterableCoder.of(RowCoder.of(of)))).apply(Flatten.iterables()).setRowSchema(of);
        }
    }

    private static boolean shouldAttempt(int i, long j) {
        return i < MIN_ATTEMPTS || (j == 0 && i < 20);
    }

    @Test
    public void testAtomicInsert() throws SQLException {
        int i = 0;
        executeSql("CREATE TABLE test_atomic_insert (  f0 Int64,   f1 Int64 MATERIALIZED CAST(if((rand() % 1000000) = 0, '', '1') AS Int64)) ENGINE=MergeTree ORDER BY (f0)");
        this.pipeline.apply(RangeBundle.of(1000000)).apply(ClickHouseIO.write(clickHouse.getJdbcUrl(), "test_atomic_insert").withMaxInsertBlockSize(1000000).withInitialBackoff(Duration.millis(1L)).withMaxRetries(MIN_ATTEMPTS));
        long j = 0;
        for (int i2 = 0; shouldAttempt(i2, j); i2++) {
            i += safeRun() ? 1 : 0;
            j = executeQueryAsLong("SELECT COUNT(*) FROM test_atomic_insert");
        }
        Assert.assertEquals(i * 1000000, j);
        Assert.assertTrue("insert didn't succeed after 20 attempts", j > 0);
    }

    @Test
    public void testIdempotentInsert() throws SQLException {
        executeSql("CREATE TABLE test_idempotent_insert (  f0 Int64,   f1 Int64 MATERIALIZED CAST(if((rand() % 1000000) = 0, '', '1') AS Int64)) ENGINE=ReplicatedMergeTree('/clickHouse/tables/0/test_idempotent_insert', 'replica_0') ORDER BY (f0)");
        this.pipeline.apply(RangeBundle.of(1000000)).apply(ClickHouseIO.write(clickHouse.getJdbcUrl(), "test_idempotent_insert").withMaxInsertBlockSize(1000000).withInitialBackoff(Duration.millis(1L)).withMaxRetries(MIN_ATTEMPTS));
        long j = 0;
        for (int i = 0; shouldAttempt(i, j); i++) {
            safeRun();
            j = executeQueryAsLong("SELECT COUNT(*) FROM test_idempotent_insert");
        }
        Assert.assertEquals(1000000, j);
        Assert.assertTrue("insert didn't succeed after 20 attempts", j > 0);
    }

    private boolean safeRun() {
        try {
            return this.pipeline.run().waitUntilFinish() == PipelineResult.State.DONE;
        } catch (Pipeline.PipelineExecutionException e) {
            return false;
        }
    }
}
