/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;

public class SubscriptionSessionExample {
    private static Session session;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 6667;
    private static final String TOPIC_1 = "topic1";
    private static final String TOPIC_2 = "`'topic2'`";
    private static final String TOPIC_3 = "`\"topic3\"`";
    private static final String TOPIC_4 = "`\"top \\.i.c4\"`";
    private static final long SLEEP_NS = 1000000000L;
    private static final long POLL_TIMEOUT_MS = 10000L;
    private static final int MAX_RETRY_TIMES = 3;
    private static final int PARALLELISM = 8;
    private static final long CURRENT_TIME;

    private static void prepareData() throws Exception {
        session = new Session.Builder().host(HOST).port(6667).username("root").password("root").version(Version.V_1_0).build();
        session.open(false);
        for (int i = 0; i < 100; ++i) {
            session.executeNonQueryStatement(String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i));
            session.executeNonQueryStatement(String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", CURRENT_TIME + (long)i));
            session.executeNonQueryStatement(String.format("insert into root.sg.d3(time, s1) values (%s, 5)", CURRENT_TIME + (long)(2 * i)));
        }
        session.executeNonQueryStatement("flush");
        session.close();
        session = null;
    }

    private static void dataQuery() throws Exception {
        session = new Session.Builder().host(HOST).port(6667).username("root").password("root").version(Version.V_1_0).build();
        session.open(false);
        SessionDataSet dataSet = session.executeQueryStatement("select ** from root.**");
        while (dataSet.hasNext()) {
            System.out.println(dataSet.next());
        }
        session.close();
        session = null;
    }

    private static void dataSubscription1() throws Exception {
        Properties config;
        try (SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, 6667);){
            subscriptionSession.open();
            config = new Properties();
            config.put("path", "root.db.d1.s1");
            config.put("start-time", (Object)25);
            config.put("end-time", (Object)75);
            subscriptionSession.createTopic(TOPIC_1, config);
        }
        int retryCount = 0;
        config = new Properties();
        config.put("consumer-id", "c1");
        config.put("group-id", "cg1");
        try (SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config);){
            List messages;
            consumer1.open();
            consumer1.subscribe(TOPIC_1);
            while (!(messages = consumer1.poll(10000L)).isEmpty() || ++retryCount < 3) {
                for (SubscriptionMessage message : messages) {
                    for (SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
                        System.out.println(dataSet.getColumnNames());
                        System.out.println(dataSet.getColumnTypes());
                        while (dataSet.hasNext()) {
                            System.out.println(dataSet.next());
                        }
                    }
                }
            }
            try (SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, 6667);){
                subscriptionSession.open();
                subscriptionSession.getTopics().forEach(System.out::println);
                subscriptionSession.getSubscriptions().forEach(System.out::println);
            }
            consumer1.unsubscribe(TOPIC_1);
        }
    }

    private static void dataSubscription2() throws Exception {
        try (SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, 6667);){
            subscriptionSession.open();
            Properties config = new Properties();
            config.put("start-time", (Object)(CURRENT_TIME + 33L));
            config.put("end-time", (Object)(CURRENT_TIME + 66L));
            config.put("format", "TsFileHandler");
            subscriptionSession.createTopic(TOPIC_2, config);
        }
        ArrayList<Thread> threads = new ArrayList<Thread>();
        int i = 0;
        while (i < 8) {
            int idx = i++;
            Thread thread = new Thread(() -> {
                int retryCount = 0;
                try (SubscriptionPullConsumer consumer2 = new SubscriptionPullConsumer.Builder().consumerId("c" + idx).consumerGroupId("cg2").autoCommit(false).buildPullConsumer();){
                    List messages;
                    consumer2.open();
                    consumer2.subscribe(TOPIC_2);
                    while (!(messages = consumer2.poll(Collections.singleton(TOPIC_2), 10000L)).isEmpty() || ++retryCount < 3) {
                        for (SubscriptionMessage message : messages) {
                            TsFileReader reader = message.getTsFileHandler().openReader();
                            try {
                                QueryDataSet dataSet = reader.query(QueryExpression.create(Arrays.asList(new Path("root.db.d2", "s2", true), new Path("root.sg.d3", "s1", true)), null));
                                while (dataSet.hasNext()) {
                                    System.out.println(dataSet.next());
                                }
                            }
                            finally {
                                if (reader == null) continue;
                                reader.close();
                            }
                        }
                        consumer2.commitSync((Iterable)messages);
                    }
                    consumer2.unsubscribe(TOPIC_2);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    private static void dataSubscription3() throws Exception {
        try (SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, 6667);){
            subscriptionSession.open();
            Properties config = new Properties();
            config.put("format", "TsFileHandler");
            config.put("mode", "snapshot");
            subscriptionSession.createTopic(TOPIC_3, config);
        }
        ArrayList<Thread> threads = new ArrayList<Thread>();
        int i = 0;
        while (i < 8) {
            int idx = i++;
            Thread thread = new Thread(() -> {
                try (SubscriptionPushConsumer consumer3 = new SubscriptionPushConsumer.Builder().consumerId("c" + idx).consumerGroupId("cg3").ackStrategy(AckStrategy.AFTER_CONSUME).consumeListener(message -> {
                    System.out.println(message.getTsFileHandler().getFile().getAbsolutePath());
                    return ConsumeResult.SUCCESS;
                }).buildPushConsumer();){
                    consumer3.open();
                    consumer3.subscribe(TOPIC_3);
                    while (!consumer3.allTopicMessagesHaveBeenConsumed()) {
                        LockSupport.parkNanos(1000000000L);
                    }
                }
            });
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    private static void dataSubscription4() throws Exception {
        try (SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, 6667);){
            subscriptionSession.open();
            Properties config = new Properties();
            config.put("format", "TsFileHandler");
            config.put("mode", "snapshot");
            subscriptionSession.createTopic(TOPIC_4, config);
        }
        AtomicLong counter = new AtomicLong();
        ArrayList<Thread> threads = new ArrayList<Thread>();
        int i = 0;
        while (i < 8) {
            int idx = i++;
            Thread thread = new Thread(() -> {
                try (SubscriptionPullConsumer consumer4 = new SubscriptionPullConsumer.Builder().consumerId("c" + idx).consumerGroupId("cg4").autoCommit(true).fileSaveFsync(true).buildPullConsumer();){
                    consumer4.open();
                    consumer4.subscribe(TOPIC_4);
                    while (!consumer4.allTopicMessagesHaveBeenConsumed()) {
                        for (SubscriptionMessage message : consumer4.poll(10000L)) {
                            SubscriptionTsFileHandler handler = message.getTsFileHandler();
                            handler.moveFile(Paths.get(System.getProperty("user.dir"), "exported-tsfiles").resolve(URLEncoder.encode(TOPIC_4) + "-" + counter.getAndIncrement() + ".tsfile"));
                        }
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    public static void main(String[] args) throws Exception {
        SubscriptionSessionExample.prepareData();
        SubscriptionSessionExample.dataSubscription4();
    }

    static {
        CURRENT_TIME = System.currentTimeMillis();
    }
}

