How to read table data from MySQL into Pig

Everyone knows that Pig supports DBStorage, but it only works for storing results from Pig into MySQL, for example:

    STORE data INTO 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');

How can I read a table from MySQL instead, with something like this:

    data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');

Here is my code:

    public class DBLoader extends LoadFunc {
        private final Log log = LogFactory.getLog(getClass());
        private ArrayList mProtoTuple = null;
        private Connection con;
        private String jdbcURL;
        private String user;
        private String pass;
        private int batchSize;
        private int count = 0;
        private String query;
        ResultSet result;
        protected TupleFactory mTupleFactory = TupleFactory.getInstance();

        public DBLoader() {
        }

        public DBLoader(String driver, String jdbcURL, String user, String pass,
                String query) {
            try {
                Class.forName(driver);
            } catch (ClassNotFoundException e) {
                log.error("can't load DB driver:" + driver, e);
                throw new RuntimeException("Can't load DB Driver", e);
            }
            this.jdbcURL = jdbcURL;
            this.user = user;
            this.pass = pass;
            this.query = query;
        }

        @Override
        public InputFormat getInputFormat() throws IOException {
            // TODO Auto-generated method stub
            return new TextInputFormat();
        }

        @Override
        public Tuple getNext() throws IOException {
            // TODO Auto-generated method stub
            boolean next = false;
            try {
                next = result.next();
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if (!next)
                return null;

            int numColumns = 0;
            // Get result set meta data
            ResultSetMetaData rsmd;
            try {
                rsmd = result.getMetaData();
                numColumns = rsmd.getColumnCount();
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            for (int i = 0; i < numColumns; i++) {
                try {
                    Object field = result.getObject(i);
                    switch (DataType.findType(field)) {
                    case DataType.NULL:
                        mProtoTuple.add(null);
                        break;
                    case DataType.BOOLEAN:
                        mProtoTuple.add((Boolean) field);
                        break;
                    case DataType.INTEGER:
                        mProtoTuple.add((Integer) field);
                        break;
                    case DataType.LONG:
                        mProtoTuple.add((Long) field);
                        break;
                    case DataType.FLOAT:
                        mProtoTuple.add((Float) field);
                        break;
                    case DataType.DOUBLE:
                        mProtoTuple.add((Double) field);
                        break;
                    case DataType.BYTEARRAY:
                        byte[] b = ((DataByteArray) field).get();
                        mProtoTuple.add(b);
                        break;
                    case DataType.CHARARRAY:
                        mProtoTuple.add((String) field);
                        break;
                    case DataType.BYTE:
                        mProtoTuple.add((Byte) field);
                        break;
                    case DataType.MAP:
                    case DataType.TUPLE:
                    case DataType.BAG:
                        throw new RuntimeException("Cannot store a non-flat tuple "
                                + "using DbStorage");
                    default:
                        throw new RuntimeException("Unknown datatype "
                                + DataType.findType(field));
                    }
                } catch (Exception ee) {
                    throw new RuntimeException(ee);
                }
            }

            Tuple t = mTupleFactory.newTuple(mProtoTuple);
            mProtoTuple.clear();
            return t;
        }

        @Override
        public void prepareToRead(RecordReader arg0, PigSplit arg1)
                throws IOException {
            con = null;
            if (query == null) {
                throw new IOException("SQL Insert command not specified");
            }
            try {
                if (user == null || pass == null) {
                    con = DriverManager.getConnection(jdbcURL);
                } else {
                    con = DriverManager.getConnection(jdbcURL, user, pass);
                }
                con.setAutoCommit(false);
                result = con.createStatement().executeQuery(query);
            } catch (SQLException e) {
                log.error("Unable to connect to JDBC @" + jdbcURL);
                throw new IOException("JDBC Error", e);
            }
            count = 0;
        }

        @Override
        public void setLocation(String location, Job job) throws IOException {
            // TODO Auto-generated method stub
            //TextInputFormat.setInputPaths(job, location);
        }

        class MyDBInputFormat extends InputFormat<NullWritable, NullWritable> {

            @Override
            public RecordReader<NullWritable, NullWritable> createRecordReader(
                    InputSplit arg0, TaskAttemptContext arg1)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                return null;
            }

            @Override
            public List<InputSplit> getSplits(JobContext arg0)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                return null;
            }
        }
    }

I have tried to write this UDF many times, but without success.

1 answer

As you say, DBStorage only supports storing results in a database.

To load data from MySQL, you can look at a project called Sqoop (which copies data from a database to HDFS), or you can dump the MySQL table to a file and then copy that file to HDFS. Both methods require a separate step outside Pig and cannot be used directly from within a Pig script.
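
For the dump route, here is a minimal sketch of writing a query result as tab-separated text that Pig could then read. The class and method names (`TsvDumpSketch`, `tsvField`, `writeTsv`) are hypothetical, and replacing tabs and line breaks inside values with spaces is an assumption made to keep one row per line, not something the tools require.

```java
import java.io.IOException;
import java.io.Writer;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TsvDumpSketch {

    // Render one column value as a tab-separated field (hypothetical helper).
    // null becomes an empty field; tabs and line breaks inside the value are
    // replaced by spaces so they cannot be mistaken for delimiters.
    static String tsvField(Object value) {
        if (value == null) {
            return "";
        }
        return value.toString().replaceAll("[\\t\\r\\n]", " ");
    }

    // Write every row of the result set as one tab-separated line.
    static void writeTsv(ResultSet rs, Writer out) throws SQLException, IOException {
        int columns = rs.getMetaData().getColumnCount();
        while (rs.next()) {
            StringBuilder line = new StringBuilder();
            for (int i = 1; i <= columns; i++) { // JDBC columns are numbered from 1
                if (i > 1) {
                    line.append('\t');
                }
                line.append(tsvField(rs.getObject(i)));
            }
            out.write(line.append('\n').toString());
        }
    }
}
```

The resulting file can be copied to HDFS (for example with hadoop fs -put) and loaded in Pig with PigStorage, whose default delimiter is a tab.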

The third option would be to write your own Pig LoadFunc (you say you have tried to write a UDF). This should not be too complicated: you will need to pass the same parameters as DBStorage (driver, connection credentials and the SQL query to execute), and you can probably inspect the result set metadata to generate the schema automatically.
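
The metadata idea can be sketched with plain JDBC, independent of Pig. The class `JdbcSchemaSketch`, its two methods, and the choice of which SQL type maps to which Pig type name are all assumptions for illustration; a real loader would return this information from its schema method rather than as a string.

```java
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

public class JdbcSchemaSketch {

    // Hypothetical mapping from java.sql.Types codes to Pig type names.
    // Anything not listed falls back to chararray.
    static String pigTypeFor(int sqlType) {
        switch (sqlType) {
            case Types.BIT:
            case Types.BOOLEAN:
                return "boolean";
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
                return "int";
            case Types.BIGINT:
                return "long";
            case Types.REAL:
                return "float";
            case Types.FLOAT: // JDBC FLOAT is double precision
            case Types.DOUBLE:
                return "double";
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return "bytearray";
            default:
                return "chararray";
        }
    }

    // Build a Pig-style schema string such as "id:int, name:chararray"
    // from the metadata of an executed query.
    static String schemaFor(ResultSetMetaData meta) throws SQLException {
        List<String> fields = new ArrayList<>();
        // JDBC column indices start at 1, not 0.
        for (int i = 1; i <= meta.getColumnCount(); i++) {
            fields.add(meta.getColumnLabel(i) + ":" + pigTypeFor(meta.getColumnType(i)));
        }
        return String.join(", ", fields);
    }
}
```

Note that the loop runs from 1 to the column count inclusive. The same 1-based indexing applies to ResultSet.getObject when reading the row values.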


Source: https://habr.com/ru/post/917578/

