Database exception in Slick 3.0 during batch insertion

When inserting thousands of records every five seconds via batch insert in Slick 3, I get

org.postgresql.util.PSQLException: FATAL: sorry, too many clients already 

My data access layer looks like this:

    val db: CustomPostgresDriver.backend.DatabaseDef =
      Database.forURL(url, user = user, password = password, driver = jdbcDriver)

    override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
      val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover {
        case ex: Throwable => RelationalRepositoryUtility.handleBatchOperationErrors(ex)
      }
      //db.close()
      res
    }

    override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = {
      query ++= (rowList)
    }

Closing the connection in insertBatch has no effect; it still gives the same error.

I call the batch insert from my code as follows:

    val temp1 = list1.flatMap { li =>
      Future.sequence(li.map { trip =>
        val data = for {
          tripData <- TripDataRepository.insertQuery(trip.tripData)
          subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
        } yield ((tripData, subTripData))
        val res = db.run(data.transactionally)
        res
        //db.close()
      })
    }

If I close the connection after my work here, as shown in the commented-out code, I get this error:

 java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] 

If I call the method without Future.sequence, like this:

    val temp1 = list.map { trip =>
      val data = for {
        tripData <- TripDataRepository.insertQuery(trip.tripData)
        subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
      } yield ((tripData, subTripData))
      val res = db.run(data.transactionally)
      res
    }

I still get the "too many clients" error.

1 answer

The root of this problem is that you are starting an unbounded list of Futures simultaneously, each of which connects to the database: one per entry in the list.
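To see why the count is unbounded: a Scala Future is already running (or pending) the moment it is created, so mapping a call like db.run over a list requests every insert before any of them has finished. The following is a minimal stdlib-only sketch of that behaviour. It does not use Slick; fakeRun and opened are hypothetical stand-ins for db.run and for the number of connections requested.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

object EagerFutures {
  // Hypothetical counter: one increment per simulated connection request.
  val opened = new AtomicInteger(0)

  // Hypothetical stand-in for db.run: registers a request and returns a
  // Future that stays pending until the promise is completed elsewhere.
  def fakeRun(p: Promise[Int]): Future[Int] = {
    opened.incrementAndGet()
    p.future
  }

  def main(args: Array[String]): Unit = {
    val promises = List.fill(1000)(Promise[Int]())
    // Mapping over the list issues all 1000 requests immediately.
    val futures = promises.map(fakeRun)
    val pending = futures.count(f => !f.isCompleted)
    println(s"opened=${opened.get}, pending=$pending")
  }
}
```

None of the promises is ever completed here, so every request is outstanding at the same time. With a real database, each outstanding request can hold a connection.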

This can be solved by running your inserts serially, forcing each insert batch to depend on the previous one:

    // Empty Future for the results. Replace Unit with the correct type,
    // whatever "res" is below.
    val emptyFuture = Future.successful(Seq.empty[Unit])

    // This will only insert one at a time. You could use list.grouped to
    // batch the inserts if that was important.
    val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) =>
      previousFuture flatMap { previous =>
        // Inner code copied from your example.
        val data = for {
          tripData <- TripDataRepository.insertQuery(trip.tripData)
          subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
        } yield ((tripData, subTripData))

        // flatMap must return a Future, so append "res" once the insert completes.
        db.run(data.transactionally).map(res => previous :+ res)
      }
    }
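If one insert at a time is too slow, the same fold can run over fixed-size chunks, so that at most one chunk of inserts is in flight at once. The following is a stdlib-only sketch of that variant, not Slick code. The names fakeInsert, insertInBatches and peak are hypothetical, and fakeInsert only simulates a database call while recording how many calls overlap.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BatchedInserts {
  implicit val ec: ExecutionContext = ExecutionContext.global

  private val lock = new Object
  private var inFlight = 0
  private var peakInFlight = 0

  // Highest number of simulated inserts that overlapped so far.
  def peak: Int = lock.synchronized(peakInFlight)

  // Hypothetical stand-in for db.run(...): sleeps briefly and tracks overlap.
  def fakeInsert(row: Int): Future[Int] = Future {
    lock.synchronized {
      inFlight += 1
      peakInFlight = math.max(peakInFlight, inFlight)
    }
    Thread.sleep(1)
    lock.synchronized { inFlight -= 1 }
    row
  }

  // Chunks run one after another; rows inside a chunk run concurrently.
  def insertInBatches(rows: List[Int], batchSize: Int): Future[Vector[Int]] =
    rows.grouped(batchSize).foldLeft(Future.successful(Vector.empty[Int])) {
      (previousFuture, chunk) =>
        previousFuture.flatMap { done =>
          // The next chunk starts only after the previous one has completed.
          Future.sequence(chunk.map(fakeInsert)).map(results => done ++ results)
        }
    }

  def main(args: Array[String]): Unit = {
    val result = Await.result(insertInBatches((1 to 100).toList, 10), 30.seconds)
    println(s"inserted=${result.size}, peak in flight=$peak")
  }
}
```

The batch size then acts as an upper bound on concurrent inserts, so it should stay below the number of connections the database accepts.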

Source: https://habr.com/ru/post/988893/

