I am building an application using Kafka and Spark Streaming. The input comes from the streaming of the third part and is published on the topic of Kafka. This code shows the Stream Proxy module: how I get the results from streaming and how I send them to KafkaPublisher (it is shown only in a sketch):
def on_result_response(self,*args):
self.kafkaPublisher.pushMessage(str(args[0]))
KafkaPublisher is implemented using these two methods:
class KafkaPublisher:
def __init__(self,address,port,topic):
self.kafka = KafkaClient(str(address)+":"+str(port))
self.producer = SimpleProducer(self.kafka)
self.topic=topic
def pushMessage(self,message):
self.producer.send_messages(self.topic, message)
self.producer = SimpleProducer(self.kafka, async=True)
And the application starts basically:
from StreamProxy import StreamProxy
streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20)
After some batch processing (more than 10 seconds), she threw the following exceptions :
Thread-2354: Traceback ( ): "/usr/lib/python2.7/threading.py", 801, __bootstrap_inner "/usr/lib/python2.7/threading.py", 754, "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", 164, _send_upstream "/usr/local/lib/python2.7/dist-packages/kafka/client.py" , 649, send_produce_request "/usr/local/lib/python2.7/dist-packages/kafka/client.py" , 253, _send_broker_aware_request "/usr/local/lib/python2.7/dist-packages/kafka/client.py" , 74, _get_conn "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", 236, error: [Errno 24]
Thread-2355: Traceback ( ): "/usr/lib/python2.7/threading.py", 801, __bootstrap_inner "/usr/lib/python2.7/threading.py", 754, "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", 164, _send_upstream "/usr/local/lib/python2.7/dist-packages/kafka/client.py" , 649, send_produce_request "/usr/local/lib/python2.7/dist-packages/kafka/client.py" , 253, _send_broker_aware_request "/usr/local/lib/python2.7/dist-packages/kafka/client.py" , 74, _get_conn "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", 236, error: [Errno 24]
, , , , .