从Mysql读取表格数据的方法 [英] A way to read table data from Mysql to Pig
问题描述
但它们只支持从Pig到mysql的加载结果。
存储数据INTO DBStorage('com.mysql.jdbc.Driver','dbc:mysql:// host / db','INSERT ...' );
但请告诉我从mysql读取表格的方式。
data = LOAD'my_table'AS DBStorage('com.mysql.jdbc.Driver','dbc:mysql:// host / db','SELECT * FROM my_table');
这是我的代码
public class DBLoader extends LoadFunc {
private final Log log = LogFactory.getLog(getClass());
private ArrayList mProtoTuple = null;
私人连接con;
private String jdbcURL;
私人字符串用户;
私人字符串传球;
private int batchSize;
private int count = 0;
私人字符串查询;
ResultSet结果;
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
public DBLoader(){
}
public DBLoader(String driver,String jdbcURL,String user,String pass,
String query){
尝试{
Class.forName(driver);
} catch(ClassNotFoundException e){
log.error(无法加载数据库驱动程序:+ driver,e);
抛出新的RuntimeException(无法加载DB驱动程序,e);
}
this.jdbcURL = jdbcURL;
this.user = user;
this.pass = pass;
this.query = query;
$ b @Override
public InputFormat getInputFormat()抛出IOException {
// TODO自动生成的方法存根
返回新的TextInputFormat ();
$ b @Override
public Tuple getNext()抛出IOException {
// TODO自动生成的方法存根
boolean next = false;
尝试{
next = result.next();
} catch(SQLException e){
// TODO自动生成的catch块
e.printStackTrace();
}
if(!next)
return null;
int numColumns = 0;
//获取结果集元数据
ResultSetMetaData rsmd;
尝试{
rsmd = result.getMetaData();
numColumns = rsmd.getColumnCount();
} catch(SQLException e){
// TODO自动生成的catch块
e.printStackTrace();
(int i = 0; i< numColumns; i ++){
try {
Object field = result.getObject(i );
switch(DataType.findType(field)){
case DataType.NULL:
mProtoTuple.add(null);
break;
case DataType.BOOLEAN:
mProtoTuple.add((Boolean)field);
break;
case DataType.INTEGER:
mProtoTuple.add((Integer)field);
break;
case DataType.LONG:
mProtoTuple.add((Long)field);
break;
case DataType.FLOAT:
mProtoTuple.add((Float)field);
break;
case DataType.DOUBLE:
mProtoTuple.add((Double)field);
break;
case DataType.BYTEARRAY:
byte [] b =((DataByteArray)field).get();
mProtoTuple.add(b);
break;
case DataType.CHARARRAY:
mProtoTuple.add((String)field);
break;
case DataType.BYTE:
mProtoTuple.add((Byte)field);
break;
case DataType.MAP:
case DataType.TUPLE:
case DataType.BAG:
throw new RuntimeException(无法存储非平坦元组
+使用DbStorage);
默认值:
抛出新的RuntimeException(未知数据类型
+ DataType.findType(field));
$ b} catch(Exception ee){
throw new RuntimeException(ee);
}
}
元组t = mTupleFactory.newTuple(mProtoTuple);
mProtoTuple.clear();
return t;
@Override
public void prepareToRead(RecordReader arg0,PigSplit arg1)
throws IOException {
con =空值;
if(query == null){
throw new IOException(SQL Insert command command not specified);
try {
if(user == null || pass == null){
con = DriverManager.getConnection(jdbcURL);
} else {
con = DriverManager.getConnection(jdbcURL,user,pass);
}
con.setAutoCommit(false);
result = con.createStatement()。executeQuery(query);
} catch(SQLException e){
log.error(Unable to connect to JDBC @+ jdbcURL);
抛出新的IOException(JDBC Error,e);
}
count = 0;
$ b @Override $ b $ public void setLocation(String location,Job job)throws IOException {
// TODO自动生成的方法存根
//TextInputFormat.setInputPaths(job,location);
$ b $ class MyDBInputFormat extends InputFormat< NullWritable,NullWritable> {
@Override
Public RecordReader< NullWritable,NullWritable> createRecordReader(
InputSplit arg0,TaskAttemptContext arg1)抛出IOException,
InterruptedException {
// TODO自动生成的方法存根
返回null;
}
@Override
public List< InputSplit> getSplits(JobContext arg0)抛出IOException,
InterruptedException {
// TODO自动生成的方法存根
返回null;
}
}
}
我尝试多次写UDF但不成功.....
解决方案 像你说的, DBStorage
仅支持将结果保存到数据库。
要从MySQL加载数据,您可以查看名为 sqoop (将数据从数据库复制到HDFS),或者您可以执行mysql转储,然后将文件复制到HDFS中。这两种方式都需要一些交互作用,并且不能直接在Pig内部使用。
第三种方法是研究编写Pig LoadFunc(你说你试图编写一个UDF )。它不应该太困难,您需要传递与DBStorage相同的选项(驱动程序,连接凭证和要执行的SQL查询),您也可以使用一些结果集元数据检查来自动生成模式。
Everyone know that Pig have supported DBStorage, but they are only supported load results from Pig to mysql like that
STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');
But Please show me the way to read table from mysql like that
data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');
Here is my code
public class DBLoader extends LoadFunc {
private final Log log = LogFactory.getLog(getClass());
private ArrayList mProtoTuple = null;
private Connection con;
private String jdbcURL;
private String user;
private String pass;
private int batchSize;
private int count = 0;
private String query;
ResultSet result;
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
public DBLoader() {
}
public DBLoader(String driver, String jdbcURL, String user, String pass,
String query) {
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
log.error("can't load DB driver:" + driver, e);
throw new RuntimeException("Can't load DB Driver", e);
}
this.jdbcURL = jdbcURL;
this.user = user;
this.pass = pass;
this.query = query;
}
@Override
public InputFormat getInputFormat() throws IOException {
// TODO Auto-generated method stub
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// TODO Auto-generated method stub
boolean next = false;
try {
next = result.next();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (!next)
return null;
int numColumns = 0;
// Get result set meta data
ResultSetMetaData rsmd;
try {
rsmd = result.getMetaData();
numColumns = rsmd.getColumnCount();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for (int i = 0; i < numColumns; i++) {
try {
Object field = result.getObject(i);
switch (DataType.findType(field)) {
case DataType.NULL:
mProtoTuple.add(null);
break;
case DataType.BOOLEAN:
mProtoTuple.add((Boolean) field);
break;
case DataType.INTEGER:
mProtoTuple.add((Integer) field);
break;
case DataType.LONG:
mProtoTuple.add((Long) field);
break;
case DataType.FLOAT:
mProtoTuple.add((Float) field);
break;
case DataType.DOUBLE:
mProtoTuple.add((Double) field);
break;
case DataType.BYTEARRAY:
byte[] b = ((DataByteArray) field).get();
mProtoTuple.add(b);
break;
case DataType.CHARARRAY:
mProtoTuple.add((String) field);
break;
case DataType.BYTE:
mProtoTuple.add((Byte) field);
break;
case DataType.MAP:
case DataType.TUPLE:
case DataType.BAG:
throw new RuntimeException("Cannot store a non-flat tuple "
+ "using DbStorage");
default:
throw new RuntimeException("Unknown datatype "
+ DataType.findType(field));
}
} catch (Exception ee) {
throw new RuntimeException(ee);
}
}
Tuple t = mTupleFactory.newTuple(mProtoTuple);
mProtoTuple.clear();
return t;
}
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1)
throws IOException {
con = null;
if (query == null) {
throw new IOException("SQL Insert command not specified");
}
try {
if (user == null || pass == null) {
con = DriverManager.getConnection(jdbcURL);
} else {
con = DriverManager.getConnection(jdbcURL, user, pass);
}
con.setAutoCommit(false);
result = con.createStatement().executeQuery(query);
} catch (SQLException e) {
log.error("Unable to connect to JDBC @" + jdbcURL);
throw new IOException("JDBC Error", e);
}
count = 0;
}
@Override
public void setLocation(String location, Job job) throws IOException {
// TODO Auto-generated method stub
//TextInputFormat.setInputPaths(job, location);
}
class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{
@Override
public RecordReader<NullWritable, NullWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
}
}
I try many times to write UDF but not success.....
Like you say, DBStorage
only supports saving results to a database.
To load data from MySQL you could look into a project called sqoop (that copies data from a database to HDFS), or you could perform a mysql dump and then copy the file into HDFS. Both ways required some interaction and cannot be directly used from inside Pig.
A third option would be to look into writing a Pig LoadFunc (you say your tried to write a UDF). It shouldn't be too difficult, you'll need to pass much the same options as DBStorage (driver, connection credentials and a SQL query to execute), and you can probably use some result set metadata inspection to auto generate a schema too.
这篇关于从Mysql读取表格数据的方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!