"Too many open files" error in an application using Kafka

I am building an application using Kafka and Spark Streaming. The input comes from a third-party stream and is published to a Kafka topic. The code below is a sketch of the StreamProxy module, showing how I receive results from the stream and pass them to KafkaPublisher:

def on_result_response(self,*args):
    self.kafkaPublisher.pushMessage(str(args[0]))

KafkaPublisher is implemented using these two methods:

from kafka import KafkaClient, SimpleProducer

class KafkaPublisher:

    def __init__(self, address, port, topic):
        self.kafka = KafkaClient(str(address) + ":" + str(port))
        self.producer = SimpleProducer(self.kafka)
        self.topic = topic

    def pushMessage(self, message):
        self.producer.send_messages(self.topic, message)
        self.producer = SimpleProducer(self.kafka, async=True)

The application is started roughly like this:

from StreamProxy import StreamProxy


streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20)  #seconds of streaming

After some batches have been processed (a little more than 10 seconds), it throws the following exceptions:

Exception in thread Thread-2354:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236
error: [Errno 24] Too many open files

Exception in thread Thread-2355:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236
error: [Errno 24]
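For reference, errno 24 on Linux is EMFILE: the process has reached its limit on open file descriptors, and sockets count toward that limit. A minimal diagnostic sketch, assuming a Unix-like system (the `resource` module is not available on Windows, and the `/proc/self/fd` count works only on Linux); the helper names `fd_limits` and `open_fd_count` are made up for illustration:

```python
import errno
import os
import resource


def fd_limits():
    """Return the (soft, hard) limit on open file descriptors for this process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def open_fd_count():
    """Count descriptors currently open in this process (Linux only), else None."""
    try:
        return len(os.listdir("/proc/self/fd"))
    except OSError:
        return None


soft, hard = fd_limits()
print("errno %d is %s" % (errno.EMFILE, errno.errorcode[errno.EMFILE]))
print("soft limit: %s, hard limit: %s, open now: %s" % (soft, hard, open_fd_count()))
```

Calling `open_fd_count()` periodically while the application streams would show whether the number of open descriptors keeps growing toward the soft limit.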



The problem is this line in pushMessage, which creates a new producer for every message sent:

self.producer = SimpleProducer(self.kafka, async=True)
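One way to avoid this is to construct the producer once in `__init__` and reuse it for every message. The thread names in the traceback (Thread-2354, Thread-2355) suggest that each new async producer starts its own background thread, so the number of threads and connections grows with every message. The sketch below illustrates the pattern only: `CountingProducer` and the `producer_factory` parameter are hypothetical stand-ins, not part of kafka-python, so that the example runs without the library or a broker. In the real class, the `SimpleProducer(self.kafka, async=True)` call from the question would take the place of the factory.

```python
class CountingProducer(object):
    """Hypothetical stand-in for a producer: counts how many instances are created."""
    created = 0

    def __init__(self):
        CountingProducer.created += 1
        self.sent = []

    def send_messages(self, topic, message):
        # Record the message instead of sending it to a broker.
        self.sent.append((topic, message))


class KafkaPublisher(object):
    def __init__(self, topic, producer_factory=CountingProducer):
        # Build the producer once; every pushMessage call reuses it.
        self.topic = topic
        self.producer = producer_factory()

    def pushMessage(self, message):
        # No new producer is created here, so no new threads or sockets.
        self.producer.send_messages(self.topic, message)


publisher = KafkaPublisher("task1")
for i in range(1000):
    publisher.pushMessage("message %d" % i)

print(CountingProducer.created)  # 1
```

However many messages are pushed, only one producer exists, so the resources it holds stay constant instead of growing with the message count.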

Source: https://habr.com/ru/post/1680884/

