I get streaming data from an external source (via Lightstreamer) into my C# application. The C# application receives the data through a listener and enqueues it (ConcurrentQueue). Every 0.5 seconds the queue is drained with TryDequeue into a DataTable, which is then copied to the SQL database using SqlBulkCopy. The SQL database then processes the newly received data from the staging table into the final table. Currently I get about 300,000 rows per day (this may increase over the next weeks), and my goal is to stay under 1 second from receiving the data until it is available in the final SQL table. At the moment the peak load I have to process is around 50 rows per second.
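For reference, the flush-and-bulk-copy step described above (drain the ConcurrentQueue into a DataTable, then SqlBulkCopy into the staging table) could be sketched roughly like this; the method name, the staging table name, and the DataTable column names are illustrative assumptions, not my actual code:

```csharp
// minimal sketch of the original flush step (names are illustrative)
private static void FlushQueueToStaging(SqlConnection connection)
{
    // build a DataTable whose layout matches the staging table
    DataTable staging = new DataTable();
    staging.Columns.Add("CityIndexInstrumentID", typeof(int));
    staging.Columns.Add("CityIndexTimeStamp", typeof(long));
    staging.Columns.Add("BidPrice", typeof(decimal));
    staging.Columns.Add("AskPrice", typeof(decimal));

    // drain the queue into the DataTable
    LightstreamerAPI.PriceDTO price;
    while (IntradayQuotesQueue.TryDequeue(out price))
    {
        staging.Rows.Add(
            price.MarketID,
            Convert.ToInt64(price.TickDate.Replace(@"\/Date(", "").Replace(@")\/", "")),
            price.Bid,
            price.Offer);
    }

    // bulk copy the whole batch into the staging table in one round trip
    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "Staging.CityIndexIntradayLivePricesStaging";
        bulkCopy.WriteToServer(staging);
    }
}
```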
Unfortunately, as I receive more and more data, my logic is getting slower (still well under 1 second, but I want to keep improving). The main bottleneck (for now) is processing the staged data (in the SQL database) into the final table. To improve performance, I would like to convert the staging table to a memory-optimized table. The final table is already a memory-optimized table, so the two will certainly work together.
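A memory-optimized staging table along those lines might look like the sketch below. The table and index names are assumptions for illustration (the column layout follows the table type shown later in this post), and DURABILITY = SCHEMA_ONLY is a deliberate choice for transient staging data, not part of any definition from my actual setup:

```sql
-- hypothetical memory-optimized staging table (names are illustrative)
CREATE TABLE [Staging].[CityIndexIntradayLivePricesStaging]
(
    [CityIndexInstrumentID] int       NOT NULL,
    [CityIndexTimeStamp]    bigint    NOT NULL,
    [BidPrice]              numeric(18, 8) NOT NULL,
    [AskPrice]              numeric(18, 8) NOT NULL,
    INDEX [IndexStagingLookup] NONCLUSTERED ([CityIndexInstrumentID], [CityIndexTimeStamp])
)
WITH (MEMORY_OPTIMIZED = ON,
      DURABILITY = SCHEMA_ONLY) -- staging rows need not survive a server restart
```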
My questions:
- Is there a way to use SqlBulkCopy (from C#) with memory-optimized tables? (as far as I know, there is no way)
- Any suggestions for the fastest way to write the received data from my C# application to the staging table?
EDIT (with solution):
After the comments/answers and a performance evaluation, I decided to abandon the bulk insert and instead use SqlCommand to pass an IEnumerable with my data as a table-valued parameter into a natively compiled stored procedure, which stores the data directly in my memory-optimized final table (as well as a copy into the staging table, which now serves as an archive). Performance increased significantly (even though I have not yet looked into parallelizing the inserts; that will come at a later stage).
Here is the piece of code:
Memory-optimized table type (for passing the data from C# to the SQL stored procedure):
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE
(
    [CityIndexInstrumentID] [int] NOT NULL,
    [CityIndexTimeStamp] [bigint] NOT NULL,
    [BidPrice] [numeric](18, 8) NOT NULL,
    [AskPrice] [numeric](18, 8) NOT NULL,
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
    (
        [CityIndexInstrumentID] ASC,
        [CityIndexTimeStamp] ASC,
        [BidPrice] ASC,
        [AskPrice] ASC
    )
)
WITH (MEMORY_OPTIMIZED = ON)
Natively compiled stored procedure for inserting the data into the final table and the staging table (which in this case serves as an archive):
create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
    @ProcessingID int,
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic with (transaction isolation level = snapshot, language = N'us_english')
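The procedure body is cut off above; a minimal sketch of what it presumably contains, assuming the final table is named [Data].[CityIndexIntradayPrices] and the archive copy goes to [Staging].[CityIndexIntradayLivePricesArchive] (both table names are hypothetical; the columns come from the table type defined above):

```sql
    -- insert the batch into the memory-optimized final table
    insert into [Data].[CityIndexIntradayPrices]
        (CityIndexInstrumentID, CityIndexTimeStamp, BidPrice, AskPrice)
    select CityIndexInstrumentID, CityIndexTimeStamp, BidPrice, AskPrice
    from @CityIndexIntradayLivePrices;

    -- keep a copy in the staging table, which now serves as an archive
    insert into [Staging].[CityIndexIntradayLivePricesArchive]
        (ProcessingID, CityIndexInstrumentID, CityIndexTimeStamp, BidPrice, AskPrice)
    select @ProcessingID, CityIndexInstrumentID, CityIndexTimeStamp, BidPrice, AskPrice
    from @CityIndexIntradayLivePrices;
end
```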
IEnumerable populated from the queue:
private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
    // set columns (the sequence is important as it must match the sequence of columns in the table-valued parameter)
    SqlMetaData MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
    SqlMetaData MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
    SqlMetaData MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8
    SqlMetaData MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8

    // define sql data record with the columns
    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });

    // remove each price row from the queue and add it to the sql data record
    LightstreamerAPI.PriceDTO PriceDTO;
    while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
    {
        DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
        DataRecord.SetInt64(1, Convert.ToInt64(PriceDTO.TickDate.Replace(@"\/Date(", "").Replace(@")\/", ""))); // @ is used to avoid problems with \ as an escape sequence
        DataRecord.SetDecimal(2, PriceDTO.Bid);   // bid price
        DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
        yield return DataRecord;
    }
}
Processing data every 0.5 seconds:
public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
    try
    {
        // open new sql connection
        using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
        {
            // open connection
            TimeSeriesDatabaseSQLConnection.Open();

            // endless loop to keep thread alive
            while (true)
            {
                // ensure queue has rows to process (otherwise no need to continue)
                if (IntradayQuotesQueue.Count > 0)
                {
                    // define stored procedure for sql command
                    SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);

                    // set command type to stored procedure
                    InsertCommand.CommandType = CommandType.StoredProcedure;

                    // define sql parameters (table-valued parameter gets data from CreateSqlDataRecords())
                    SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
                    SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter

                    // set sql db type to structured for the table-valued parameter (structured = special data type for structured data contained in table-valued parameters)
                    ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;

                    // execute stored procedure
                    InsertCommand.ExecuteNonQuery();
                }

                // wait 0.5 seconds
                Thread.Sleep(500);
            }
        }
    }
    catch (Exception e)
    {
        // handle error (standard error messages and update processing)
        ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
    }
}