/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.clickhouse;

import java.sql.SQLException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.clickhouse.BaseClickHouseTest;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class AtomicInsertTest
extends BaseClickHouseTest {
    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    private static final int MIN_ATTEMPTS = 2;
    private static final int MAX_ATTEMPTS = 20;

    private static boolean shouldAttempt(int i, long count) {
        return i < 2 || count == 0L && i < 20;
    }

    @Test
    public void testAtomicInsert() throws SQLException {
        int size = 1000000;
        int done = 0;
        this.executeSql("CREATE TABLE test_atomic_insert (  f0 Int64,   f1 Int64 MATERIALIZED CAST(if((rand() % " + size + ") = 0, '', '1') AS Int64)) ENGINE=MergeTree ORDER BY (f0)");
        ((PCollection)this.pipeline.apply((PTransform)RangeBundle.of(size))).apply((PTransform)ClickHouseIO.write((String)clickHouse.getJdbcUrl(), (String)"test_atomic_insert").withMaxInsertBlockSize((long)size).withInitialBackoff(Duration.millis((long)1L)).withMaxRetries(2));
        long count = 0L;
        int i = 0;
        while (AtomicInsertTest.shouldAttempt(i, count)) {
            done += this.safeRun() ? 1 : 0;
            count = this.executeQueryAsLong("SELECT COUNT(*) FROM test_atomic_insert");
            ++i;
        }
        Assert.assertEquals((long)((long)done * (long)size), (long)count);
        Assert.assertTrue((String)"insert didn't succeed after 20 attempts", (count > 0L ? 1 : 0) != 0);
    }

    @Test
    public void testIdempotentInsert() throws SQLException {
        int size = 1000000;
        this.executeSql("CREATE TABLE test_idempotent_insert (  f0 Int64,   f1 Int64 MATERIALIZED CAST(if((rand() % " + size + ") = 0, '', '1') AS Int64)) ENGINE=ReplicatedMergeTree('/clickHouse/tables/0/test_idempotent_insert', 'replica_0') ORDER BY (f0)");
        ((PCollection)this.pipeline.apply((PTransform)RangeBundle.of(size))).apply((PTransform)ClickHouseIO.write((String)clickHouse.getJdbcUrl(), (String)"test_idempotent_insert").withMaxInsertBlockSize((long)size).withInitialBackoff(Duration.millis((long)1L)).withMaxRetries(2));
        long count = 0L;
        int i = 0;
        while (AtomicInsertTest.shouldAttempt(i, count)) {
            this.safeRun();
            count = this.executeQueryAsLong("SELECT COUNT(*) FROM test_idempotent_insert");
            ++i;
        }
        Assert.assertEquals((long)size, (long)count);
        Assert.assertTrue((String)"insert didn't succeed after 20 attempts", (count > 0L ? 1 : 0) != 0);
    }

    private boolean safeRun() {
        try {
            return this.pipeline.run().waitUntilFinish() == PipelineResult.State.DONE;
        }
        catch (Pipeline.PipelineExecutionException e) {
            return false;
        }
    }

    private static class RangeBundle
    extends PTransform<PBegin, PCollection<Row>> {
        private final int size;

        private RangeBundle(int size) {
            this.size = size;
        }

        static RangeBundle of(int size) {
            return new RangeBundle(size);
        }

        public PCollection<Row> expand(PBegin input) {
            Schema schema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"f0", (Schema.FieldType)Schema.FieldType.INT64)});
            Iterable bundle = IntStream.range(0, this.size).mapToObj(x -> Row.withSchema((Schema)schema).addValue((Object)x).build()).collect(Collectors.toList());
            return ((PCollection)((PCollection)input.getPipeline().apply((PTransform)Create.of((Object)bundle, (Object[])new Iterable[0]).withCoder((Coder)IterableCoder.of((Coder)RowCoder.of((Schema)schema))))).apply((PTransform)Flatten.iterables())).setRowSchema(schema);
        }
    }
}

