I have a large CSV file: 10 columns, 100 million lines, approximately 6 GB on disk. I want to read this CSV file line by line and then load the data into a Microsoft SQL Server database using SQL bulk copy. I have read a couple of threads here and elsewhere online; most people believe that reading a CSV file in parallel does not buy much in terms of efficiency, since the tasks/threads just compete for disk access.
What I'm trying to do is read the CSV line by line and add each row to a blocking collection. Once the collection holds 100k rows, spin up a new task/thread that writes that batch to SQL Server using the SqlBulkCopy API.
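To make the intent concrete, here is a rough, simplified sketch of the producer/consumer pattern I have in mind. It is illustrative only: the names are made up, and WriteBatch is a hypothetical placeholder standing in for the real SqlBulkCopy.WriteToServer call. My actual code follows further below.

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Data;
    using System.Threading.Tasks;

    static class CsvPipelineSketch
    {
        // source:    rows produced by reading the CSV one line at a time
        // batchSize: number of rows handed to each bulk copy call (100k in my case)
        // consumers: number of concurrent writer tasks
        public static void Run(IEnumerable<DataRow> source, int batchSize, int consumers)
        {
            // Bounded so the reader cannot run arbitrarily far ahead of the writers.
            var rows = new BlockingCollection<DataRow>(boundedCapacity: batchSize * 10);

            // Producer: read the file line by line and push rows into the collection.
            var producer = Task.Factory.StartNew(() =>
            {
                foreach (var row in source)
                {
                    rows.Add(row);
                }
                rows.CompleteAdding();
            });

            // Consumers: accumulate a batch and hand it off when it is full.
            var writers = new List<Task>();
            for (int i = 0; i < consumers; i++)
            {
                writers.Add(Task.Factory.StartNew(() =>
                {
                    var batch = new List<DataRow>(batchSize);
                    foreach (var row in rows.GetConsumingEnumerable())
                    {
                        batch.Add(row);
                        if (batch.Count == batchSize)
                        {
                            WriteBatch(batch.ToArray());   // full batch -> bulk copy
                            batch.Clear();
                        }
                    }
                    if (batch.Count > 0)
                    {
                        WriteBatch(batch.ToArray());       // flush the partial tail batch
                    }
                }));
            }

            producer.Wait();
            Task.WaitAll(writers.ToArray());
        }

        // Placeholder for the real SqlBulkCopy.WriteToServer(DataRow[]) call.
        static void WriteBatch(DataRow[] batch)
        {
            Console.WriteLine("would bulk copy {0} rows here", batch.Length);
        }
    }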
I wrote this piece of code, but at runtime I get an error saying: "Attempt to invoke bulk copy on an object that has a pending operation." This scenario looks like something that should be easy to solve with the .NET 4.0 TPL, but I can't get it to work. Any suggestions on what I'm doing wrong?
    public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate)
    {
        const int inputCollectionBufferSize = 1000000;
        const int bulkInsertBufferCapacity = 100000;
        const int bulkInsertConcurrency = 8;

        var sqlConnection = new SqlConnection(connectionString);
        sqlConnection.Open();

        var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock)
        {
            EnableStreaming = true,
            BatchSize = bulkInsertBufferCapacity,
            DestinationTableName = table,
            BulkCopyTimeout = (24 * 60 * 60),
        };

        BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize);
        DataTable dataTable = new DataTable(table);
        dataTable.Columns.AddRange(columns);

        Task loadTask = Task.Factory.StartNew(() =>
        {
            foreach (DataRow row in ReadRows(fileName, dataTable))
            {
                rows.Add(row);
            }
            rows.CompleteAdding();
        });

        List<Task> insertTasks = new List<Task>(bulkInsertConcurrency);
        for (int i = 0; i < bulkInsertConcurrency; i++)
        {
            insertTasks.Add(Task.Factory.StartNew((x) =>
            {
                List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity);
                foreach (DataRow row in rows.GetConsumingEnumerable())
                {
                    if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
                    {
                        SqlBulkCopy bulkCopy = x as SqlBulkCopy;
                        var dataRows = bulkInsertBuffer.ToArray();
                        bulkCopy.WriteToServer(dataRows);
                        Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count);
                        bulkInsertBuffer.Clear();
                    }
                    bulkInsertBuffer.Add(row);
                }
            },
            sqlBulkCopy));
        }

        loadTask.Wait();
        Task.WaitAll(insertTasks.ToArray());
    }

    private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable)
    {
        using (var textFieldParser = new TextFieldParser(fileName))
        {
            textFieldParser.TextFieldType = FieldType.Delimited;
            textFieldParser.Delimiters = new[] { "," };
            textFieldParser.HasFieldsEnclosedInQuotes = true;

            while (!textFieldParser.EndOfData)
            {
                string[] cols = textFieldParser.ReadFields();
                DataRow row = dataTable.NewRow();
                for (int i = 0; i < cols.Length; i++)
                {
                    if (string.IsNullOrEmpty(cols[i]))
                    {
                        row[i] = DBNull.Value;
                    }
                    else
                    {
                        row[i] = cols[i];
                    }
                }
                yield return row;
            }
        }
    }