Evacuation sequences displayed in socket messages

I work with two files, a generator (written in Python) that generates data, and a C-based socket server that needs to listen and process the data that the generator sends to it.

The generator creates message batches where each message is separated by the \ n character. Thus, an example batch: x0x foo.bar.baz 45 1429694987 \nx1x foo.bar.baz 45 1429694987 \n In my juice, I read 1000 bytes to the buffer. I want to split a general message into separate lines, separated by a separator \n. I did this, but I am having problems when the message exceeds the length of 1000 bytes, which I mentioned earlier. If the message is not completed, that is, it does not end with \n, I want to save any part of the message I received, and then use it to restore the full message when the next 1000 bytes arrive.

What happens when a batch length exceeds a buffer length of 1000 bytes is that some random character seems to be entered. An example output (copied from the terminal) is shown below:

x0x foo.bar.baz 45 1429694987 
x1x foo.bar.baz 45 1429694987
x2x foo.bar.baz 45 1429694987
x3x foo.bar.baz 45 1429694987
x4x foo.bar.baz 45 1429694987
x5x foo.bar.baz 45 1429694987
x6x foo.bar.baz 45 1429694987
x7x foo.bar.baz 45 1429694987
x8x foo.bar.baz 45 1429694987
x9x foo.bar.baz 45 1429694987
x10x foo.bar.baz 45 1429694987
x11x foo.bar.baz 45 1429694987
x12x foo.bar.baz 45 1429694987
x13x foo.bar.baz 45 1429694987
x14x foo.bar.baz 45 1429694987
x15x foo.bar.baz 45 1429694987
x16x foo.bar.baz 45 1429694987
x17x foo.bar.baz 45 1429694987
x18x foo.bar.baz 45 1429694987
x19x foo.bar.baz 45 1429694987
x20x foo.bar.baz 45 1429694987
x21x foo.bar.baz 45 1429694987
x22x foo.bar.baz 45 1429694987
x23x foo.bar.baz 45 1429694987
x24x foo.bar.baz 45 1429694987
x25x foo.bar.baz 45 1429694987
x26x foo.bar.baz 45 1429694987
x27x foo.bar.baz 45 1429694987
x28x foo.bar.baz 45 1429694987
x29x foo.bar.baz 45 1429694987
x30x foo.bar.baz 45 1429694987
x31x foo.bar.baz 4▒▒▒p

, , - , , . , . C : ( n00b!)

#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <sys/time.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <netdb.h>

#include "../librdkafka/src/rdkafka.h"

static const int PORT = 3135;

 //delta between these two allows for prefixing of cut off data.
static const int BUFFER_SIZE = 1000;
static const int FULL_MSG_SIZE = 1200;
static const char *BROKERS = "192.168.50.11:9092";
static const char *TOPIC = "test";

/**
 * Socket error handling
 */
void error(const char *msg){
    perror(msg);
    exit(1);
}

/**
 * Kafka logger calback
 */
static void logger (const rd_kafka_t *rk, int level,
                    const char *fac, const char *buf) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
                (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
                level, fac, rd_kafka_name(rk), buf);
}

/**
 * Creates thread for each incoming connection and pushes data to Kafka.
 */
void *streamHandler(void *pnewsock){
    int number = * (int *) pnewsock;
    printf("Starting thread %d\n", number);
    /* Configuration */
    rd_kafka_topic_t *rkt;
    rd_kafka_t *rk;
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    int partition = RD_KAFKA_PARTITION_UA;
    int quiet = 0;
    char errstr[512];

    /* Socket config */
    char buffer[BUFFER_SIZE];
    bzero(buffer, BUFFER_SIZE);
    int msg;

/* initialize kafka conf variables */
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();

/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr)))) {
        fprintf(stderr,
                "%% Failed to create new producer: %s\n",
                errstr);
        exit(1);
}

/* Set logger */
rd_kafka_set_logger(rk, logger);
rd_kafka_set_log_level(rk, LOG_DEBUG);

/* Add brokers */
if (rd_kafka_brokers_add(rk, BROKERS) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        exit(1);
}

/* Create topic */
rkt = rd_kafka_topic_new(rk, TOPIC, topic_conf);

strcpy(buffer, "");
char *last_msg = "";
char full_msg[FULL_MSG_SIZE] ;
bzero(full_msg, FULL_MSG_SIZE);
char * pch;
char delimiter[2] = "\n";
char *final;
int last_msg_complete = 0;
int res = 0;
while(1){
    if (recv(number, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0){
        // connection has closed, so kill thread
        break;
    }

    msg = read(number, buffer, BUFFER_SIZE-1);
    if(msg < 0){
            error("ERROR reading from socket");
            break;
    }
    // prefix the last message (empty if no partial message from previous   transmit)
    printf("Concatenating now %s || %s", last_msg, buffer);
    strcat(full_msg, last_msg);
    strcat(full_msg, buffer);
    final = &full_msg[(strlen(full_msg)-1)];

    last_msg_complete = strcmp(delimiter, final); // 0 if ends in \n, other value otherwise

    // consume the received data and send to kafka.
    pch = strtok(full_msg, "\n");
    while(pch != NULL){            last_msg = '\0';
        last_msg = strdup(pch);

        pch = strtok(NULL, "\n");
        if ((pch != NULL) || (last_msg_complete == 0)){
                // only send this message if it isn't the last one
                // OR if it is the last one, only if it ends in \n
                res = rd_kafka_produce(rkt, partition,
                                RD_KAFKA_MSG_F_COPY,
                                last_msg, strlen(last_msg),
                                NULL, 0,
                                NULL);
                if (res == -1){
                        fprintf(stderr,
                         "%% Failed to produce to topic %s "
                        "partition %i:%s \n",
                        rd_kafka_topic_name(rkt), partition,
                        rd_kafka_err2str(rd_kafka_errno2err(errno)));
                }
                if (!quiet){
                    printf("full_msg %s\n", full_msg);
                    printf("buffer %s\n", buffer);
                    printf("last_msg %s\n", last_msg);
                }
                rd_kafka_poll(rk, 0);
                free(last_msg);
        }
    }
    free(pch);
    // empty the buffer and full_msg.
    bzero(buffer, BUFFER_SIZE);
    bzero(full_msg, FULL_MSG_SIZE);
}
printf("Killed thread connection %d\n", number);

/* Destroy topic */
rd_kafka_topic_destroy(rkt);

/* Destroy the handle */
rd_kafka_destroy(rk);
pthread_exit(&number);
return NULL;
}


int main(){
    int sock, newsock;
    pthread_t thread;
    int reuseaddr = 1;

struct sockaddr_in serv_addr;
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(PORT);

    // create the listening socket
    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1){
            perror("socket creation issue");
            return 1;
    }

    // enable the socket to reuse the address
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1){
            perror("socket address reuse");
            return 1;
    }

    // bind to address
    if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0){
            perror("bind error");
            return 1;
    }

    // Listen for new connections.
    listen(sock, 5);

    // When new connections arrive, create a thread with a socket object
    while(1){
            struct sockaddr_in their_addr;
            socklen_t size = sizeof(their_addr);
            puts("waiting");
            newsock = accept(sock, (struct sockaddr*)&their_addr, &size);
            puts("done waiting");

            if(newsock == -1){
                    perror("main loop accept error");
            } else{
                    printf("Connected to %s on port %d\n", inet_ntoa(their_addr.sin_addr), their_addr.sin_port);
                    if(pthread_create(&thread, NULL, streamHandler, &newsock) != 0){
                            fprintf(stderr, "Could not create thread \n");
                    }
            }
    }

    close(sock);

    return 0;

}

+4
1

, read() , ? recv() MSG_PEEK , , , - , , read(), , read() .

, recv() MSG_PEEK .

+4

Source: https://habr.com/ru/post/1583975/


All Articles