/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import com.google.common.base.Charsets;
import com.google.protobuf.ByteString;
import io.grpc.MethodDescriptor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ActionType;
import org.apache.arrow.flight.ArrowMessage;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.FlightTestUtil;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SyncPutListener;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

public class TestBasicOperation {
    /**
     * Asserts that {@code ArrowMessage.ENABLE_ZERO_COPY_READ} is {@code true} and
     * {@code ArrowMessage.ENABLE_ZERO_COPY_WRITE} is {@code false}.
     */
    @Test
    public void fastPathDefaults() {
        final boolean zeroCopyRead = ArrowMessage.ENABLE_ZERO_COPY_READ;
        final boolean zeroCopyWrite = ArrowMessage.ENABLE_ZERO_COPY_WRITE;
        Assertions.assertTrue(zeroCopyRead);
        Assertions.assertFalse(zeroCopyWrite);
    }

    /** Checks the exact URI string produced by {@code Location.reuseConnection()}. */
    @Test
    public void fallbackLocation() {
        final String actualUri = Location.reuseConnection().getUri().toString();
        Assertions.assertEquals("arrow-flight-reuse-connection://?", actualUri);
    }

    /** A {@link Location} built from an {@code s3://} URI string must report {@code s3} as its scheme. */
    @Test
    public void unknownScheme() throws URISyntaxException {
        final URI parsed = new Location("s3://unknown").getUri();
        Assertions.assertEquals("s3", parsed.getScheme());
    }

    /**
     * Fetches a {@link FlightInfo} from the test server and checks that the first location of its
     * first endpoint carries the {@code https://example.com} URI.
     */
    @Test
    public void unknownSchemeRemote() throws Exception {
        test(client -> {
            final FlightInfo info = client.getInfo(FlightDescriptor.path("test"));
            final FlightEndpoint firstEndpoint = info.getEndpoints().get(0);
            final Location firstLocation = firstEndpoint.getLocations().get(0);
            final URI expected;
            try {
                expected = new URI("https://example.com");
            } catch (URISyntaxException e) {
                // The lambda cannot throw checked exceptions, so rethrow unchecked.
                throw new RuntimeException(e);
            }
            Assertions.assertEquals(expected, firstLocation.getUri());
        });
    }

    /** A {@link Ticket} must compare equal to itself after serialize followed by deserialize. */
    @Test
    public void roundTripTicket() throws Exception {
        final byte[] payload = {0, 1, 2, 3, 4, 5};
        final Ticket original = new Ticket(payload);
        final ByteBuffer wire = original.serialize();
        Assertions.assertEquals(original, Ticket.deserialize(wire));
    }

    /**
     * Builds four {@link FlightInfo} variants (builder with app metadata, single endpoint, two
     * endpoints, two endpoints with the ordered flag set), checks that each survives a
     * serialize/deserialize round trip, and checks the value of {@code getOrdered()} on each.
     */
    @Test
    public void roundTripInfo() throws Exception {
        final HashMap<String, String> schemaMetadata = new HashMap<>();
        schemaMetadata.put("foo", "bar");
        final Field intField = Field.nullable("a", new ArrowType.Int(32, true));
        final Field binaryField = Field.nullable("b", new ArrowType.FixedSizeBinary(32));
        final Schema schema = new Schema(Arrays.asList(intField, binaryField), schemaMetadata);
        final String socketPath = "/tmp/test.sock";

        // Variant 1: no endpoints, application metadata set through the builder.
        final FlightInfo info1 =
            FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList())
                .setAppMetadata("foo".getBytes(StandardCharsets.UTF_8))
                .build();

        // Variant 2: command descriptor with one endpoint that carries its own metadata.
        final FlightEndpoint endpointWithMetadata =
            FlightEndpoint.builder(new Ticket(new byte[10]), Location.forGrpcDomainSocket(socketPath))
                .setAppMetadata("bar".getBytes(StandardCharsets.UTF_8))
                .build();
        final FlightInfo info2 = new FlightInfo(
            schema,
            FlightDescriptor.command(new byte[2]),
            Collections.singletonList(endpointWithMetadata),
            200L,
            500L);

        // Variant 3: path descriptor with two endpoints, ordered flag left unset.
        final FlightInfo info3 = new FlightInfo(
            schema,
            FlightDescriptor.path("a", "b"),
            Arrays.asList(
                new FlightEndpoint(new Ticket(new byte[10]), Location.forGrpcDomainSocket(socketPath)),
                new FlightEndpoint(
                    new Ticket(new byte[10]),
                    Location.forGrpcDomainSocket(socketPath),
                    Location.forGrpcInsecure("localhost", 50051))),
            200L,
            500L);

        // Variant 4: same contents as variant 3 but with ordered = true.
        final FlightInfo info4 = new FlightInfo(
            schema,
            FlightDescriptor.path("a", "b"),
            Arrays.asList(
                new FlightEndpoint(new Ticket(new byte[10]), Location.forGrpcDomainSocket(socketPath)),
                new FlightEndpoint(
                    new Ticket(new byte[10]),
                    Location.forGrpcDomainSocket(socketPath),
                    Location.forGrpcInsecure("localhost", 50051))),
            200L,
            500L,
            true,
            IpcOption.DEFAULT);

        Assertions.assertEquals(info1, FlightInfo.deserialize(info1.serialize()));
        Assertions.assertEquals(info2, FlightInfo.deserialize(info2.serialize()));
        Assertions.assertEquals(info3, FlightInfo.deserialize(info3.serialize()));
        Assertions.assertEquals(info4, FlightInfo.deserialize(info4.serialize()));

        // Variants 3 and 4 differ only in the trailing constructor arguments.
        Assertions.assertNotEquals(info3, info4);

        Assertions.assertFalse(info1.getOrdered());
        Assertions.assertFalse(info2.getOrdered());
        Assertions.assertFalse(info3.getOrdered());
        Assertions.assertTrue(info4.getOrdered());
    }

    /** Both command and path {@link FlightDescriptor}s must survive a serialize/deserialize round trip. */
    @Test
    public void roundTripDescriptor() throws Exception {
        final byte[] commandBytes = "test command".getBytes(StandardCharsets.UTF_8);
        final FlightDescriptor commandDescriptor = FlightDescriptor.command(commandBytes);
        final ByteBuffer commandWire = commandDescriptor.serialize();
        Assertions.assertEquals(commandDescriptor, FlightDescriptor.deserialize(commandWire));

        final FlightDescriptor pathDescriptor = FlightDescriptor.path("foo", "bar", "test.arrow");
        final ByteBuffer pathWire = pathDescriptor.serialize();
        Assertions.assertEquals(pathDescriptor, FlightDescriptor.deserialize(pathWire));
    }

    /** Listing flights with {@code Criteria.ALL} must yield exactly one entry. */
    @Test
    public void getDescriptors() throws Exception {
        test(client -> {
            final Iterator<FlightInfo> flights = client.listFlights(Criteria.ALL).iterator();
            int seen = 0;
            while (flights.hasNext()) {
                flights.next();
                seen++;
            }
            Assertions.assertEquals(1, seen);
        });
    }

    /** Listing flights with a non-empty criteria expression must yield no entries. */
    @Test
    public void getDescriptorsWithCriteria() throws Exception {
        test(client -> {
            final Criteria nonEmptyCriteria = new Criteria(new byte[] {1});
            final Iterator<FlightInfo> flights = client.listFlights(nonEmptyCriteria).iterator();
            int seen = 0;
            while (flights.hasNext()) {
                flights.next();
                seen++;
            }
            Assertions.assertEquals(0, seen);
        });
    }

    /** Smoke test: fetches flight info for path {@code hello} and prints its descriptor; no assertions. */
    @Test
    public void getDescriptor() throws Exception {
        test(client -> {
            final FlightInfo info = client.getInfo(FlightDescriptor.path("hello"));
            System.out.println(info.getDescriptor());
        });
    }

    /** Smoke test: fetches the schema for path {@code hello} and prints it; no assertions. */
    @Test
    public void getSchema() throws Exception {
        test(client -> {
            final Schema schema = client.getSchema(FlightDescriptor.path("hello")).getSchema();
            System.out.println(schema);
        });
    }

    /** Smoke test: prints the type of every action the server advertises; no assertions. */
    @Test
    public void listActions() throws Exception {
        test(client ->
            client.listActions().forEach(actionType -> System.out.println(actionType.getType())));
    }

    /**
     * Exercises {@code doAction} against the test producer.
     *
     * <p>The {@code hello} action must return {@code "world"} as its first result; the
     * {@code hellooo} action must return {@code "world"} then {@code "!"} and nothing else.
     */
    @Test
    public void doAction() throws Exception {
        test(client -> {
            final Iterator<Result> results = client.doAction(new Action("hello"));
            Assertions.assertTrue(results.hasNext());
            final Result first = results.next();
            Assertions.assertArrayEquals("world".getBytes(StandardCharsets.UTF_8), first.getBody());
        });
        test(client -> {
            final Iterator<Result> results = client.doAction(new Action("hellooo"));
            Assertions.assertTrue(results.hasNext());
            final Result first = results.next();
            Assertions.assertArrayEquals("world".getBytes(StandardCharsets.UTF_8), first.getBody());
            Assertions.assertTrue(results.hasNext());
            final Result second = results.next();
            Assertions.assertArrayEquals("!".getBytes(StandardCharsets.UTF_8), second.getBody());
            // The stream must end after the second result.
            Assertions.assertFalse(results.hasNext());
        });
    }

    /**
     * Uploads two batches of {@code size} integers (0..9, then 10..19) through {@code startPut}
     * and waits for the call to finish via {@code getResult()}.
     */
    @Test
    public void putStream() throws Exception {
        test((client, allocator) -> {
            final int size = 10;
            final IntVector iv = new IntVector("c1", allocator);
            try (VectorSchemaRoot root = VectorSchemaRoot.of(iv)) {
                final FlightClient.ClientStreamListener listener =
                    client.startPut(FlightDescriptor.path("hello"), root, new AsyncPutListener());

                // First batch: values 0 .. size-1.
                root.allocateNew();
                for (int i = 0; i < size; ++i) {
                    iv.set(i, i);
                }
                iv.setValueCount(size);
                root.setRowCount(size);
                listener.putNext();

                // Second batch: values size .. 2*size-1.
                root.allocateNew();
                for (int i = 0; i < size; ++i) {
                    iv.set(i, i + size);
                }
                iv.setValueCount(size);
                root.setRowCount(size);
                listener.putNext();

                root.clear();
                listener.completed();
                listener.getResult();
            }
        });
    }

    /**
     * Calling an action the producer does not implement must surface as
     * {@code FlightStatusCode.UNIMPLEMENTED}; receiving any result fails the test.
     */
    @Test
    public void propagateErrors() throws Exception {
        test(client -> {
            FlightTestUtil.assertCode(FlightStatusCode.UNIMPLEMENTED, () -> {
                final Iterator<Result> results = client.doAction(new Action("invalid-action"));
                results.forEachRemaining(unexpected -> Assertions.fail());
            });
        });
    }

    /**
     * Reads the default stream and checks that column {@code c1} contains consecutive integers
     * starting at 0 across all received batches.
     */
    @Test
    public void getStream() throws Exception {
        test(client -> {
            try (FlightStream stream = client.getStream(new Ticket(new byte[0]))) {
                final VectorSchemaRoot root = stream.getRoot();
                final IntVector column = (IntVector) root.getVector("c1");
                int expected = 0;
                while (stream.next()) {
                    final int rows = root.getRowCount();
                    for (int row = 0; row < rows; row++, expected++) {
                        Assertions.assertEquals(expected, column.get(row));
                    }
                }
            } catch (Exception e) {
                // The lambda cannot throw checked exceptions, so rethrow unchecked.
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Requests the large-batch ticket and expects a 128-column root delivering exactly two batches
     * of 65536 rows each.
     */
    @Test
    @DisabledOnOs(value={OS.WINDOWS}, disabledReason="https://github.com/apache/arrow/issues/33237: flaky test")
    public void getStreamLargeBatch() throws Exception {
        test(client -> {
            final Ticket largeBatchTicket = new Ticket(Producer.TICKET_LARGE_BATCH);
            try (FlightStream stream = client.getStream(largeBatchTicket)) {
                final VectorSchemaRoot root = stream.getRoot();
                Assertions.assertEquals(128, root.getFieldVectors().size());

                Assertions.assertTrue(stream.next());
                Assertions.assertEquals(65536, root.getRowCount());

                Assertions.assertTrue(stream.next());
                Assertions.assertEquals(65536, root.getRowCount());

                Assertions.assertFalse(stream.next());
            } catch (Exception e) {
                // The lambda cannot throw checked exceptions, so rethrow unchecked.
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Uploads the same 128-column by 65536-row batch twice through {@code startPut} and waits for
     * the call to finish.
     *
     * <p>The vector list is typed as {@code FieldVector} so it can be handed to
     * {@link VectorSchemaRoot} without a raw-type cast.
     */
    @Test
    public void startPutLargeBatch() throws Exception {
        final int columns = 128;
        final int rows = 65536;
        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
            final ArrayList<FieldVector> vectors = new ArrayList<>();
            for (int col = 0; col < columns; ++col) {
                final BigIntVector vector = new BigIntVector("f" + col, allocator);
                for (int row = 0; row < rows; ++row) {
                    vector.setSafe(row, (long) row);
                }
                vectors.add(vector);
            }
            test(client -> {
                try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
                    root.setRowCount(rows);
                    final FlightClient.ClientStreamListener stream =
                        client.startPut(FlightDescriptor.path(""), root, new SyncPutListener());
                    stream.putNext();
                    stream.putNext();
                    stream.completed();
                    stream.getResult();
                } catch (Exception e) {
                    // The lambda cannot throw checked exceptions, so rethrow unchecked.
                    throw new RuntimeException(e);
                }
            });
        }
    }

    /**
     * Runs {@code consumer} against a freshly started server/client pair, for test bodies that
     * do not need the per-test allocator.
     */
    private void test(Consumer<FlightClient> consumer) throws Exception {
        final BiConsumer<FlightClient, BufferAllocator> clientOnly =
            (client, ignoredAllocator) -> consumer.accept(client);
        test(clientOnly);
    }

    /**
     * Starts a {@link Producer}-backed {@link FlightServer} on {@code localhost} port 0, connects
     * a {@link FlightClient} to the location the started server reports, and invokes
     * {@code consumer} with that client and a child allocator named {@code testcase}.
     *
     * <p>All resources are released by try-with-resources, in reverse order of creation.
     */
    private void test(BiConsumer<FlightClient, BufferAllocator> consumer) throws Exception {
        final Location bindLocation = Location.forGrpcInsecure("localhost", 0);
        try (BufferAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
             Producer producer = new Producer(rootAllocator);
             FlightServer server = FlightServer.builder(rootAllocator, bindLocation, producer).build().start();
             FlightClient client = FlightClient.builder(rootAllocator, server.getLocation()).build();
             BufferAllocator testAllocator = rootAllocator.newChildAllocator("testcase", 0L, Long.MAX_VALUE)) {
            consumer.accept(client, testAllocator);
        }
    }

    /**
     * Serializes {@code message} with {@code marshaller}, buffers the resulting stream fully in
     * memory, and parses those bytes as a protobuf {@code Flight.FlightData}.
     *
     * @param marshaller marshaller used to turn the message into a byte stream
     * @param message message to serialize
     * @return the protobuf view of the serialized bytes
     * @throws IOException if reading the serialized stream or parsing it fails
     */
    private Flight.FlightData arrowMessageToProtobuf(MethodDescriptor.Marshaller<ArrowMessage> marshaller, ArrowMessage message) throws IOException {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        final byte[] chunk = new byte[1024];
        try (InputStream source = marshaller.stream(message)) {
            // read() returns a negative count at end of stream.
            for (int n = source.read(chunk); n >= 0; n = source.read(chunk)) {
                sink.write(chunk, 0, n);
            }
        }
        return Flight.FlightData.parseFrom(sink.toByteArray());
    }

    /**
     * Checks that an empty record batch whose protobuf {@code data_body} field has been cleared
     * still parses into an {@link ArrowMessage} exposing a single zero-capacity buffer and a
     * record batch with body length 0.
     */
    @Test
    public void testProtobufRecordBatchCompatibility() throws Exception {
        final Schema schema =
            new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true))));
        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
             VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            final VectorUnloader unloader = new VectorUnloader(root);
            root.setRowCount(0);
            final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
            try (ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, false, IpcOption.DEFAULT)) {
                Assertions.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType());

                // The freshly built message must have at least one buffer, all of them empty.
                final Iterator<ArrowBuf> bufs = message.getBufs().iterator();
                Assertions.assertTrue(bufs.hasNext());
                while (bufs.hasNext()) {
                    Assertions.assertEquals(0L, bufs.next().capacity());
                }

                // Drop the body field entirely from the protobuf form before re-parsing.
                final Flight.FlightData protobufData =
                    arrowMessageToProtobuf(marshaller, message).toBuilder().clearDataBody().build();
                Assertions.assertEquals(0, protobufData.getDataBody().size());

                final ArrowMessage parsedMessage =
                    marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray()));
                final Iterator<ArrowBuf> parsedBufs = parsedMessage.getBufs().iterator();
                Assertions.assertTrue(parsedBufs.hasNext());
                Assertions.assertEquals(0L, parsedBufs.next().capacity());
                Assertions.assertFalse(parsedBufs.hasNext());

                final ArrowRecordBatch recordBatch = parsedMessage.asRecordBatch();
                // Expected value first, per the assertEquals(expected, actual) contract.
                Assertions.assertEquals(0L, recordBatch.computeBodyLength());
            }
        }
    }

    /**
     * Checks that a schema message whose protobuf {@code data_body} is explicitly set to the empty
     * byte string parses into an {@link ArrowMessage} with no buffers and can be read as a schema.
     */
    @Test
    public void testProtobufSchemaCompatibility() throws Exception {
        final Field fooField = Field.nullable("foo", new ArrowType.Int(32, true));
        final Schema schema = new Schema(Collections.singletonList(fooField));
        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
            final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
            final Flight.FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]).toProtocol();
            try (ArrowMessage message = new ArrowMessage(descriptor, schema, IpcOption.DEFAULT)) {
                Assertions.assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType());
                Assertions.assertFalse(message.getBufs().iterator().hasNext());

                // Re-encode with an explicitly empty body before parsing it back.
                final Flight.FlightData serialized = arrowMessageToProtobuf(marshaller, message);
                final Flight.FlightData protobufData =
                    serialized.toBuilder().setDataBody(ByteString.EMPTY).build();
                Assertions.assertEquals(0, protobufData.getDataBody().size());

                final InputStream wire = new ByteArrayInputStream(protobufData.toByteArray());
                final ArrowMessage parsedMessage = marshaller.parse(wire);
                Assertions.assertFalse(parsedMessage.getBufs().iterator().hasNext());
                // No assertion on the result: the call itself is what is being exercised.
                parsedMessage.asSchema();
            }
        }
    }

    /**
     * {@code Location.forGrpcInsecure} must produce a {@code grpc+tcp} URI and a matching socket
     * address for the given host and port.
     */
    @Test
    public void testGrpcInsecureLocation() throws Exception {
        final String host = "localhost";
        final int port = 9000;
        final Location location = Location.forGrpcInsecure(host, port);

        final URI expectedUri = new URI("grpc+tcp", null, host, port, null, null, null);
        Assertions.assertEquals(expectedUri, location.getUri());

        final InetSocketAddress expectedAddress = new InetSocketAddress(host, port);
        Assertions.assertEquals(expectedAddress, location.toSocketAddress());
    }

    /**
     * {@code Location.forGrpcTls} must produce a {@code grpc+tls} URI and a matching socket
     * address for the given host and port.
     */
    @Test
    public void testGrpcTlsLocation() throws Exception {
        final String host = "localhost";
        final int port = 9000;
        final Location location = Location.forGrpcTls(host, port);

        final URI expectedUri = new URI("grpc+tls", null, host, port, null, null, null);
        Assertions.assertEquals(expectedUri, location.getUri());

        final InetSocketAddress expectedAddress = new InetSocketAddress(host, port);
        Assertions.assertEquals(expectedAddress, location.toSocketAddress());
    }

    /**
     * In-memory {@link FlightProducer} used by the tests in this file.
     *
     * <p>It serves a fixed single-entry flight listing, a two-batch integer stream, an optional
     * large-batch stream, and a small set of hard-coded actions. Closing the producer closes the
     * allocator it was constructed with.
     */
    public static class Producer implements FlightProducer, AutoCloseable {
        /** Ticket bytes that make {@link #getStream} serve the large-batch stream. */
        static final byte[] TICKET_LARGE_BATCH = "large-batch".getBytes(StandardCharsets.UTF_8);
        private final BufferAllocator allocator;

        /**
         * @param allocator allocator used for all vectors this producer creates; closed by
         *     {@link #close()}
         */
        public Producer(BufferAllocator allocator) {
            this.allocator = allocator;
        }

        /**
         * Lists the available flights.
         *
         * <p>For a non-empty criteria expression the listing is completed immediately with no
         * entries. Otherwise a single {@link FlightInfo} with the command descriptor
         * {@code "cool thing"} is sent.
         */
        public void listFlights(FlightProducer.CallContext context, Criteria criteria, FlightProducer.StreamListener<FlightInfo> listener) {
            if (criteria.getExpression().length > 0) {
                listener.onCompleted();
                // Stop here: the stream is finished, so nothing more may be sent on it.
                return;
            }
            final Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder()
                .setFlightDescriptor(Flight.FlightDescriptor.newBuilder()
                    .setType(Flight.FlightDescriptor.DescriptorType.CMD)
                    .setCmd(ByteString.copyFrom("cool thing", StandardCharsets.UTF_8)))
                .build();
            try {
                listener.onNext(new FlightInfo(getInfo));
            } catch (URISyntaxException e) {
                listener.onError(e);
                return;
            }
            listener.onCompleted();
        }

        /** Returns a task that reads the uploaded stream to its end and discards every batch. */
        public Runnable acceptPut(FlightProducer.CallContext context, FlightStream flightStream, FlightProducer.StreamListener<PutResult> ackStream) {
            return () -> {
                while (flightStream.next()) {
                    // Drain only; the uploaded data is not inspected.
                }
            };
        }

        /**
         * Serves a stream for {@code ticket}.
         *
         * <p>The {@link #TICKET_LARGE_BATCH} ticket is delegated to the large-batch stream. Any
         * other ticket gets two batches of {@code size} integers in column {@code c1}: 0..9 and
         * then 10..19.
         */
        public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
            if (Arrays.equals(TICKET_LARGE_BATCH, ticket.getBytes())) {
                this.getLargeBatch(listener);
                return;
            }
            final int size = 10;
            final IntVector iv = new IntVector("c1", this.allocator);
            final VectorSchemaRoot root = VectorSchemaRoot.of(iv);
            listener.start(root);

            // First batch: values 0 .. size-1.
            root.allocateNew();
            for (int i = 0; i < size; ++i) {
                iv.set(i, i);
            }
            iv.setValueCount(size);
            root.setRowCount(size);
            listener.putNext();

            // Second batch: values size .. 2*size-1.
            root.allocateNew();
            for (int i = 0; i < size; ++i) {
                iv.set(i, i + size);
            }
            iv.setValueCount(size);
            root.setRowCount(size);
            listener.putNext();

            root.clear();
            listener.completed();
        }

        /** Sends the same 128-column by 65536-row batch twice, then completes the stream. */
        private void getLargeBatch(FlightProducer.ServerStreamListener listener) {
            final int columns = 128;
            final int rows = 65536;
            // Typed as FieldVector so the list can be handed to VectorSchemaRoot directly.
            final ArrayList<FieldVector> vectors = new ArrayList<>();
            for (int col = 0; col < columns; ++col) {
                final BigIntVector vector = new BigIntVector("f" + col, this.allocator);
                for (int row = 0; row < rows; ++row) {
                    vector.setSafe(row, (long) row);
                }
                vectors.add(vector);
            }
            try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
                root.setRowCount(rows);
                listener.start(root);
                listener.putNext();
                listener.putNext();
                listener.completed();
            }
        }

        /** Closes the allocator this producer was constructed with. */
        @Override
        public void close() throws Exception {
            this.allocator.close();
        }

        /**
         * Returns a fixed {@link FlightInfo} regardless of {@code descriptor}: an empty schema, the
         * command descriptor {@code "cool thing"}, and one endpoint located at
         * {@code https://example.com}.
         */
        public FlightInfo getFlightInfo(FlightProducer.CallContext context, FlightDescriptor descriptor) {
            try {
                final Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder()
                    .setSchema(Producer.schemaToByteString(new Schema(Collections.emptyList())))
                    .setFlightDescriptor(Flight.FlightDescriptor.newBuilder()
                        .setType(Flight.FlightDescriptor.DescriptorType.CMD)
                        .setCmd(ByteString.copyFrom("cool thing", StandardCharsets.UTF_8)))
                    .addEndpoint(Flight.FlightEndpoint.newBuilder()
                        .addLocation(new Location("https://example.com").toProtocol()))
                    .build();
                return new FlightInfo(getInfo);
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Serializes {@code schema} with {@link MessageSerializer} into an in-memory buffer and
         * returns the bytes as a {@link ByteString}.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised while serializing
         */
        private static ByteString schemaToByteString(Schema schema) {
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema, IpcOption.DEFAULT);
                return ByteString.copyFrom(baos.toByteArray());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Handles the hard-coded test actions: {@code hello} replies {@code "world"};
         * {@code hellooo} replies {@code "world"} then {@code "!"}; any other action type is
         * reported to the listener as an UNIMPLEMENTED error.
         */
        public void doAction(FlightProducer.CallContext context, Action action, FlightProducer.StreamListener<Result> listener) {
            switch (action.getType()) {
                case "hello": {
                    listener.onNext(new Result("world".getBytes(StandardCharsets.UTF_8)));
                    listener.onCompleted();
                    break;
                }
                case "hellooo": {
                    listener.onNext(new Result("world".getBytes(StandardCharsets.UTF_8)));
                    listener.onNext(new Result("!".getBytes(StandardCharsets.UTF_8)));
                    listener.onCompleted();
                    break;
                }
                default: {
                    listener.onError(CallStatus.UNIMPLEMENTED.withDescription("Action not implemented: " + action.getType()).toRuntimeException());
                }
            }
        }

        /** Advertises three action types ({@code get}, {@code put}, {@code hello}) with empty descriptions. */
        public void listActions(FlightProducer.CallContext context, FlightProducer.StreamListener<ActionType> listener) {
            listener.onNext(new ActionType("get", ""));
            listener.onNext(new ActionType("put", ""));
            listener.onNext(new ActionType("hello", ""));
            listener.onCompleted();
        }
    }
}

