Exporting data from Mongo/Cassandra to HDFS using Apache Sqoop


Problem description

I have a problem where I have to read data incrementally from multiple data sources, i.e. RDBMS (MySQL, Oracle) and NoSQL (MongoDB, Cassandra), into HDFS via Hive.

Apache Sqoop works perfectly for RDBMS, but it does not work for NoSQL, or at least I was not able to use it successfully (I tried the JDBC driver for Mongo; it was able to connect to Mongo but could not push to HDFS).

If anyone has done any work related to this and can share it, that would be really helpful.

Solution

I used an example from the web and was able to transfer files from Mongo to HDFS and the other way round. I can't recall the exact web page right now, but the program looks like the one below.

You can use it as a starting point and take it from there.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class CopyFromMongodbToHDFS {

    // MongoInputFormat hands each document's _id to the mapper as the key and
    // the document itself as a BSONObject value, so the mapper's input types
    // must be (Object, BSONObject) rather than (LongWritable, Text).
    public static class ImportWeblogsFromMongo extends
            Mapper<Object, BSONObject, Text, Text> {

        @Override
        public void map(Object key, BSONObject value, Context context)
                throws IOException, InterruptedException {

            System.out.println("Key: " + key);
            System.out.println("Value: " + value);

            // Pull the fields of interest out of the BSON document.
            String md5 = value.get("md5").toString();
            String url = value.get("url").toString();
            String date = value.get("date").toString();
            String time = value.get("time").toString();
            String ip = value.get("ip").toString();

            // Emit md5 as the key and the remaining fields as a tab-separated value.
            String output = "\t" + url + "\t" + date + "\t" + time + "\t" + ip;
            context.write(new Text(md5), new Text(output));
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        // Source collection: database "test", collection "mylogs".
        MongoConfigUtil.setInputURI(conf,
                "mongodb://127.0.0.1:27017/test.mylogs");

        System.out.println("Configuration: " + conf);

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "Mongo Import");

        // HDFS directory where TextOutputFormat writes its part files.
        Path out = new Path("/user/cloudera/test1/logs.txt");
        FileOutputFormat.setOutputPath(job, out);

        job.setJarByClass(CopyFromMongodbToHDFS.class);
        job.setMapperClass(ImportWeblogsFromMongo.class);

        // The mapper emits Text/Text pairs, so declare Text for both.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map-only job: no reducers needed.
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
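The answer mentions moving data the other way round as well. Below is a minimal sketch of that reverse direction (HDFS back into MongoDB) built on the same mongo-hadoop connector classes (MongoOutputFormat, BSONWritable, MongoConfigUtil.setOutputURI). The input path, the target collection URI, and the class name CopyFromHDFSToMongodb are assumptions for illustration, not part of the original answer; it also assumes the tab-separated layout produced by the job above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class CopyFromHDFSToMongodb {

    public static class ExportWeblogsToMongo extends
            Mapper<LongWritable, Text, Text, BSONWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Expect the tab-separated layout written by CopyFromMongodbToHDFS:
            // md5 \t url \t date \t time \t ip
            String[] fields = value.toString().split("\t");
            if (fields.length < 5) {
                return; // skip malformed lines
            }

            // Rebuild a BSON document from the text fields.
            BasicDBObject doc = new BasicDBObject();
            doc.put("md5", fields[0]);
            doc.put("url", fields[1]);
            doc.put("date", fields[2]);
            doc.put("time", fields[3]);
            doc.put("ip", fields[4]);

            // Emit one (key, document) pair per input line; MongoOutputFormat
            // inserts the emitted documents into the configured collection.
            context.write(new Text(fields[0]), new BSONWritable(doc));
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        // Hypothetical target collection; adjust host/db/collection to your setup.
        MongoConfigUtil.setOutputURI(conf,
                "mongodb://127.0.0.1:27017/test.mylogs_copy");

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "Mongo Export");

        job.setJarByClass(CopyFromHDFSToMongodb.class);
        job.setMapperClass(ExportWeblogsToMongo.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        // Hypothetical input directory: the output of the Mongo-to-HDFS job above.
        FileInputFormat.addInputPath(job, new Path("/user/cloudera/test1"));

        // Map-only job: no reducers needed.
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

In either direction, the mongo-hadoop connector jar and the MongoDB Java driver need to be available on the job's classpath when you submit it with hadoop jar.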

