The fastest way to insert 100,000 records in DocumentDB

As the title suggests, I need to insert 100,000 records into a DocumentDB collection programmatically. The data will be used for reporting later. I am using the Azure Documents SDK and a stored procedure for bulk inserting documents (see the question Azure documentdb bulk insert using stored procedure).

The following console application shows how I insert documents.

InsertDocuments generates 500 test documents to pass to the stored procedure. The Main function calls InsertDocuments 10 times, inserting 5,000 documents in total. Running this application inserts 500 documents every few seconds. If I increase the number of documents per call, I start to get errors and lose documents.

Can anyone recommend a faster way to insert documents?

static void Main(string[] args)
{
    Console.WriteLine("Starting...");
    MainAsync().Wait();
}

static async Task MainAsync()
{
    int campaignId = 1001, count = 500;
    for (int i = 0; i < 10; i++)
    {
        await InsertDocuments(campaignId, (count * i) + 1, (count * i) + count);
    }
}

static async Task InsertDocuments(int campaignId, int startId, int endId)
{
    using (DocumentClient client = new DocumentClient(new Uri(documentDbUrl), documentDbKey))
    {
        List<dynamic> items = new List<dynamic>();

        // Create x number of documents to insert
        for (int i = startId; i <= endId; i++)
        {
            var item = new
            {
                id = Guid.NewGuid(),
                campaignId = campaignId,
                userId = i,
                status = "Pending"
            };

            items.Add(item);
        }

        var task = client.ExecuteStoredProcedureAsync<dynamic>(
            "/dbs/default/colls/campaignusers/sprocs/bulkImport",
            new RequestOptions() { PartitionKey = new PartitionKey(campaignId) },
            new { items = items });

        try
        {
            await task;

            int insertCount = (int)task.Result.Response;

            Console.WriteLine("{0} documents inserted...", insertCount);
        }
        catch (Exception e)
        {
            Console.WriteLine("Error: {0}", e.Message);
        }
    }
}
+14
4 answers

The fastest way to insert documents into Azure DocumentDB is available as a sample on GitHub: https://github.com/Azure/azure-documentdb-dotnet/tree/master/samples/documentdb-benchmark

The following tips will help you achieve maximum performance with the .NET SDK (a minimal configuration sketch follows the list):

  • Initialize a singleton DocumentClient
  • Use direct connectivity and the TCP protocol (ConnectionMode.Direct and ConnectionProtocol.Tcp)
  • Use hundreds of tasks in parallel (depending on your hardware)
  • Increase MaxConnectionLimit in the DocumentClient constructor to a high value, say 1000 connections
  • Turn on gcServer
  • Make sure your collection has the appropriate provisioned throughput (and a good partition key)
  • Running in the same Azure region will also help
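
A minimal configuration sketch of those settings, reusing the documentDbUrl/documentDbKey values from the question; this is an illustration of the list above, not code from the original answer:

// Sketch only; ConnectionPolicy, ConnectionMode and Protocol are part of the
// DocumentDB .NET SDK (Microsoft.Azure.Documents.Client).
static readonly ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,    // direct connectivity
    ConnectionProtocol = Protocol.Tcp,         // TCP protocol
    MaxConnectionLimit = 1000                  // raise the connection limit
};

// One DocumentClient for the whole process, reused for every request.
static readonly DocumentClient singletonClient =
    new DocumentClient(new Uri(documentDbUrl), documentDbKey, connectionPolicy);

// gcServer is enabled in App.config rather than in code:
// <configuration><runtime><gcServer enabled="true" /></runtime></configuration>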

With 10,000 RU/s you can insert 100,000 documents in about 50 seconds (at roughly 5 request units per write).

With 100,000 RU/s you can insert them in about 5 seconds. You can make this as fast as you want by tuning the throughput (and, for very high insert counts, by spreading the inserts across several VMs/workers).
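
To see where those figures come from: at roughly 5 RU per write, 100,000 documents cost about 100,000 × 5 = 500,000 RU in total, so a collection provisioned at 10,000 RU/s needs about 500,000 / 10,000 ≈ 50 seconds, and one at 100,000 RU/s about 500,000 / 100,000 ≈ 5 seconds.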

EDIT (7/12/19): You can now use the bulk executor library: https://docs.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview

+22

The Cosmos DB team has just released a bulk import and update SDK. Unfortunately it is only available for .NET Framework 4.5.1, but it apparently does a lot of the heavy lifting for you and maximizes use of throughput. See:

https://docs.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview
https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-bulk-executor-dot-net
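
A rough usage sketch of that library, assuming the Microsoft.Azure.CosmosDB.BulkExecutor package; the variable names and the target collection are placeholders, and the call shapes follow the library's documented sample rather than anything in this answer:

// Sketch: client is a DocumentClient, dataCollection the target DocumentCollection.
IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Let the bulk executor handle throttling by turning off client-side retries.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

BulkImportResponse response = await bulkExecutor.BulkImportAsync(
    documents: documents,                 // the 100,000 objects to insert
    enableUpsert: true,
    disableAutomaticIdGeneration: true);

Console.WriteLine("Imported {0} documents ({1} RUs, {2:0.#} s)",
    response.NumberOfDocumentsImported,
    response.TotalRequestUnitsConsumed,
    response.TotalTimeTaken.TotalSeconds);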

+4

Another approach is the stored procedure, as mentioned by other people. A stored procedure requires a partition key, and according to the documentation it must finish within 4 seconds, otherwise all records are rolled back. The code below uses the Python-based Azure DocumentDB SDK and a JavaScript stored procedure. I modified the script and resolved many errors; the code below works fine:

function bulkimport2(docObject) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    getContext().getResponse().setBody(docObject.items);
    //return

    // Validate input.
    //if (!docObject.items || !docObject.items.length) getContext().getResponse().setBody(docObject);

    docObject.items = JSON.stringify(docObject.items)
    docObject.items = docObject.items.replace("\\\\r", "");
    docObject.items = docObject.items.replace("\\\\n", "");
    var docs = JSON.parse(docObject.items);
    var docsLength = docObject.items.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the CRUD API to create a document.
    tryCreate(docs[count], callback, collectionLink, count);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted.
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore.
    //    Just call setBody and we are done.
    function tryCreate(doc, callback, collectionLink, count) {
        doc = JSON.stringify(doc);
        if (typeof doc == "undefined") {
            getContext().getResponse().setBody(count);
            return;
        } else {
            doc = doc.replace("\\r", "");
            doc = doc.replace("\\n", "");
            doc = JSON.parse(doc);
        }
        getContext().getResponse().setBody(doc);

        var isAccepted = collection.upsertDocument(collectionLink, doc, callback);

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client,
        // which will call the script again with the remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false.
        if (!isAccepted) {
            getContext().getResponse().setBody(count);
        }
    }

    // This is called when collection.upsertDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw getContext().getResponse().setBody(err + doc);

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
            return;
        } else {
            // Create the next document.
            tryCreate(docs[count], callback, collectionLink, count);
        }
    }
}

EDIT: getContext().getResponse().setBody(count); return; // when all records are fully processed.

Python script to register the stored procedure and bulk import the records:

# Imports assumed for this script (not shown in the original post):
import base64
import json

import pydocumentdb.document_client as document_client

# Initialize the Python DocumentDB client
client = document_client.DocumentClient(config['ENDPOINT'],
                                        {'masterKey': config['MASTERKEY'], 'DisableSSLVerification': 'true'})

# Create a database
#db = client.CreateDatabase({ 'id': config['DOCUMENTDB_DATABASE'] })
db = client.ReadDatabases({ 'id': 'db2' })
print(db)

# Create collection options
options = {
    'offerEnableRUPerMinuteThroughput': True,
    'offerVersion': "V2",
    'offerThroughput': 400
}

# Create a collection
#collection = client.CreateCollection('dbs/db2', { 'id': 'coll2' }, options)
#collection = client.CreateCollection({ 'id': 'db2' }, { 'id': 'coll2' }, options)

database_link = 'dbs/db2'
collection_link = database_link + '/colls/coll2'

"""
# List collections
collection = client.ReadCollection(collection_link)
print(collection)
print('Databases:')
databases = list(client.ReadDatabases())
if not databases:
    print('No Databases:')
for database in databases:
    print(database['id'])
"""

# Create some documents
"""
document1 = client.CreateDocument(collection['_self'], { 'Web Site': 0, 'Cloud Service': 0, 'Virtual Machine': 0, 'name': 'some' })
document2 = client.CreateDocument(collection['_self'], { 'Web Site': 1, 'Cloud Service': 0, 'Virtual Machine': 0, 'name': 'some' })
"""

# Query them in SQL
"""
query = { 'query': 'SELECT * FROM server s' }
options = {}
options['enableCrossPartitionQuery'] = True
options['maxItemCount'] = 20
#result_iterable = client.QueryDocuments(collection['_self'], query, options)
result_iterable = client.QueryDocuments(collection_link, query, options)
results = list(result_iterable)
print(results)
"""

## How to store a procedure and use it
"""
sproc3 = {
    'id': 'storedProcedure2',
    'body': (
        'function (input) {' +
        '    getContext().getResponse().setBody(' +
        '        \'a\' + input.temp);' +
        '}')
}
retrieved_sproc3 = client.CreateStoredProcedure(collection_link, sproc3)
result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/storedProcedure3', {'temp': 'so'})
"""

## Delete all records in the collection
"""
result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkDeleteSproc', "SELECT * FROM c ORDER BY c._ts DESC ")
print(result)
"""

multiplerecords = """[
    { "Virtual Machine": 0, "name": "some", "Web Site": 0, "Cloud Service": 0 },
    { "Virtual Machine": 0, "name": "some", "Web Site": 1, "Cloud Service": 0 }
]"""
multiplerecords = json.loads(multiplerecords)
print(multiplerecords)
print(str(json.dumps(json.dumps(multiplerecords).encode('utf8'))))

#bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport', json.dumps(multiplerecords).encode('utf8'))
#bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport', json.dumps(json.loads(r'{"items": [{"name":"John","age":30,"city":"New York"},{"name":"John","age":30,"city":"New York"}]}')).encode('utf8'))

str1 = '{name":"John","age":30,"city":"New York","PartitionKey" : "Morisplane"}'
str2 = '{name":"John","age":30,"city":"New York","partitionKey" : "Morisplane"}'
key1 = base64.b64encode(str1.encode("utf-8"))
key2 = base64.b64encode(str2.encode("utf-8"))

data = {
    "items": [
        { "id": key1, "name": "John", "age": 30, "city": "Morisplane", "PartitionKey": "Morisplane" },
        { "id": key2, "name": "John", "age": 30, "city": "Morisplane", "partitionKey": "Morisplane" }
    ],
    "city": "Morisplane",
    "partitionKey": "Morisplane"
}
print(repr(data))

#retrieved_sproc3 = client.DeleteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2')

sproc3 = {
    'id': 'bulkimport2',
    'body': (
        """function bulkimport2(docObject) {
            var collection = getContext().getCollection();
            var collectionLink = collection.getSelfLink();

            // The count of imported docs, also used as current doc index.
            var count = 0;

            getContext().getResponse().setBody(docObject.items);
            //return

            // Validate input.
            //if (!docObject.items || !docObject.items.length) getContext().getResponse().setBody(docObject);

            docObject.items = JSON.stringify(docObject.items)
            docObject.items = docObject.items.replace("\\\\r", "");
            docObject.items = docObject.items.replace("\\\\n", "");
            var docs = JSON.parse(docObject.items);
            var docsLength = docObject.items.length;
            if (docsLength == 0) {
                getContext().getResponse().setBody(0);
                return;
            }

            // Call the CRUD API to create a document.
            tryCreate(docs[count], callback, collectionLink, count);

            // Note that there are 2 exit conditions:
            // 1) The createDocument request was not accepted.
            //    In this case the callback will not be called, we just call setBody and we are done.
            // 2) The callback was called docs.length times.
            //    In this case all documents were created and we don't need to call tryCreate anymore.
            function tryCreate(doc, callback, collectionLink, count) {
                doc = JSON.stringify(doc);
                if (typeof doc == "undefined") {
                    getContext().getResponse().setBody(count);
                    return;
                } else {
                    doc = doc.replace("\\r", "");
                    doc = doc.replace("\\n", "");
                    doc = JSON.parse(doc);
                }
                getContext().getResponse().setBody(doc);

                var isAccepted = collection.upsertDocument(collectionLink, doc, callback);

                // If the request was accepted, callback will be called.
                // Otherwise report current count back to the client,
                // which will call the script again with the remaining set of docs.
                // This condition will happen when this stored procedure has been running too long
                // and is about to get cancelled by the server. This will allow the calling client
                // to resume this batch from the point we got to before isAccepted was set to false.
                if (!isAccepted) {
                    getContext().getResponse().setBody(count);
                }
            }

            // This is called when collection.upsertDocument is done and the document has been persisted.
            function callback(err, doc, options) {
                if (err) throw getContext().getResponse().setBody(err + doc);

                // One more document has been inserted, increment the count.
                count++;

                if (count >= docsLength) {
                    // If we have created all documents, we are done. Just set the response.
                    getContext().getResponse().setBody(count);
                    return;
                } else {
                    // Create the next document.
                    tryCreate(docs[count], callback, collectionLink, count);
                }
            }
        }"""
    )
}

#retrieved_sproc3 = client.CreateStoredProcedure(collection_link, sproc3)

bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2', data, {"partitionKey": "Morisplane"})
print(repr(bulkloadresult))
0
private async Task<T> ExecuteDataUpload<T>(IEnumerable<object> data, PartitionKey partitionKey)
{
    using (var client = new DocumentClient(m_endPointUrl, m_authKey, connPol))
    {
        while (true)
        {
            try
            {
                var result = await client.ExecuteStoredProcedureAsync<T>(m_spSelfLink,
                    new RequestOptions { PartitionKey = partitionKey }, data);
                return result;
            }
            catch (DocumentClientException ex)
            {
                if (429 == (int)ex.StatusCode)
                {
                    Thread.Sleep(ex.RetryAfter);
                    continue;
                }

                if (HttpStatusCode.RequestTimeout == ex.StatusCode)
                {
                    Thread.Sleep(ex.RetryAfter);
                    continue;
                }

                throw ex;
            }
            catch (Exception)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                continue;
            }
        }
    }
}

public async Task uploadData(IEnumerable<object> data, string partitionKey)
{
    int groupSize = 600;
    int dataSize = data.Count();
    int chunkSize = dataSize > groupSize ? groupSize : dataSize;
    List<Task> uploadTasks = new List<Task>();

    while (dataSize > 0)
    {
        IEnumerable<object> chunkData = data.Take(chunkSize);
        object[] taskData = new object[3];
        taskData[0] = chunkData;
        taskData[1] = chunkSize;
        taskData[2] = partitionKey;

        uploadTasks.Add(Task.Factory.StartNew(async (arg) =>
        {
            object[] reqdData = (object[])arg;
            int chunkSizes = (int)reqdData[1];
            IEnumerable<object> chunkDatas = (IEnumerable<object>)reqdData[0];
            var partKey = new PartitionKey((string)reqdData[2]);

            int chunkDatasCount = chunkDatas.Count();
            while (chunkDatasCount > 0)
            {
                int insertedCount = await ExecuteDataUpload<int>(chunkDatas, partKey);
                chunkDatas = chunkDatas.Skip(insertedCount);
                chunkDatasCount = chunkDatasCount - insertedCount;
            }
        }, taskData));

        data = data.Skip(chunkSize);
        dataSize = dataSize - chunkSize;
        chunkSize = dataSize > groupSize ? groupSize : dataSize;
    }

    await Task.WhenAll(uploadTasks);
}

Now call uploadData in parallel with the list of objects you want to upload. Just keep in mind that each call should only send data belonging to a single partition key.
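
A hypothetical caller for the snippet above; the CampaignUploader class name and the campaign data are illustrative only, not part of the original answer:

// Every document in this batch belongs to the same partition key value ("1001").
var docs = Enumerable.Range(1, 100000)
    .Select(i => (object)new { id = Guid.NewGuid().ToString(), campaignId = 1001, userId = i, status = "Pending" })
    .ToList();

var uploader = new CampaignUploader();   // hypothetical class hosting uploadData / ExecuteDataUpload
await uploader.uploadData(docs, "1001");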

0

Source: https://habr.com/ru/post/1270778/

