A single transaction across multiple connections (MySQL / JDBC)

The application I'm working on is a Java-based ETL process that loads data into multiple tables. The DBMS is Infobright (a MySQL-based DBMS designed for data warehousing).

Data loading should be atomic; however, for performance reasons, I want to load data into multiple tables at the same time (using LOAD DATA INFILE). This means that I need to open multiple connections.

Is there any solution that lets me perform the loads atomically and in parallel? (I assume the answer may depend on the storage engine of the tables being loaded; most of them are Brighthouse, which supports transactions, but not XA, and has no savepoints.)

To clarify, I want to avoid the following situation:

  • I load data into 5 tables.
  • I commit the loads for the first 4 tables.
  • The commit for table 5 fails.

In this situation, I cannot roll back the first 4 loads, because they are already committed.

2 answers

Introduction

As promised, I hacked together a complete example. I used MySQL and created three tables, test1, test2 and test3 (abbreviated below as test{1,2,3}):

    CREATE TABLE `test{1,2,3}` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `data` varchar(255) NOT NULL UNIQUE,
        PRIMARY KEY (`id`)
    );

test2 already contains one row.

    INSERT INTO `test2` (`data`) VALUES ('a');

(I posted the full code at http://pastebin.com.)

The example does the following:

  • Sets threads to 3, which determines how many jobs run in parallel.
  • Creates one connection per job.
  • Provides some sample data for each table (by default, the value a for every table).
  • Creates one job per connection and hands each job its table and data.
  • Runs the jobs in a pool of that many threads and waits for them to complete (successfully or not).
  • If no exception occurred, commits every connection; otherwise, rolls back each of them.
  • Closes the connections (they could be reused instead).

(Note that I used the try-with-resources statement, Java 7's automatic resource management feature, in SQLTask.call().)
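The post does not show the connection helpers that main calls, so here is a minimal sketch of what they might look like. The JDBC URL, user name and password are placeholders, and the class name ConnectionHelpers and the helper forEach are hypothetical. The essential assumption is that auto-commit is switched off on every connection, since otherwise each statement would be committed immediately and there would be nothing to roll back.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class ConnectionHelpers {
    // Placeholder settings (assumptions, not taken from the original post).
    static final String URL = "jdbc:mysql://localhost:3306/test";
    static final String USER = "etl";
    static final String PASSWORD = "secret";

    /** An operation on a connection that may fail with SQLException. */
    interface Action {
        void apply(Connection connection) throws SQLException;
    }

    /** Opens count connections with auto-commit disabled. */
    static List<Connection> getConnections(int count) throws SQLException {
        List<Connection> connections = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
                connections.add(connection);
                connection.setAutoCommit(false); // work stays pending until commit()
            }
            return connections;
        } catch (SQLException ex) {
            // Do not leak the connections that were already opened.
            try {
                closeConnections(connections);
            } catch (SQLException closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }
    }

    /** Commits one connection after another and stops at the first failure. */
    static void commitConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections) {
            connection.commit();
        }
    }

    static void rollbackConnections(List<Connection> connections) throws SQLException {
        forEach(connections, Connection::rollback);
    }

    static void closeConnections(List<Connection> connections) throws SQLException {
        forEach(connections, Connection::close);
    }

    /** Applies the action to every connection, even if some calls fail. */
    private static void forEach(List<Connection> connections, Action action)
            throws SQLException {
        SQLException first = null;
        for (Connection connection : connections) {
            try {
                action.apply(connection);
            } catch (SQLException ex) {
                if (first == null) first = ex; else first.addSuppressed(ex);
            }
        }
        if (first != null) throw first;
    }
}
```

Note that commitConnections is itself a sequence of independent commits: if the commit on a later connection fails, the earlier ones are already permanent. The sketch only guarantees that nothing is committed while any of the statements failed.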

Logic

    public static void main(String[] args) throws SQLException, InterruptedException {
        int threads = 3;
        List<Connection> connections = getConnections(threads);
        Map<String, String> tableData = getTableData(threads);
        List<SQLTask> tasks = getTasks(threads, connections);
        setData(tableData, tasks);
        try {
            runTasks(tasks);
            commitConnections(connections);
        } catch (ExecutionException ex) {
            rollbackConnections(connections);
        } finally {
            closeConnections(connections);
        }
    }

Data

    private static Map<String, String> getTableData(int threads) {
        Map<String, String> tableData = new HashMap<>();
        for (int i = 1; i <= threads; i++)
            tableData.put("test" + i, "a");
        return tableData;
    }

Tasks

    private static final class SQLTask implements Callable<Void> {

        private final Connection connection;
        private String data;
        private String table;

        public SQLTask(Connection connection) {
            this.connection = connection;
        }

        public void setTable(String table) {
            this.table = table;
        }

        public void setData(String data) {
            this.data = data;
        }

        @Override
        public Void call() throws SQLException {
            try (Statement statement = connection.createStatement()) {
                statement.executeUpdate(String.format(
                        "INSERT INTO `%s` (data) VALUES ('%s');", table, data));
            }
            return null;
        }
    }

    private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
        List<SQLTask> tasks = new ArrayList<>();
        for (int i = 0; i < threads; i++)
            tasks.add(new SQLTask(connections.get(i)));
        return tasks;
    }

    private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
        Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
        Iterator<SQLTask> j = tasks.iterator();
        while (i.hasNext()) {
            Entry<String, String> entry = i.next();
            SQLTask task = j.next();
            task.setTable(entry.getKey());
            task.setData(entry.getValue());
        }
    }

Run

    private static void runTasks(List<SQLTask> tasks)
            throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
        List<Future<Void>> futures = executorService.invokeAll(tasks);
        executorService.shutdown();
        for (Future<Void> future : futures)
            future.get();
    }

Result

Given the default data returned by getTableData(...)

    test1 -> `a`
    test2 -> `a`
    test3 -> `a`

and the fact that test2 already contains a (and the data column is unique), the second task will fail and throw an exception, so every connection will be rolled back.

If you return b instead of a, all connections will be committed successfully.
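The way a single failed task turns into "roll back everything" can be tried without a database. The sketch below is only an illustration: the class FailurePropagationDemo and the method allSucceeded are hypothetical names, and an IllegalStateException stands in for the SQLException that the duplicate value would cause. It relies on invokeAll waiting for every task and on Future.get() rethrowing a task's exception wrapped in an ExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FailurePropagationDemo {

    /**
     * Runs all tasks in parallel and returns true only if none of them threw.
     * The task list must not be empty (a fixed pool needs at least one thread).
     */
    static boolean allSucceeded(List<Callable<Void>> tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            // invokeAll blocks until every task has finished, successfully or not.
            List<Future<Void>> futures = pool.invokeAll(tasks);
            for (Future<Void> future : futures) {
                try {
                    future.get(); // rethrows the task's exception, wrapped
                } catch (ExecutionException ex) {
                    return false;
                }
            }
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<>();
        tasks.add(() -> null);                       // stands in for the test1 insert
        tasks.add(() -> {                            // stands in for the test2 insert
            throw new IllegalStateException("duplicate value 'a'");
        });
        tasks.add(() -> null);                       // stands in for the test3 insert
        System.out.println(allSucceeded(tasks) ? "commit all" : "roll back all");
        // prints "roll back all"
    }
}
```

Because the decision is made only after every task has finished, no connection is committed while another task is still running or has already failed.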

The same approach works with LOAD DATA.
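As a rough sketch of that variant, the task below issues a LOAD DATA INFILE statement instead of an INSERT. The class name LoadDataTask, the helper buildSql, the comma delimiter and the file paths are assumptions made for illustration. The escaping assumes MySQL's default backslash escapes, and whether the load really stays pending until commit depends on the storage engine, so that should be verified for the tables in question.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;

final class LoadDataTask implements Callable<Void> {

    private final Connection connection;
    private final String table;
    private final String file;

    LoadDataTask(Connection connection, String table, String file) {
        this.connection = connection;
        this.table = table;
        this.file = file;
    }

    /** Builds the statement text; kept separate so it can be checked without a database. */
    static String buildSql(String table, String file) {
        // Table names cannot be bound as parameters, so accept only plain identifiers.
        if (!table.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("unexpected table name: " + table);
        }
        // Escape backslashes and single quotes inside the quoted file path.
        String escaped = file.replace("\\", "\\\\").replace("'", "\\'");
        return "LOAD DATA INFILE '" + escaped + "' INTO TABLE `" + table
                + "` FIELDS TERMINATED BY ','";
    }

    @Override
    public Void call() throws SQLException {
        // The connection is expected to have auto-commit disabled by the caller.
        try (Statement statement = connection.createStatement()) {
            statement.execute(buildSql(table, file));
        }
        return null;
    }
}
```

For example, buildSql("test1", "/tmp/test1.csv") yields LOAD DATA INFILE '/tmp/test1.csv' INTO TABLE `test1` FIELDS TERMINATED BY ','. The tasks can then be run and committed or rolled back exactly like the SQLTask instances.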


After the OP commented on my answer, I realized that what he/she wants to do cannot be done in a simple and clean way.

The main problem is that once a commit has succeeded, the committed data cannot be rolled back, because a commit is final. If several commits are needed, rolling everything back is not possible unless you keep track of all the data (in all transactions) and, if something goes wrong, delete everything that was successfully committed.
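The bookkeeping described above can be sketched as a list of steps, each with a commit action and a compensating undo action. Everything here is hypothetical: the class CompensatingLoader, the Step interface and the idea that undo would run something like a DELETE restricted to the rows of the current batch (which assumes the tables carry some batch marker). It is not a real transaction, since other sessions can see the committed rows until the undo has run.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class CompensatingLoader {

    /** One load: commit() makes its data permanent, undo() removes that data again. */
    interface Step {
        void commit() throws Exception;
        void undo() throws Exception;
    }

    /**
     * Commits the steps in order. If one commit fails, the steps that were already
     * committed are undone in reverse order. Returns true if every step committed.
     */
    static boolean commitAll(List<Step> steps) {
        Deque<Step> committed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.commit();
                committed.push(step); // remember it in case a later step fails
            } catch (Exception commitFailure) {
                while (!committed.isEmpty()) {
                    Step done = committed.pop(); // most recently committed first
                    try {
                        done.undo();
                    } catch (Exception undoFailure) {
                        // Nothing more can be done automatically at this point.
                        System.err.println("manual cleanup needed: " + undoFailure);
                    }
                }
                return false;
            }
        }
        return true;
    }
}
```

With five loads where the fifth commit fails, commitAll would call undo on loads 4, 3, 2 and 1, in that order, and return false. The weak spot is the undo itself: if it fails, the data is left partially loaded and has to be cleaned up by hand.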

There is a good answer related to the issue of commits and rollbacks.


Actually, the new version of IEE (but not ICE) has an additional feature called DLP (Distributed Load Processing). The site links to a PDF about it here:

http://www.infobright.com/Products/Features/


Source: https://habr.com/ru/post/1383593/

