Python Postgres psycopg2 ThreadedConnectionPool exhausted

I have looked at several questions about "too many clients" errors, but still can't solve my problem, so I have to ask again about my specific case.

Basically, I set up my local Postgres server and have to execute tens of thousands of queries, so I used the Python psycopg2 package. Here is my code:

    import psycopg2
    import pandas as pd
    import numpy as np
    from flashtext import KeywordProcessor
    from psycopg2.pool import ThreadedConnectionPool
    from concurrent.futures import ThreadPoolExecutor

    df = pd.DataFrame({'S': ['California', 'Ohio', 'Texas'],
                       'T': ['Dispatcher', 'Zookeeper', 'Mechanics']})
    # df = pd.concat([df]*10000)  # repeat df 10000 times

    DSN = "postgresql://User:password@localhost/db"
    tcp = ThreadedConnectionPool(1, 800, DSN)

    def do_one_query(inputS, inputT):
        conn = tcp.getconn()
        c = conn.cursor()
        q = """SELECT * FROM eridata WHERE "State" = 'California' AND "Title" = 'Dispatcher' LIMIT 1;"""
        c.execute(q)
        all_results = c.fetchall()
        for row in all_results:
            return row
        tcp.putconn(conn, close=True)

    cnt = 0
    for idx, row in df.iterrows():
        cnt += 1
        with ThreadPoolExecutor(max_workers=1) as pool:
            ret = pool.submit(do_one_query, row["S"], row["T"])
            print(ret.result())
    print(cnt)

The code works well with a small df. If I repeat df 10,000 times, I get an error message saying that the connection pool is exhausted, even though the connections I used are closed by this line:

    tcp.putconn(conn, close=True)

But aren't they actually closed? How can I get around this problem?

3 answers

You need to use a queue on top of your pool.

Something like the following should work:

    import sys
    import logging
    from contextlib import contextmanager

    import gevent
    import psycopg2
    from gevent.queue import Queue
    from gevent.socket import wait_read, wait_write
    from psycopg2.pool import ThreadedConnectionPool
    from psycopg2 import extensions, OperationalError

    logger = logging.getLogger(__name__)

    poolsize = 100  # maximum number of connections
    pdsn = ''       # put your dsn here

    if sys.version_info[0] >= 3:
        integer_types = (int,)
    else:
        import __builtin__
        integer_types = (int, __builtin__.long)


    class ConnectorError(Exception):
        """Base class for all connector-related exceptions."""
        pass


    # Singleton connection pool; gets reset if a connection is bad or drops.
    _pgpool = None

    def pgpool():
        global _pgpool
        if not _pgpool:
            try:
                _pgpool = PostgresConnectionPool(maxsize=poolsize)
            except psycopg2.OperationalError:
                _pgpool = None
        return _pgpool


    class Pcursor(object):

        def __init__(self, **kwargs):
            # In case of a lost connection, sit and wait until it is back online.
            global _pgpool
            if not _pgpool:
                while not _pgpool:
                    try:
                        pgpool()
                    except:
                        logger.debug('Attempting Connection To Postgres...')
                        gevent.sleep(1)

        def fetchone(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                except Exception as exc:
                    print(sys._getframe().f_back.f_code)
                    print(sys._getframe().f_back.f_code.co_name)
                    logger.warning(str(exc))
                logger.debug(cursor.query)
                return cursor.fetchone()

        def fetchall(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                except Exception as exc:
                    print(sys._getframe().f_back.f_code)
                    print(sys._getframe().f_back.f_code.co_name)
                    logger.warning(str(exc))
                logger.debug(cursor.query)
                return cursor.fetchall()

        def execute(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                except Exception as exc:
                    print(sys._getframe().f_back.f_code)
                    print(sys._getframe().f_back.f_code.co_name)
                    logger.warning(str(exc))
                finally:
                    logger.debug(cursor.query)
                    return cursor.query

        def fetchmany(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                while 1:
                    items = cursor.fetchmany()
                    if not items:
                        break
                    for item in items:
                        yield item


    class AbstractDatabaseConnectionPool(object):

        def __init__(self, maxsize=poolsize):
            if not isinstance(maxsize, integer_types):
                raise TypeError('Expected integer, got %r' % (maxsize, ))
            self.maxsize = maxsize
            self.pool = Queue()
            self.size = 0

        def create_connection(self):
            # Overridden by PostgresConnectionPool.
            raise NotImplementedError()

        def get(self):
            pool = self.pool
            if self.size >= self.maxsize or pool.qsize():
                return pool.get()
            self.size += 1
            try:
                new_item = self.create_connection()
            except:
                self.size -= 1
                raise
            return new_item

        def put(self, item):
            self.pool.put(item)

        def closeall(self):
            while not self.pool.empty():
                conn = self.pool.get_nowait()
                try:
                    conn.close()
                except Exception:
                    pass

        @contextmanager
        def connection(self, isolation_level=None):
            conn = self.get()
            try:
                if isolation_level is not None:
                    if conn.isolation_level == isolation_level:
                        isolation_level = None
                    else:
                        conn.set_isolation_level(isolation_level)
                yield conn
            except:
                if conn.closed:
                    conn = None
                    self.closeall()
                raise
            else:
                if conn.closed:
                    raise OperationalError(
                        "Cannot commit because connection was closed: %r" % (conn, ))
            finally:
                if conn is not None and not conn.closed:
                    if isolation_level is not None:
                        conn.set_isolation_level(isolation_level)
                    self.put(conn)

        @contextmanager
        def cursor(self, *args, **kwargs):
            isolation_level = kwargs.pop('isolation_level', None)
            with self.connection(isolation_level) as conn:
                try:
                    yield conn.cursor(*args, **kwargs)
                except:
                    global _pgpool
                    _pgpool = None
                    del(self)


    class PostgresConnectionPool(AbstractDatabaseConnectionPool):

        def __init__(self, **kwargs):
            try:
                self.pconnect = ThreadedConnectionPool(1, poolsize, dsn=pdsn)
            except:
                global _pgpool
                _pgpool = None
                raise ConnectorError('Database Connection Failed')
            maxsize = kwargs.pop('maxsize', None)
            self.kwargs = kwargs
            AbstractDatabaseConnectionPool.__init__(self, maxsize)

        def create_connection(self):
            self.conn = self.pconnect.getconn()
            self.conn.autocommit = True
            return self.conn


    def gevent_wait_callback(conn, timeout=None):
        """A wait callback to allow gevent to work with psycopg2."""
        while 1:
            state = conn.poll()
            if state == extensions.POLL_OK:
                break
            elif state == extensions.POLL_READ:
                wait_read(conn.fileno(), timeout=timeout)
            elif state == extensions.POLL_WRITE:
                wait_write(conn.fileno(), timeout=timeout)
            else:
                raise ConnectorError("Bad result from poll: %r" % state)


    extensions.set_wait_callback(gevent_wait_callback)

Then you can run queries through it:

    import db
    db.Pcursor().execute(PSQL, arg1, arg2, arg3)

Essentially, I borrowed the gevent async postgres example and modified it to support thread pooling via psycopg2.

https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py

I added what psycogreen does inside the module, so all you have to do is import it and call the class. Each call queues a new request but uses a pool of a fixed size, so your connections never run out. This is essentially similar to what PGBouncer does, which I think would also fix your issue.

https://www.pgbouncer.org/


I struggled to find really detailed information on how ThreadedConnectionPool works. https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html is not bad, but it turns out that its claim that getconn blocks until a connection becomes available is incorrect. Checking the code, all that ThreadedConnectionPool adds is a lock around the AbstractConnectionPool methods to prevent race conditions. If at any point an attempt is made to use more than maxconn connections, a "connection pool exhausted" PoolError is raised.

If you want something a little simpler than the accepted answer, wrapping those methods in a Semaphore, so that getconn blocks until a connection is available, should help:

    from psycopg2.pool import ThreadedConnectionPool
    from threading import Semaphore

    class ReallyThreadedConnectionPool(ThreadedConnectionPool):
        def __init__(self, minconn, maxconn, *args, **kwargs):
            self._semaphore = Semaphore(maxconn)
            super().__init__(minconn, maxconn, *args, **kwargs)

        def getconn(self, *args, **kwargs):
            self._semaphore.acquire()
            return super().getconn(*args, **kwargs)

        def putconn(self, *args, **kwargs):
            super().putconn(*args, **kwargs)
            self._semaphore.release()
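To see why the semaphore version blocks instead of raising, here is a minimal, database-free sketch: `FakePool` is a stand-in I made up to imitate the getconn/putconn interface of psycopg2's `ThreadedConnectionPool` (so the example runs without a server), and `BlockingPool` applies exactly the wrapping shown above. Ten workers share a pool of two connections, and every one of them completes instead of hitting a "pool exhausted" error.

```python
import time
from threading import Semaphore, Thread

class FakePool:
    """Hypothetical stand-in for psycopg2's ThreadedConnectionPool (no database needed)."""
    def __init__(self, minconn, maxconn):
        self._conns = list(range(maxconn))  # pretend connections

    def getconn(self):
        return self._conns.pop()

    def putconn(self, conn):
        self._conns.append(conn)

class BlockingPool(FakePool):
    """Same semaphore wrapping as ReallyThreadedConnectionPool above."""
    def __init__(self, minconn, maxconn):
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn)

    def getconn(self):
        self._semaphore.acquire()  # blocks here instead of raising PoolError
        return super().getconn()

    def putconn(self, conn):
        super().putconn(conn)
        self._semaphore.release()

pool = BlockingPool(1, 2)  # only 2 "connections" for 10 workers
results = []

def worker(i):
    conn = pool.getconn()
    try:
        time.sleep(0.05)  # pretend to run a query
        results.append(i)
    finally:
        pool.putconn(conn)

threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(results))  # 10 -- all workers finished, none raised
```

The semaphore simply counts free slots: acquire before taking a connection, release after returning one, so excess callers wait their turn.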

Your problem here is that you do not actually return the connection to the pool; you close it permanently with

 tcp.putconn(conn, close=True) 

See the documentation at http://initd.org/psycopg/docs/pool.html:

 If close is True, discard the connection from the pool. 

So, if you put 800 connections in your pool, after the 801st iteration you will get an "exhausted" error, because your connection pool has shrunk to size zero.
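Concretely, the fix in the question's do_one_query is to return the connection with a plain putconn(conn) in a finally block (note that in the original code the putconn line sits after the return, so it is never reached at all). Below is a sketch of the corrected pattern; the pool is passed in as a parameter, and the tiny fake pool and cursor classes are assumptions I added so the pattern can run without a live database in place of the real ThreadedConnectionPool:

```python
class _FakeCursor:
    """Hypothetical cursor: just echoes the parameters back."""
    def execute(self, query, params):
        self._row = params

    def fetchone(self):
        return self._row

class _FakeConn:
    def cursor(self):
        return _FakeCursor()

class _FakePool:
    """Hypothetical stand-in for psycopg2's ThreadedConnectionPool."""
    def __init__(self):
        self.available = 1

    def getconn(self):
        self.available -= 1
        return _FakeConn()

    def putconn(self, conn, close=False):
        self.available += 1

def do_one_query(pool, inputS, inputT):
    conn = pool.getconn()
    try:
        c = conn.cursor()
        # Parameterized query instead of hardcoded values (column names from the question).
        c.execute('SELECT * FROM eridata WHERE "State" = %s AND "Title" = %s LIMIT 1;',
                  (inputS, inputT))
        return c.fetchone()
    finally:
        # Default close=False: the connection goes back to the pool instead of
        # being discarded, and this runs even though we returned above.
        pool.putconn(conn)

pool = _FakePool()
for _ in range(10000):
    do_one_query(pool, 'California', 'Dispatcher')
print(pool.available)  # 1 -- every query returned its connection
```

Because putconn lives in the finally block, the connection is returned on every code path, so even 10,000 iterations never drain the pool.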


Source: https://habr.com/ru/post/1443699/
