Python: how do I mock a Kafka topic for unit tests?

We have a message scheduler that generates a hash key from a message's attributes before publishing it to a Kafka topic.

This is done to eliminate duplicates. However, I'm not sure how I can test this deduplication without actually setting up a local cluster and verifying that it performs as expected.

An Internet search for Kafka queue mocking tools didn't help, and I'm worried that maybe I'm thinking about this the wrong way.

Ultimately, whatever is used to mock the Kafka queue should behave just like a local cluster, i.e. provide deduplication of keyed inserts within a topic.

Are there any such tools?
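To make it concrete, the behaviour I want from a mock — last-write-wins per key, as in a log-compacted topic — could itself be faked in memory. (`FakeCompactedTopic` is a hypothetical sketch, not an existing tool; note that real Kafka compaction is asynchronous and eventual, so this fake is stricter than a real cluster.)

```python
class FakeCompactedTopic:
    """In-memory stand-in for a log-compacted Kafka topic:
    retains only the latest value per key, as compaction eventually would."""

    def __init__(self):
        self._latest = {}   # key -> last value produced for that key
        self._order = []    # keys in order of first appearance

    def produce(self, key, value):
        if key not in self._latest:
            self._order.append(key)
        self._latest[key] = value

    def consume(self):
        # Surviving (key, value) pairs, one per key.
        return [(k, self._latest[k]) for k in self._order]
```

With this, producing the same key twice leaves a single surviving message, which is the property my unit tests need to observe.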

+6
3 answers

If you need to test a specific Kafka feature, or an implementation that relies on a Kafka-specific feature, then the only way to do it is with Kafka!

Does Kafka have any tests around its deduplication logic? If so, a combination of the following may be enough to mitigate the risks your organization perceives:

  • unit tests of your hash logic (make sure the same object actually generates the same hash)
  • Kafka topic deduplication tests (internal to the Kafka project)
  • pre-flight smoke tests verifying your application's integration with Kafka
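The first bullet needs no Kafka at all. A minimal sketch of such a unit test, where `make_key` is a hypothetical stand-in for whatever key derivation your scheduler actually uses:

```python
import hashlib
import json
import unittest

def make_key(attrs):
    """Hypothetical stand-in for the scheduler's key derivation:
    canonical JSON (sorted keys) hashed with SHA-256."""
    payload = json.dumps(attrs, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

class HashKeyTests(unittest.TestCase):
    def test_same_attributes_same_key(self):
        # Attribute order must not change the key.
        self.assertEqual(make_key({"id": 1, "type": "send"}),
                         make_key({"type": "send", "id": 1}))

    def test_different_attributes_different_key(self):
        self.assertNotEqual(make_key({"id": 1}), make_key({"id": 2}))
```

Run with `python -m unittest` — no broker required.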

If Kafka does NOT have tests around its topic deduplication, or you are concerned about breaking changes, then it is important to have automated checks around the Kafka-specific functionality. This can be done through integration tests. I have had a lot of success recently with docker-based integration test pipelines. After the initial legwork of creating a Kafka docker image (one is probably already available from the community), it becomes trivial to set up integration test pipelines. A pipeline could look like:

  • application-based unit tests are executed (hash logic)
  • once those pass, your CI server starts up Kafka
  • integration tests are executed, verifying that duplicate writes only ever emit a single message to a topic

The important thing is to keep the Kafka integration tests minimized so that they ONLY cover what absolutely relies on Kafka: even in docker they can be orders of magnitude slower than unit tests, roughly ~1 ms vs. 1 s per test. You should also weigh the overhead of maintaining an integration pipeline against the risk of simply trusting that Kafka provides the topic deduplication it claims to.
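The assertion at the heart of such an integration test is small: consume the topic back and check that no key appears twice. The checking half can be a pure, broker-free function (`assert_deduplicated` is an illustrative helper; in CI the `records` list would come from a real consumer):

```python
from collections import Counter

def assert_deduplicated(records):
    """Fail if any key appears more than once among consumed records.

    `records` is a list of (key, value) pairs read back from the topic.
    """
    counts = Counter(key for key, _ in records)
    duplicates = {k: n for k, n in counts.items() if n > 1}
    assert not duplicates, f"duplicate keys on topic: {duplicates}"
```

Keeping the check pure means only the produce/consume plumbing needs the dockerized cluster.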

+2

We test Kafka from Python using SBT as the build tool, since our project uses Pyspark.

In build.sbt, add a custom task that runs the Python tests:

val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")

val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")

testPythonTask := {
  val s: TaskStreams = streams.value
  s.log.info("Executing task testPython")
  Process(command,
    workingDirectory,
    // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
    "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
      // collect all jar paths from project
      .format((fullClasspath in Runtime).value
      .map(_.data.getCanonicalPath)
        .filter(_.contains(".jar"))
        .mkString(",")),
    "PYSPARK_PYTHON" -> "python3") ! s.log
}

//attach custom test task to default test tasks
test in Test := {
  testPythonTask.value
  (test in Test).value
}

testOnly in Test := {
  testPythonTask.value
  (testOnly in Test).value
}

And the Python test itself (app_test.py):

import random
import unittest
from itertools import chain

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase

class KafkaStreamTests(PySparkStreamingTestCase):
    timeout = 20  # seconds
    duration = 1

    def setUp(self):
        super(KafkaStreamTests, self).setUp()

        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
        self._kafkaTestUtils.setup()

    def tearDown(self):
        if self._kafkaTestUtils is not None:
            self._kafkaTestUtils.teardown()
            self._kafkaTestUtils = None

        super(KafkaStreamTests, self).tearDown()

    def _randomTopic(self):
        return "topic-%d" % random.randint(0, 10000)

    def _validateStreamResult(self, sendData, stream):
        result = {}
        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                   sum(sendData.values()))):
            result[i] = result.get(i, 0) + 1

        self.assertEqual(sendData, result)

    def test_kafka_stream(self):
        """Test the Python Kafka stream API."""
        topic = self._randomTopic()
        sendData = {"a": 3, "b": 5, "c": 10}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                         "test-streaming-consumer", {topic: 1},
                                         {"auto.offset.reset": "smallest"})
        self._validateStreamResult(sendData, stream)

There are more examples for Flume, Kinesis and others in pyspark.streaming.tests.

0

You can find useful examples of how to mock Kafka in the kafka-python library's own test suite (fixtures, mocks, etc.):

https://github.com/dpkp/kafka-python/blob/master/test/test_consumer.py
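Short of borrowing those fixtures, you can also verify just your side of the contract with the standard library's unittest.mock: replace the producer with a mock and assert that the key you computed was passed to `send()`. (`publish_scheduled` and its key scheme are illustrative, standing in for the asker's scheduler; the mock mimics kafka-python's `KafkaProducer.send(topic, value=..., key=...)` call shape without needing a broker.)

```python
import hashlib
import json
from unittest import mock

def publish_scheduled(producer, topic, attrs):
    # Illustrative: derive a deterministic key from the attributes,
    # then send it as the Kafka message key.
    key = hashlib.sha256(
        json.dumps(attrs, sort_keys=True).encode("utf-8")).hexdigest()
    producer.send(topic, key=key.encode(), value=json.dumps(attrs).encode())
    return key

# Stand-in for kafka.KafkaProducer; no broker needed.
producer = mock.MagicMock()
key = publish_scheduled(producer, "scheduled-messages", {"id": 7})

# The mock records the call, so we can assert on the key it received.
producer.send.assert_called_once()
_, kwargs = producer.send.call_args
assert kwargs["key"] == key.encode()
```

This only proves your code hands Kafka the right key; the deduplication itself still belongs to the integration tests described above.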

0

Source: https://habr.com/ru/post/1659390/

