Kafka consumer deprecation errors when streaming from Twitter

I am working on streaming Twitter data through Kafka.

I follow the sample from the link below: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

I can use the producer code and it works fine: it is able to fetch tweets and send them to Kafka.

I cannot use the consumer code because it reports deprecation errors for many of the APIs.

Here is the consumer code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        this.topic = topic;
    }

    public void testConsumer() {

     System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();

        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("Size"+it.length());

            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {

     System.out.println("Started");
     String topic="twittertopic";
     KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
     simpleTWConsumer.testConsumer();
     System.out.println("End");
    }    
}

It throws an error: ConsumerConnector, ConsumerIterator, KafkaStream are deprecated.

ConsumerConfig is not visible (the class cannot be resolved).

Is there a fixed version of this sample code (Kafka consumer for twitter)?

+4
source share
1 answer

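First, as you have noticed, the old Scala Kafka clients are deprecated, so the better option is to use the new Java clients. See http://kafka.apache.org/documentation/#legacyapis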

So, if possible, use:

  • instead of kafka.consumer.* and kafka.javaapi.consumer, the Java consumer in org.apache.kafka.clients.consumer.*

  • instead of kafka.producer.* and kafka.javaapi.producer, the Java producer in org.apache.kafka.clients.producer.*

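That said, your code works with a few small changes. Being deprecated does not mean that it no longer works. Here is the code I ran, and it correctly consumed messages from twittertopic.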

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Prints every message of one topic using the legacy high-level consumer API
 * ({@code kafka.consumer.*} / {@code kafka.javaapi.consumer.*}).
 */
public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    /**
     * Creates the consumer connector.
     *
     * @param zookeeper value for {@code zookeeper.connect}
     * @param groupId   value for {@code group.id}
     * @param topic     topic whose messages {@link #testConsumer()} prints
     */
    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    /**
     * Opens one stream for the topic and prints each message to standard output, then shuts the
     * connector down (also when consumption fails with an exception).
     */
    public void testConsumer() {
        // Request a single stream for the topic.
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

            // Parameterized loop variable (not the raw type) keeps iterator() fully typed.
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("Message from Single Topic: "
                            + new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        } finally {
            // The field is final and assigned in the constructor, so no null check is needed.
            consumer.shutdown();
        }
    }

    /** Consumes {@code twittertopic} as group {@code testgroup} via {@code localhost:2181}. */
    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

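Although the deprecated classes are still in Kafka, now may be a good time to migrate to the new clients, because the old ones will probably be removed in a future release.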

For the new Java clients, there are examples on Github: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

With the new Java consumer, the logic would look like this:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Subscribes to {@link #TOPIC} with the Java {@code KafkaConsumer} client and prints the
 * records returned by a fixed number of polls.
 */
public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    /**
     * Polls the topic 1000 times, printing the size of each batch and every record's key and
     * value. The consumer is closed by the try-with-resources statement.
     */
    public static void main(String[] args) {
        System.out.println("Started");

        final Properties settings = consumerSettings();

        try (KafkaConsumer<String, String> client = new KafkaConsumer<>(settings)) {
            client.subscribe(Arrays.asList(TOPIC));

            int remainingPolls = 1000;
            while (remainingPolls > 0) {
                remainingPolls--;

                // NOTE(review): newer client releases deprecate poll(long) in favour of
                // poll(Duration) - confirm against the client version in use.
                final ConsumerRecords<String, String> batch = client.poll(1000L);
                System.out.println("Size: " + batch.count());

                for (ConsumerRecord<String, String> entry : batch) {
                    final String key = entry.key();
                    final String value = entry.value();
                    System.out.println("Received a message: " + key + " " + value);
                }
            }
        }

        System.out.println("End");
    }

    /** Builds the client configuration: broker address, group id and String deserializers. */
    private static Properties consumerSettings() {
        final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

        final Properties settings = new Properties();
        settings.put("bootstrap.servers", "localhost:9092");
        settings.put("group.id", GROUP);
        settings.put("auto.commit.interval.ms", "1000");
        settings.put("key.deserializer", stringDeserializer);
        settings.put("value.deserializer", stringDeserializer);
        return settings;
    }

}
+1

Source: https://habr.com/ru/post/1694362/