Using a cassandra database query as the source for a Flink program


Problem description

I have a Cassandra database whose data has to reach my Flink program from a socket, like a stream, for stream processing. So I wrote a simple client program that reads data from Cassandra and sends it to the socket; I also wrote the Flink program on the server side. In fact, my client program is simple and does not use any Flink instructions; it just sends a Cassandra row in string format to the socket, and the server must receive the row. First, I run the Flink program to listen for the client, and then I run the client program. The client received this from the server (because the server sends DataStream data and the client cannot receive it correctly):

Hi Client org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235

Hi Client org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235

After that, both programs stay running without sending or receiving any data, and there is no error.

The Flink program is as follows:

public class WordCount_in_cassandra {

    private static int myport = 9999;
    private static String hostname = "localhost";
    //static ServerSocket variable
    private static ServerSocket server;
    private static int count_row = 0;

    public static void main(String[] args) throws Exception {
        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        //create the socket server object
        server = new ServerSocket(myport);
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        while (true) {
            System.out.println("Waiting for client request");
            //creating socket and waiting for client connection
            Socket socket = server.accept();
            DataStream<String> stream = env.socketTextStream(hostname, myport);

            stream.print();

            //write object to Socket
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject("Hi Client " + stream.toString());
            oos.close();
            socket.close();

            // parse the data, group it, window it, and aggregate the counts
            DataStream<Tuple2<String, Long>> counts = stream
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                            // normalize and split the line
                            String[] words = value.toLowerCase().split("\\W+");

                            // emit the pairs
                            for (String word : words) {
                                if (!word.isEmpty()) {
                                    out.collect(new Tuple2<String, Long>(word, 1L));
                                }
                            }
                        }
                    })
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .sum(1);

            // emit result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }

            //terminate the server if client sends exit request
            if (stream.equals("exit")) {
                System.out.println("row_count : " + count_row);
                break;
            }

            // execute program
            env.execute("Streaming WordCount");
        } //while true

        System.out.println("Shutting down Socket server!!");
        server.close();
    } //main
}

The client program is as follows:

public class client_code {
    private static Cluster cluster = Cluster.builder()
            .addContactPoint("127.0.0.1")
            .withPort(9042).build();
    private static Session session = cluster.connect("mar1");

    public static void main(String[] args) throws UnknownHostException,
            IOException, ClassNotFoundException, InterruptedException {
        String serverIP = "localhost";
        int port = 9999;
        Socket socket = null;
        ObjectOutputStream oos = null;
        ObjectInputStream ois = null;

        ResultSet result = session.execute("select * from tlbtest15");
        for (Row row : result) {
            //establish socket connection to server
            socket = new Socket(serverIP, port);
            //write to socket using ObjectOutputStream
            oos = new ObjectOutputStream(socket.getOutputStream());
            System.out.println("Sending request to Socket Server");

            if (row == result) oos.writeObject("exit");
            else oos.writeObject("" + row + "");
            //read the server response message
            ois = new ObjectInputStream(socket.getInputStream());
            String message = (String) ois.readObject();
            System.out.println("Message: " + message);
            //close resources
            ois.close();
            oos.close();
            Thread.sleep(100);
        }

        cluster.close();
    }
}


Would you please tell me how I can solve my problem?


Any help would be appreciated.

Answer


There are several problems with the way you've tried to construct the Flink application. A few comments:

  • The Flink DataStream API is used to describe a dataflow graph that is sent to a cluster for execution when env.execute() is called. It doesn't make sense to wrap this in a while(true) loop (see the sketch after this list).
  • socketTextStream sets up a client connection. Your server doesn't appear to do anything useful.
  • stream.equals("exit") -- stream is a DataStream, not a String. If you want to do something special when a stream element has a specific value, that needs to be done differently, by using one of the stream operations that does event-at-a-time processing. As for shutting down the Flink job, streaming jobs are normally designed to either run indefinitely, or to run until a finite input source reaches its end, at which point they shut down on their own.
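
To make the first and third points concrete, here is a minimal sketch (it is not your code; the "exit" marker and the class name are just illustrative): the whole pipeline is declared once, env.execute() is called exactly once, and per-element checks live inside an operator such as flatMap:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SinglePassSocketJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Declare the dataflow graph once; no ServerSocket and no while(true) loop.
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) {
                    // Event-at-a-time processing: inspect each element here, e.g. drop a
                    // hypothetical "exit" marker instead of comparing a DataStream to a String.
                    if (!"exit".equals(value)) {
                        out.collect(value);
                    }
                }
            })
            .print();

        // The graph only runs when execute() is called, and it is called exactly once.
        env.execute("Single-pass socket job");
    }
}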


You can simplify things considerably. I would start over, and begin by replacing your client with a command line like this:

cqlsh -e "SELECT * from tlbtest15;" | nc -lk 9999

nc (netcat) will act as a server in this case, allowing Flink to be a client. This will make things easier, as that's how env.socketTextStream is meant to be used.


Then you'll be able to process the results with a normal Flink application. The socketTextStream will produce a stream containing the query's results as lines of text, one for each row.
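
For example, here is a minimal sketch of such an application, assuming the nc command above is listening on port 9999 and reusing the word-count logic from your question (the class name is just illustrative):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CassandraRowsWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink connects as a client to the nc server; each line is one Cassandra row as text.
        DataStream<String> rows = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Long>> counts = rows
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        // Normalize, split the row text into words, and emit (word, 1) pairs.
                        for (String word : value.toLowerCase().split("\\W+")) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<String, Long>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        counts.print();
        env.execute("Word count over Cassandra rows from a socket");
    }
}

The job then simply runs until you cancel it, which matches the point above about how streaming jobs normally end.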
