package org.apache.beam.sdk.io.kudu;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.kudu.KuduIO;
import org.apache.beam.sdk.io.kudu.KuduService;
import org.apache.beam.sdk.io.kudu.KuduTestUtils;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.RowResult;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduIOTest.class */
public class KuduIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(KuduIOTest.class);
    private KuduService<Integer> mockReadService;
    private KuduService<String> mockWriteService;

    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public final TestPipeline readPipeline = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public final transient ExpectedLogs expectedWriteLogs = ExpectedLogs.none(FakeWriter.class);

    @Rule
    public final transient ExpectedLogs expectedReadLogs = ExpectedLogs.none(FakeReader.class);
    private final int numberRecords = 10;
    private int targetParallelism = 3;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduIOTest$FakeReader.class */
    public static class FakeReader extends BoundedSource.BoundedReader<Integer> {
        private static final Logger LOG = LoggerFactory.getLogger(FakeReader.class);
        static final String LOG_SET_RANGE = "FakeReader serializedToken gives range %d - %d";
        private final KuduIO.KuduSource<Integer> source;
        private int lowerInclusive = 0;
        private int upperExclusive = 100;
        private int current = 0;
        private RowResult mockRecord = (RowResult) Mockito.mock(RowResult.class);

        FakeReader(KuduIO.KuduSource<Integer> kuduSource) {
            this.source = kuduSource;
            Mockito.when(Integer.valueOf(this.mockRecord.getInt((String) Matchers.any()))).thenAnswer(invocationOnMock -> {
                return Integer.valueOf(this.current);
            });
        }

        public boolean start() {
            if (this.source.serializedToken != null) {
                ByteBuffer wrap = ByteBuffer.wrap(this.source.serializedToken);
                this.lowerInclusive = wrap.getInt();
                this.upperExclusive = wrap.getInt();
                LOG.debug(String.format(LOG_SET_RANGE, Integer.valueOf(this.lowerInclusive), Integer.valueOf(this.upperExclusive)));
            }
            this.current = this.lowerInclusive;
            return true;
        }

        public boolean advance() {
            this.current++;
            return this.current < this.upperExclusive;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public Integer m4getCurrent() {
            return (Integer) this.source.spec.getParseFn().apply(this.mockRecord);
        }

        public void close() {
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public BoundedSource<Integer> m3getCurrentSource() {
            return this.source;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduIOTest$FakeReaderAnswer.class */
    static class FakeReaderAnswer implements Answer<FakeReader>, Serializable {
        FakeReaderAnswer() {
        }

        /* renamed from: answer, reason: merged with bridge method [inline-methods] */
        public FakeReader m5answer(InvocationOnMock invocationOnMock) {
            return new FakeReader((KuduIO.KuduSource) invocationOnMock.getArguments()[0]);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduIOTest$FakeWriter.class */
    private static class FakeWriter implements KuduService.Writer<Long> {
        static final String LOG_OPEN_SESSION = "FakeWriter[%d] openSession";
        static final String LOG_WRITE = "FakeWriter[%d] write";
        static final String LOG_WRITE_VALUE = "FakeWriter value[%d]";
        static final String LOG_CLOSE_SESSION = "FakeWriter[%d] closeSession";
        private transient int id;
        private static final Logger LOG = LoggerFactory.getLogger(FakeWriter.class);
        private static final AtomicInteger counter = new AtomicInteger(0);

        private FakeWriter() {
            this.id = 0;
        }

        public void openSession() {
            LOG.debug(String.format(LOG_OPEN_SESSION, Integer.valueOf(this.id)));
        }

        public void write(Long l) {
            LOG.debug(String.format(LOG_WRITE, l));
            LOG.debug(String.format(LOG_WRITE_VALUE, l));
        }

        public void closeSession() {
            LOG.debug(String.format(LOG_CLOSE_SESSION, Integer.valueOf(this.id)));
        }

        public void close() {
            LOG.debug("FakeWriter[{}] close {}", Integer.valueOf(this.id));
        }

        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            objectInputStream.defaultReadObject();
            this.id = counter.incrementAndGet();
        }
    }

    @Before
    public void setUp() throws Exception {
        this.mockReadService = (KuduService) Mockito.mock(KuduService.class, Mockito.withSettings().serializable());
        this.mockWriteService = (KuduService) Mockito.mock(KuduService.class, Mockito.withSettings().serializable());
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [byte[], java.lang.Object[]] */
    @Test
    public void testRead() throws KuduException {
        Mockito.when(this.mockReadService.createReader((KuduIO.KuduSource) Matchers.any())).thenAnswer(new FakeReaderAnswer());
        Mockito.when(this.mockReadService.createTabletScanners((KuduIO.Read) Matchers.any())).thenReturn(Arrays.asList(new byte[]{ByteBuffer.allocate(8).putInt(0).putInt(25).array(), ByteBuffer.allocate(8).putInt(25).putInt(50).array(), ByteBuffer.allocate(8).putInt(50).putInt(75).array(), ByteBuffer.allocate(8).putInt(75).putInt(100).array()}));
        PAssert.thatSingleton(this.readPipeline.apply(KuduIO.read().withMasterAddresses("mock").withTable("Table").withParseFn(rowResult -> {
            return Integer.valueOf(rowResult.getInt("id"));
        }).withKuduService(this.mockReadService).withCoder(BigEndianIntegerCoder.of())).apply("Count", Count.globally())).isEqualTo(100L);
        this.readPipeline.run().waitUntilFinish();
        this.expectedReadLogs.verifyDebug(String.format("FakeReader serializedToken gives range %d - %d", 0, 25));
        this.expectedReadLogs.verifyDebug(String.format("FakeReader serializedToken gives range %d - %d", 25, 50));
        this.expectedReadLogs.verifyDebug(String.format("FakeReader serializedToken gives range %d - %d", 50, 75));
        this.expectedReadLogs.verifyDebug(String.format("FakeReader serializedToken gives range %d - %d", 75, 100));
    }

    @Test
    @Ignore
    public void testWrite() throws Exception {
        Mockito.when(this.mockWriteService.createWriter((KuduIO.Write) Matchers.any())).thenReturn(new FakeWriter());
        this.writePipeline.apply("Generate sequence", GenerateSequence.from(0L).to(10L)).apply("Write records to Kudu", KuduIO.write().withMasterAddresses("ignored").withTable("ignored").withFormatFn(new KuduTestUtils.GenerateUpsert()).withKuduService(this.mockWriteService));
        this.writePipeline.run().waitUntilFinish();
        for (int i = 1; i <= this.targetParallelism; i++) {
            this.expectedWriteLogs.verifyDebug(String.format("FakeWriter[%d] openSession", Integer.valueOf(i)));
            this.expectedWriteLogs.verifyDebug(String.format("FakeWriter[%d] write", Integer.valueOf(i)));
            this.expectedWriteLogs.verifyDebug(String.format("FakeWriter[%d] closeSession", Integer.valueOf(i)));
        }
        for (int i2 = 0; i2 < 10; i2++) {
            this.expectedWriteLogs.verifyDebug(String.format("FakeWriter value[%d]", Integer.valueOf(i2)));
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1276051008:
                if (implMethodName.equals("lambda$testRead$2f9baff8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kudu/KuduIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kudu/client/RowResult;)Ljava/lang/Integer;")) {
                    return rowResult -> {
                        return Integer.valueOf(rowResult.getInt("id"));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
