C#: the fastest way to insert data into a SQL database

I get (streaming) data from an external source (via Lightstreamer) into my C# application. The application receives the data through a listener and puts it into a ConcurrentQueue. Every 0.5 seconds the queue is drained via TryDequeue into a DataTable, which is then copied into a SQL Server staging table using SqlBulkCopy. The database then processes the newly arrived data from the staging table into the final table. Currently I receive about 300,000 rows per day (this may increase over the next weeks), and my goal is to stay under 1 second from receiving the data until it is available in the final SQL table. The peak rate I currently have to handle is around 50 rows per second.
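For context, a minimal sketch of that flush cycle, reusing the queue and DTO names from the code further down; the staging table and column names are illustrative placeholders, not the real schema:

    private static void FlushQueueToStagingTable(SqlConnection connection)
    {
        // build a DataTable whose columns match the staging table
        DataTable staging = new DataTable();
        staging.Columns.Add("CityIndexInstrumentID", typeof(int));
        staging.Columns.Add("CityIndexTimeStamp", typeof(long));
        staging.Columns.Add("BidPrice", typeof(decimal));
        staging.Columns.Add("AskPrice", typeof(decimal));

        // drain the concurrent queue into the DataTable
        LightstreamerAPI.PriceDTO price;
        while (IntradayQuotesQueue.TryDequeue(out price))
        {
            staging.Rows.Add(
                price.MarketID,
                Convert.ToInt64(price.TickDate.Replace(@"\/Date(", "").Replace(@")\/", "")),
                price.Bid,
                price.Offer);
        }

        if (staging.Rows.Count == 0) return; // nothing arrived in this interval

        // one bulk round trip into the staging table
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection))
        {
            bulkCopy.DestinationTableName = "Staging.CityIndexIntradayLivePricesStaging"; // placeholder name
            bulkCopy.WriteToServer(staging);
        }
    }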

Unfortunately, as I receive more and more data, this pipeline is getting slower (still well under 1 second, but I want to keep improving). The main bottleneck (for now) is processing the staged data (inside the SQL database) into the final table. To improve performance, I would like to switch the staging table to a memory-optimized table. The final table is already memory-optimized, so the two will certainly work together.

My questions:

  • Is there a way to use SqlBulkCopy (from C#) with memory-optimized tables? (as far as I know, there is not)
  • Any suggestions on the fastest way to write the received data from my C# application into a staging table?

EDIT (with solution):

After the comments/answers and a performance evaluation, I decided to abandon the bulk insert and use SqlCommand to pass an IEnumerable<SqlDataRecord> with my data as a table-valued parameter into a natively compiled stored procedure, which stores the data directly in my memory-optimized final table (plus a copy into the staging table, which now serves as an archive). Performance increased significantly (even though I have not yet parallelized the inserts; that will come at a later stage).

Here are the relevant pieces of code:

The memory-optimized table type (for passing the data from C# to the SQL stored procedure):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE
(
    [CityIndexInstrumentID] [int] NOT NULL,
    [CityIndexTimeStamp] [bigint] NOT NULL,
    [BidPrice] [numeric](18, 8) NOT NULL,
    [AskPrice] [numeric](18, 8) NOT NULL,
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
    (
        [CityIndexInstrumentID] ASC,
        [CityIndexTimeStamp] ASC,
        [BidPrice] ASC,
        [AskPrice] ASC
    )
)
WITH (MEMORY_OPTIMIZED = ON)

The natively compiled stored procedure that inserts the data into the final table and into the staging table (which in this setup acts as an archive):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
    @ProcessingID int,
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic with (transaction isolation level = snapshot, language = N'us_english')

    -- store prices in the final table
    insert into TimeSeries.CityIndexIntradayLivePrices
    (
        ObjectID,
        PerDateTime,
        BidPrice,
        AskPrice,
        ProcessingID
    )
    select
        Objects.ObjectID,
        CityIndexTimeStamp,
        CityIndexIntradayLivePricesStaging.BidPrice,
        CityIndexIntradayLivePricesStaging.AskPrice,
        @ProcessingID
    from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
         Objects.Objects
    where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID

    -- store data in the staging (archive) table
    insert into Staging.CityIndexIntradayLivePricesStaging
    (
        ImportProcessingID,
        CityIndexInstrumentID,
        CityIndexTimeStamp,
        BidPrice,
        AskPrice
    )
    select
        @ProcessingID,
        CityIndexInstrumentID,
        CityIndexTimeStamp,
        BidPrice,
        AskPrice
    from @CityIndexIntradayLivePrices

end

The IEnumerable<SqlDataRecord> populated from the queue:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
    // set up the columns (the order must match the order of columns in the table-valued parameter)
    SqlMetaData MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
    SqlMetaData MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
    SqlMetaData MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8
    SqlMetaData MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8

    // define a sql data record with the columns
    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });

    // remove each price row from the queue and add it to the sql data record
    LightstreamerAPI.PriceDTO PriceDTO;
    while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
    {
        DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
        DataRecord.SetInt64(1, Convert.ToInt64(PriceDTO.TickDate.Replace(@"\/Date(", "").Replace(@")\/", ""))); // @ avoids treating \ as an escape sequence
        DataRecord.SetDecimal(2, PriceDTO.Bid);   // bid price
        DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price

        yield return DataRecord;
    }
}

Processing data every 0.5 seconds:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
    try
    {
        // open a new sql connection
        using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
        {
            TimeSeriesDatabaseSQLConnection.Open();

            // endless loop to keep the thread alive
            while (true)
            {
                // ensure the queue has rows to process (otherwise no need to continue)
                if (IntradayQuotesQueue.Count > 0)
                {
                    // define the stored procedure for the sql command
                    SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);
                    InsertCommand.CommandType = CommandType.StoredProcedure;

                    // define the sql parameters (the table-valued parameter gets its data from CreateSqlDataRecords())
                    SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
                    SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter

                    // set the sql db type to structured (the data type for table-valued parameters)
                    ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;

                    // execute the stored procedure
                    InsertCommand.ExecuteNonQuery();
                }

                // wait 0.5 seconds
                Thread.Sleep(500);
            }
        }
    }
    catch (Exception e)
    {
        // handle error (standard error messages and update processing)
        ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
    }
}
3 answers

Use SQL Server 2016 (it is not RTM yet, but it is already far better than 2014 when it comes to memory-optimized tables). Then use either a memory-optimized table variable or simply fire a large number of natively compiled stored procedure calls in one transaction, each performing a single insert, depending on which is faster in your scenario (this varies). A few things to note:

  • Doing multiple inserts in a single transaction is vital to save on network round trips. While in-memory operations are very fast, SQL Server still has to confirm every operation.
  • Depending on how you produce the data, parallelizing the inserts may speed things up (don't overdo it; you will quickly hit the saturation point). Don't try to be overly clever here; leverage async/await and/or Parallel.ForEach (a rough sketch follows after this answer).
  • If you pass a table-valued parameter, the easiest way is to pass a DataTable as the parameter value, but that is not the most efficient way; the most efficient is to pass an IEnumerable<SqlDataRecord>. You can use an iterator method to generate the values, so that only a constant amount of memory is allocated.

You will have to experiment a bit to find the best way of passing the data through; it depends a lot on the size of your data and how you are getting it.
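For illustration, a rough sketch of that parallel fan-out, reusing the stored procedure and record layout from the question. The batch construction, degree of parallelism, and method name are illustrative; note that when parallelizing, each batch must carry its own SqlDataRecord instances (the question's iterator reuses a single record, which is fine only for one sequential stream):

    private static void InsertBatchesInParallel(List<SqlDataRecord>[] batches, string connectionString, Int32 processingId)
    {
        // cap the parallelism; past the saturation point extra threads only add contention
        Parallel.ForEach(batches, new ParallelOptions { MaxDegreeOfParallelism = 4 }, batch =>
        {
            using (SqlConnection connection = new SqlConnection(connectionString))
            using (SqlCommand command = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", connection))
            {
                command.CommandType = CommandType.StoredProcedure;

                // each batch holds its own SqlDataRecord instances, so the
                // parallel calls do not share mutable state
                SqlParameter tableParameter = command.Parameters.AddWithValue("@CityIndexIntradayLivePrices", batch);
                tableParameter.SqlDbType = SqlDbType.Structured;
                command.Parameters.AddWithValue("@ProcessingID", processingId);

                connection.Open();
                command.ExecuteNonQuery();
            }
        });
    }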


Batch the data from the staging table to the final table in row counts below 5,000 (I usually use 4,000), and do not wrap the batches in a transaction; instead, implement programmatic transactions if needed. Staying under 5,000 inserted rows keeps row locks from escalating into a table lock, which would have to wait until everyone else gets out of the table.
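If you drive this batching from the client side, a minimal generic sketch of the chunking (the 4,000 batch size mirrors the figure above; InsertBatch and PriceRow in the usage comment are hypothetical stand-ins for your per-batch insert and row type):

    private static IEnumerable<List<T>> Chunk<T>(IReadOnlyList<T> rows, int batchSize)
    {
        // hand out slices below the lock-escalation threshold; each slice is
        // inserted on its own, without an enclosing transaction
        for (int offset = 0; offset < rows.Count; offset += batchSize)
        {
            List<T> batch = new List<T>();
            int end = Math.Min(offset + batchSize, rows.Count);
            for (int i = offset; i < end; i++)
            {
                batch.Add(rows[i]);
            }
            yield return batch;
        }
    }

    // usage: foreach (List<PriceRow> batch in Chunk(allRows, 4000)) InsertBatch(batch);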


Are you sure it is your logic that is slowing down, and not the actual transactions to the database? Entity Framework, for example, is "sensitive" (for lack of a better term) when trying to insert a ton of rows and gets quite slow.

There is a third-party BulkInsert library on Codeplex that I have used, and it is pretty nice for doing bulk insertion of data: https://efbulkinsert.codeplex.com/

You could also write your own extension method on DbContext, if you are on EF, that does this and keys off the record count: anything under 5,000 rows uses SaveChanges(), anything above that invokes your own bulk-insert logic (a hypothetical sketch follows).
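A hypothetical sketch of such an extension method, assuming EF6. The bulk path is passed in as a delegate because the exact bulk-insert API depends on the library you choose; the method name and 5,000-row threshold are illustrative:

    public static class DbContextBulkExtensions
    {
        public static void SaveSmart<T>(this DbContext context, IList<T> entities, Action<DbContext, IList<T>> bulkInsert)
            where T : class
        {
            if (entities.Count < 5000)
            {
                // small batches: the regular EF change-tracking path is acceptable
                context.Set<T>().AddRange(entities);
                context.SaveChanges();
            }
            else
            {
                // large batches: hand off to the bulk-insert implementation
                bulkInsert(context, entities);
            }
        }
    }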


Source: https://habr.com/ru/post/1247459/

