Loading a large CSV file into SQL Server in parallel

I have a large CSV file ... 10 columns, 100 million lines, approximately 6 GB in size on my hard drive. I want to read this CSV file line by line and then load the data into a Microsoft SQL Server database using SqlBulkCopy. I have read a couple of threads here and elsewhere online; most people believe that reading a CSV file in parallel does not buy much in terms of efficiency, since the tasks/threads compete for disk access.

What I am trying to do is read the CSV line by line and add the rows to a blocking collection with a capacity of 100k rows. Once this collection is full, spin up a new task/thread to write the data to SQL Server using the SqlBulkCopy API.

I wrote this piece of code, but at runtime I get an error saying: "Attempt to invoke bulk copy on an object that has a pending operation." This looks like something that should be easy to solve with the .NET 4.0 TPL, but I can't get it to work. Any suggestions on what I'm doing wrong?

public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate)
{
    const int inputCollectionBufferSize = 1000000;
    const int bulkInsertBufferCapacity = 100000;
    const int bulkInsertConcurrency = 8;

    var sqlConnection = new SqlConnection(connectionString);
    sqlConnection.Open();

    var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock)
    {
        EnableStreaming = true,
        BatchSize = bulkInsertBufferCapacity,
        DestinationTableName = table,
        BulkCopyTimeout = (24 * 60 * 60),
    };

    BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize);
    DataTable dataTable = new DataTable(table);
    dataTable.Columns.AddRange(columns);

    Task loadTask = Task.Factory.StartNew(() =>
    {
        foreach (DataRow row in ReadRows(fileName, dataTable))
        {
            rows.Add(row);
        }
        rows.CompleteAdding();
    });

    List<Task> insertTasks = new List<Task>(bulkInsertConcurrency);
    for (int i = 0; i < bulkInsertConcurrency; i++)
    {
        insertTasks.Add(Task.Factory.StartNew((x) =>
        {
            List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity);
            foreach (DataRow row in rows.GetConsumingEnumerable())
            {
                if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
                {
                    SqlBulkCopy bulkCopy = x as SqlBulkCopy;
                    var dataRows = bulkInsertBuffer.ToArray();
                    bulkCopy.WriteToServer(dataRows);
                    Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count);
                    bulkInsertBuffer.Clear();
                }
                bulkInsertBuffer.Add(row);
            }
        }, sqlBulkCopy));
    }

    loadTask.Wait();
    Task.WaitAll(insertTasks.ToArray());
}

private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable)
{
    using (var textFieldParser = new TextFieldParser(fileName))
    {
        textFieldParser.TextFieldType = FieldType.Delimited;
        textFieldParser.Delimiters = new[] { "," };
        textFieldParser.HasFieldsEnclosedInQuotes = true;

        while (!textFieldParser.EndOfData)
        {
            string[] cols = textFieldParser.ReadFields();
            DataRow row = dataTable.NewRow();
            for (int i = 0; i < cols.Length; i++)
            {
                if (string.IsNullOrEmpty(cols[i]))
                {
                    row[i] = DBNull.Value;
                }
                else
                {
                    row[i] = cols[i];
                }
            }
            yield return row;
        }
    }
}
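For context: a single SqlBulkCopy instance cannot run overlapping WriteToServer calls, and the code above shares one instance across all eight consumer tasks, which is what produces the "pending operation" error. Below is a minimal sketch of the consumer body with one SqlBulkCopy per task, reusing the names from the code above; it is an illustration under that assumption, not a verified fix.

// Sketch only: each consumer task owns its own connection and SqlBulkCopy,
// so WriteToServer calls never overlap on a shared instance.
// `insertTasks`, `rows`, `connectionString`, `table` and
// `bulkInsertBufferCapacity` refer to the method above.
insertTasks.Add(Task.Factory.StartNew(() =>
{
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, null)
        {
            EnableStreaming = true,
            BatchSize = bulkInsertBufferCapacity,
            DestinationTableName = table,
            BulkCopyTimeout = 24 * 60 * 60,
        })
        {
            var buffer = new List<DataRow>(bulkInsertBufferCapacity);
            foreach (DataRow row in rows.GetConsumingEnumerable())
            {
                buffer.Add(row);
                if (buffer.Count == bulkInsertBufferCapacity)
                {
                    bulkCopy.WriteToServer(buffer.ToArray());
                    buffer.Clear();
                }
            }
            if (buffer.Count > 0)   // flush the partially filled last batch
            {
                bulkCopy.WriteToServer(buffer.ToArray());
            }
        }
    }
}));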
3 answers

No.

Parallel access may or may not speed up reading the file (it won't, but I'm not going to fight that battle...), but parallelizing the writes will not give you a faster bulk insert. This is because a minimally logged bulk insert (i.e. the really fast kind) requires a table lock. See Prerequisites for Minimal Logging in Bulk Import:

Minimal logging requires that the target table meets the following conditions:

...
- Table locking is specified (using TABLOCK).
...

Parallel inserts, by definition, cannot obtain parallel table locks. Q.E.D. You are barking up the wrong tree.

Stop getting your advice from random internet searches. Read The Data Loading Performance Guide; it is the guide to ... performant data loading.

I would also recommend that you stop reinventing the wheel. Use SSIS; this is exactly what it was designed to handle.
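If you do stay with hand-rolled C# instead of SSIS, the single-writer shape this answer describes looks roughly like the sketch below: one SqlBulkCopy holding the table lock does all the writes. It reuses the ReadRows helper and the parameters from the question and is only an outline under those assumptions.

// Sketch: one SqlBulkCopy with TableLock, fed sequentially in batches.
// Reuses ReadRows, fileName, table, columns and connectionString from the question.
using (var connection = new SqlConnection(connectionString))
{
    connection.Open();
    using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null)
    {
        DestinationTableName = table,
        BatchSize = 100000,
        BulkCopyTimeout = 0,       // 0 = no timeout
        EnableStreaming = true,
    })
    {
        var dataTable = new DataTable(table);
        dataTable.Columns.AddRange(columns);

        var batch = new List<DataRow>(bulkCopy.BatchSize);
        foreach (DataRow row in ReadRows(fileName, dataTable))
        {
            batch.Add(row);
            if (batch.Count == bulkCopy.BatchSize)
            {
                bulkCopy.WriteToServer(batch.ToArray());
                batch.Clear();
            }
        }
        if (batch.Count > 0)       // write the final partial batch
        {
            bulkCopy.WriteToServer(batch.ToArray());
        }
    }
}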


http://joshclose.github.io/CsvHelper/

https://efbulkinsert.codeplex.com/

If possible, I suggest you read your file into a List<T> using CsvHelper (linked above) and write it to your database with a bulk insert, either as you are doing now or with EFBulkInsert, which I have used and which is amazingly fast.

using CsvHelper;

public static List<T> CSVImport<T, TClassMap>(string csvData, bool hasHeaderRow, char delimiter, out string errorMsg)
    where TClassMap : CsvHelper.Configuration.CsvClassMap
{
    errorMsg = string.Empty;
    var result = Enumerable.Empty<T>();

    MemoryStream memStream = new MemoryStream(Encoding.UTF8.GetBytes(csvData));
    StreamReader streamReader = new StreamReader(memStream);
    var csvReader = new CsvReader(streamReader);

    csvReader.Configuration.RegisterClassMap<TClassMap>();
    csvReader.Configuration.DetectColumnCountChanges = true;
    csvReader.Configuration.IsHeaderCaseSensitive = false;
    csvReader.Configuration.TrimHeaders = true;
    csvReader.Configuration.Delimiter = delimiter.ToString();
    csvReader.Configuration.SkipEmptyRecords = true;

    List<T> items = new List<T>();
    try
    {
        items = csvReader.GetRecords<T>().ToList();
    }
    catch (Exception ex)
    {
        while (ex != null)
        {
            errorMsg += ex.Message + Environment.NewLine;
            foreach (var val in ex.Data.Values)
                errorMsg += val.ToString() + Environment.NewLine;
            ex = ex.InnerException;
        }
    }
    return items;
}

Edit - I don't follow what you are doing with the bulk insert. You want to bulk insert the whole list or DataTable, not insert row by row; see the sketch below.
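As an illustration of that point, here is a minimal sketch of taking a list produced by CSVImport and pushing it to the server in a single WriteToServer call. Employee, its two columns, and the BulkInsert method are hypothetical stand-ins for your own types (assumes System.Data and System.Data.SqlClient).

// Hypothetical POCO standing in for whatever T you map with CsvHelper.
public class Employee
{
    public int Id { get; set; }
    public string Name { get; set; }
}

// Sketch: copy the whole list into a DataTable and bulk insert it in one call.
public static void BulkInsert(List<Employee> items, string connectionString, string tableName)
{
    var dataTable = new DataTable(tableName);
    dataTable.Columns.Add("Id", typeof(int));
    dataTable.Columns.Add("Name", typeof(string));

    foreach (var item in items)
    {
        dataTable.Rows.Add(item.Id, item.Name);
    }

    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var bulkCopy = new SqlBulkCopy(connection) { DestinationTableName = tableName })
        {
            bulkCopy.WriteToServer(dataTable);   // one call for the whole table, not row by row
        }
    }
}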


You can create a stored procedure and pass in the file location, as below.

CREATE PROCEDURE [dbo].[CSVReaderTransaction]
    @Filepath varchar(100) = ''
AS
-- STEP 1: Start the transaction
BEGIN TRANSACTION

-- STEP 2 & 3: checking @@ERROR after each statement
EXEC ('BULK INSERT Employee FROM ''' + @Filepath + ''' WITH (FIELDTERMINATOR = '','', ROWTERMINATOR = ''\n'' )')

-- Rollback the transaction if there were any errors
IF @@ERROR <> 0
BEGIN
    -- Rollback the transaction
    ROLLBACK

    -- Raise an error and return
    RAISERROR ('Error in inserting data into employee Table.', 16, 1)
    RETURN
END

COMMIT TRANSACTION

You can also add a BATCHSIZE parameter, just like FIELDTERMINATOR and ROWTERMINATOR.
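Calling this from C# is then just a parameterized stored-procedure call. A minimal sketch follows, assuming the procedure above exists; note that BULK INSERT reads the file on the server, so the path must be visible to the SQL Server service account (the path below is a hypothetical example).

// Sketch: invoke the stored procedure, passing a CSV path the server can read.
// Assumes System.Data and System.Data.SqlClient.
using (var connection = new SqlConnection(connectionString))
using (var command = new SqlCommand("[dbo].[CSVReaderTransaction]", connection))
{
    command.CommandType = CommandType.StoredProcedure;
    command.CommandTimeout = 0;   // no timeout for a long-running load
    command.Parameters.Add("@Filepath", SqlDbType.VarChar, 100).Value = @"C:\data\employees.csv";

    connection.Open();
    command.ExecuteNonQuery();
}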

