Introduction
As promised, I hacked together a complete example. I used MySQL and created three tables:
    CREATE TABLE `test{1,2,3}` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `data` varchar(255) NOT NULL UNIQUE,
      PRIMARY KEY (`id`)
    );

`test2` contains only one row:

    INSERT INTO `test2` (`data`) VALUES ('a');
(I posted the full code at http://pastebin.com)
The following example does a few things:
- Sets `threads` to 3, which determines how many jobs will run in parallel.
- Creates `threads` number of connections.
- Provides some sample data for each table (by default, the data is `a` for each table).
- Creates `threads` number of tasks to run and loads them with data.
- Runs the tasks in `threads` number of threads and waits for them to complete (successfully or not).
- If no exception occurred, commits every connection; otherwise, rolls back each of them.
- Closes the connections (however, they could be reused).
(Note that I used the automatic resource management feature of Java 7, try-with-resources, in `SQLTask.call()`.)
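The connection helpers that `main` calls (`getConnections`, `commitConnections`, `rollbackConnections`, `closeConnections`) are not shown in this answer. A minimal sketch of what they might look like follows. The class name `ConnectionHelpers` and the `url`, `user` and `password` parameters are my assumptions for illustration (the `getConnections(threads)` call in `main` takes only the count); the one thing the approach really depends on is that auto-commit is switched off.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class; not part of the original code.
final class ConnectionHelpers {

    // Opens `count` connections with auto-commit disabled, so nothing becomes
    // permanent until commitConnections(...) is called explicitly.
    // url, user and password are placeholders supplied by the caller.
    static List<Connection> getConnections(int count, String url, String user, String password)
            throws SQLException {
        List<Connection> connections = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Connection connection = DriverManager.getConnection(url, user, password);
            connection.setAutoCommit(false);
            connections.add(connection);
        }
        return connections;
    }

    // Commits the open transaction on every connection, in list order.
    static void commitConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections)
            connection.commit();
    }

    // Rolls back the open transaction on every connection, in list order.
    static void rollbackConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections)
            connection.rollback();
    }

    // Closes every connection, in list order.
    static void closeConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections)
            connection.close();
    }
}
```

Note that the commits still happen one after another, so a failure during the commit loop itself can leave the earlier connections committed.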
Logic
    public static void main(String[] args) throws SQLException, InterruptedException {
        int threads = 3;
        List<Connection> connections = getConnections(threads);
        Map<String, String> tableData = getTableData(threads);
        List<SQLTask> tasks = getTasks(threads, connections);
        setData(tableData, tasks);
        try {
            runTasks(tasks);
            // Every task succeeded: make all changes permanent.
            commitConnections(connections);
        } catch (ExecutionException ex) {
            // At least one task failed: undo the work of all of them.
            rollbackConnections(connections);
        } finally {
            closeConnections(connections);
        }
    }
Data
    private static Map<String, String> getTableData(int threads) {
        Map<String, String> tableData = new HashMap<>();
        for (int i = 1; i <= threads; i++)
            tableData.put("test" + i, "a");
        return tableData;
    }
Tasks
    private static final class SQLTask implements Callable<Void> {
        private final Connection connection;
        private String data;
        private String table;

        public SQLTask(Connection connection) {
            this.connection = connection;
        }

        public void setTable(String table) {
            this.table = table;
        }

        public void setData(String data) {
            this.data = data;
        }

        @Override
        public Void call() throws SQLException {
            // Demo only: table and data are formatted straight into the SQL string.
            try (Statement statement = connection.createStatement()) {
                statement.executeUpdate(String.format(
                        "INSERT INTO `%s` (data) VALUES ('%s');", table, data));
            }
            return null;
        }
    }

    // One task per connection.
    private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
        List<SQLTask> tasks = new ArrayList<>();
        for (int i = 0; i < threads; i++)
            tasks.add(new SQLTask(connections.get(i)));
        return tasks;
    }

    // Assigns one (table, data) entry to each task; assumes both have the same size.
    private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
        Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
        Iterator<SQLTask> j = tasks.iterator();
        while (i.hasNext()) {
            Entry<String, String> entry = i.next();
            SQLTask task = j.next();
            task.setTable(entry.getKey());
            task.setData(entry.getValue());
        }
    }
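`SQLTask.call()` builds its statement with `String.format`, which is fine for a demo but breaks as soon as the data contains a quote. A possible variant, sketched here under my own assumptions, binds the value through a `PreparedStatement`. Table names cannot be bound as parameters, so the sketch only accepts plain identifiers. The class name `PreparedInsertTask`, the helper `buildInsertSql` and the `IDENTIFIER` pattern are hypothetical and not part of the original code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;

// Hypothetical alternative to SQLTask; not part of the original code.
final class PreparedInsertTask implements Callable<Void> {
    // Assumption: table names consist of letters, digits and underscores only.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z0-9_]+");

    private final Connection connection;
    private final String table;
    private final String data;

    PreparedInsertTask(Connection connection, String table, String data) {
        this.connection = connection;
        this.table = table;
        this.data = data;
    }

    // Builds the INSERT statement; rejects anything that is not a plain identifier.
    static String buildInsertSql(String table) {
        if (table == null || !IDENTIFIER.matcher(table).matches())
            throw new IllegalArgumentException("Unexpected table name: " + table);
        return "INSERT INTO `" + table + "` (`data`) VALUES (?)";
    }

    @Override
    public Void call() throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(buildInsertSql(table))) {
            statement.setString(1, data); // the value is bound, not concatenated
            statement.executeUpdate();
        }
        return null;
    }
}
```

Because it also implements `Callable<Void>`, such a task could be passed to an executor the same way as `SQLTask`.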
Run
    private static void runTasks(List<SQLTask> tasks) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
        List<Future<Void>> futures = executorService.invokeAll(tasks);
        executorService.shutdown();
        for (Future<Void> future : futures)
            future.get();
    }
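To see how a single failing task ends up as an `ExecutionException` in the caller, here is a small database-free sketch of the same pattern. The class `AllOrNothingDemo` and the method `decide` are hypothetical names, and the failing task merely simulates a duplicate-key error with an `IllegalStateException`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original code.
final class AllOrNothingDemo {

    // Same idea as runTasks(...): run all tasks in parallel, wait for all of
    // them, then let the first failure surface through Future.get().
    static void runTasks(List<Callable<Void>> tasks) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
        try {
            for (Future<Void> future : executorService.invokeAll(tasks))
                future.get();
        } finally {
            executorService.shutdown();
        }
    }

    // Returns "commit" if every task succeeds and "rollback" if any task fails.
    static String decide(List<Callable<Void>> tasks) throws InterruptedException {
        try {
            runTasks(tasks);
            return "commit";
        } catch (ExecutionException ex) {
            return "rollback";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Callable<Void> ok = () -> null;
        Callable<Void> duplicate = () -> {
            throw new IllegalStateException("simulated duplicate entry");
        };
        System.out.println(decide(Arrays.asList(ok, duplicate, ok))); // prints "rollback"
        System.out.println(decide(Arrays.asList(ok, ok, ok)));        // prints "commit"
    }
}
```

`invokeAll` returns only after every task has finished, so by the time `future.get()` throws, the other tasks have already done their work and still need to be rolled back.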
Result
Given the default data returned by `getTableData(...)`

    test1 -> `a`
    test2 -> `a`
    test3 -> `a`

and the fact that `test2` already contains `a` (and the `data` column is unique), the task that inserts into `test2` will fail and throw an exception, so every connection will be rolled back.
If you return `b`s instead of `a`s, then every connection will be committed successfully.
The same approach can be used with `LOAD DATA`.
After the OP replied to my answer, I realized that what he/she wants to do cannot be done in a simple and clean way.
The main problem is that once a commit has succeeded, the committed data cannot be rolled back, because a commit is atomic and final. If several commits are needed in a given case, undoing everything is only possible if you keep track of all the data (across all transactions) and, when something goes wrong, delete everything that was already committed successfully.
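The bookkeeping described above could be sketched roughly as follows. This is only an illustration under my own assumptions: `CompensationLog`, `recordCommitted` and `compensate` are hypothetical names, and each undo action stands for whatever `DELETE` would remove the rows a given commit stored.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bookkeeping class; not part of the original code.
final class CompensationLog {
    // Undo actions for work that has already been committed, newest first.
    private final Deque<Runnable> undoActions = new ArrayDeque<>();

    // Call after each successful commit, passing an action that deletes
    // exactly what that commit stored.
    void recordCommitted(Runnable undo) {
        undoActions.push(undo);
    }

    // Runs the recorded undo actions, most recent commit first, and clears the log.
    void compensate() {
        while (!undoActions.isEmpty())
            undoActions.pop().run();
    }
}
```

This is not a real rollback: other sessions can see the committed rows before they are deleted again, and an undo action can itself fail, which is why the approach is neither simple nor clean.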
There is a good answer related to the issue of commits and rollbacks.