A way to read table data from MySQL to Pig

Problem description

Everyone knows that Pig supports DBStorage, but it only supports storing results from Pig into MySQL, like this:

STORE data INTO 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');

Please show me how to read a table from MySQL in the same way, something like this:

data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');

Here is my code:

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    private ArrayList mProtoTuple = null;
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {

        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // TODO Auto-generated method stub
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // TODO Auto-generated method stub
        boolean next = false;

        try {
            next = result.next();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        if (!next)
            return null;
        int numColumns = 0;
        // Get result set meta data
        ResultSetMetaData rsmd;
        try {
            rsmd = result.getMetaData();
            numColumns = rsmd.getColumnCount();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        for (int i = 0; i < numColumns; i++) {

            try {
                Object field = result.getObject(i);

                switch (DataType.findType(field)) {
                case DataType.NULL:

                    mProtoTuple.add(null);

                    break;

                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);

                    break;

                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);

                    break;

                case DataType.LONG:
                    mProtoTuple.add((Long) field);

                    break;

                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);

                    break;

                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);

                    break;

                case DataType.BYTEARRAY:
                    byte[] b = ((DataByteArray) field).get();
                    mProtoTuple.add(b);

                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);

                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);

                    break;

                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot store a non-flat tuple "
                            + "using DbStorage");

                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));

                }

            } catch (Exception ee) {
                throw new RuntimeException(ee);
            }
        }

        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException {

        con = null;
        if (query == null) {
            throw new IOException("SQL Insert command not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
        count = 0;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // TODO Auto-generated method stub

        //TextInputFormat.setInputPaths(job, location);

    }

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

    }

}

I have tried many times to write this UDF, but without success.

Solution

As you say, DBStorage only supports saving results to a database.

To load data from MySQL you could look into a project called Sqoop (which copies data from a database to HDFS), or you could perform a MySQL dump and then copy the file into HDFS. Both approaches require an extra step outside Pig and cannot be used directly from inside a Pig script.

A third option would be to write a Pig LoadFunc (you say you tried to write a UDF). It shouldn't be too difficult: you'll need to pass much the same options as DBStorage (driver, connection credentials and a SQL query to execute), and you can probably use some result set metadata inspection to auto-generate a schema too.
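
To make the metadata idea a bit more concrete, here is a minimal sketch in plain JDBC (no Pig classes) of how a schema string might be derived from `ResultSetMetaData`. The class name `PigSchemaSketch`, the methods `pigTypeFor` and `schemaFor`, and the type mapping itself are assumptions made for illustration, not part of Pig or DBStorage; SQL types not listed (dates, decimals, blobs and so on) simply fall back to `bytearray` here. Note that JDBC column indexes start at 1, not 0.

```java
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;

// Hypothetical helper: builds a Pig-style schema string from JDBC metadata.
class PigSchemaSketch {

    // Assumed mapping from a java.sql.Types code to a Pig type name.
    static String pigTypeFor(int sqlType) {
        switch (sqlType) {
            case Types.BIT:
            case Types.BOOLEAN:
                return "boolean";
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
                return "int";
            case Types.BIGINT:
                return "long";
            case Types.REAL:
                return "float";
            case Types.FLOAT:   // JDBC FLOAT is double precision
            case Types.DOUBLE:
                return "double";
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
                return "chararray";
            default:
                // Simplification: anything unmapped is left untyped.
                return "bytearray";
        }
    }

    // Walks the columns (1-based in JDBC) and joins "label:type" pairs.
    static String schemaFor(ResultSetMetaData md) throws SQLException {
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= md.getColumnCount(); i++) {
            if (i > 1) {
                sb.append(", ");
            }
            sb.append(md.getColumnLabel(i))
              .append(':')
              .append(pigTypeFor(md.getColumnType(i)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(pigTypeFor(Types.BIGINT)); // prints "long"
    }
}
```

In a loader, `schemaFor(result.getMetaData())` could be called once after the query has been executed, and the same 1-based loop bounds would apply when reading each row with `getObject(i)`.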
