I work with two files, a generator (written in Python) that generates data, and a C-based socket server that needs to listen and process the data that the generator sends to it.
The generator creates message batches where each message is separated by the \ n character. Thus, an example batch:
x0x foo.bar.baz 45 1429694987 \nx1x foo.bar.baz 45 1429694987 \n
In my juice, I read 1000 bytes to the buffer. I want to split a general message into separate lines, separated by a separator \n. I did this, but I am having problems when the message exceeds the length of 1000 bytes, which I mentioned earlier. If the message is not completed, that is, it does not end with \n, I want to save any part of the message I received, and then use it to restore the full message when the next 1000 bytes arrive.
What happens when a batch length exceeds a buffer length of 1000 bytes is that some random character seems to be entered. An example output (copied from the terminal) is shown below:
x0x foo.bar.baz 45 1429694987
x1x foo.bar.baz 45 1429694987
x2x foo.bar.baz 45 1429694987
x3x foo.bar.baz 45 1429694987
x4x foo.bar.baz 45 1429694987
x5x foo.bar.baz 45 1429694987
x6x foo.bar.baz 45 1429694987
x7x foo.bar.baz 45 1429694987
x8x foo.bar.baz 45 1429694987
x9x foo.bar.baz 45 1429694987
x10x foo.bar.baz 45 1429694987
x11x foo.bar.baz 45 1429694987
x12x foo.bar.baz 45 1429694987
x13x foo.bar.baz 45 1429694987
x14x foo.bar.baz 45 1429694987
x15x foo.bar.baz 45 1429694987
x16x foo.bar.baz 45 1429694987
x17x foo.bar.baz 45 1429694987
x18x foo.bar.baz 45 1429694987
x19x foo.bar.baz 45 1429694987
x20x foo.bar.baz 45 1429694987
x21x foo.bar.baz 45 1429694987
x22x foo.bar.baz 45 1429694987
x23x foo.bar.baz 45 1429694987
x24x foo.bar.baz 45 1429694987
x25x foo.bar.baz 45 1429694987
x26x foo.bar.baz 45 1429694987
x27x foo.bar.baz 45 1429694987
x28x foo.bar.baz 45 1429694987
x29x foo.bar.baz 45 1429694987
x30x foo.bar.baz 45 1429694987
x31x foo.bar.baz 4▒▒▒p▒
, , - , , . , . C : ( n00b!)
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <sys/time.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <netdb.h>
#include "../librdkafka/src/rdkafka.h"
static const int PORT = 3135;
static const int BUFFER_SIZE = 1000;
static const int FULL_MSG_SIZE = 1200;
static const char *BROKERS = "192.168.50.11:9092";
static const char *TOPIC = "test";
void error(const char *msg){
perror(msg);
exit(1);
}
static void logger (const rd_kafka_t *rk, int level,
const char *fac, const char *buf) {
struct timeval tv;
gettimeofday(&tv, NULL);
fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
level, fac, rd_kafka_name(rk), buf);
}
void *streamHandler(void *pnewsock){
int number = * (int *) pnewsock;
printf("Starting thread %d\n", number);
rd_kafka_topic_t *rkt;
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
int partition = RD_KAFKA_PARTITION_UA;
int quiet = 0;
char errstr[512];
char buffer[BUFFER_SIZE];
bzero(buffer, BUFFER_SIZE);
int msg;
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
exit(1);
}
rd_kafka_set_logger(rk, logger);
rd_kafka_set_log_level(rk, LOG_DEBUG);
if (rd_kafka_brokers_add(rk, BROKERS) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
rkt = rd_kafka_topic_new(rk, TOPIC, topic_conf);
strcpy(buffer, "");
char *last_msg = "";
char full_msg[FULL_MSG_SIZE] ;
bzero(full_msg, FULL_MSG_SIZE);
char * pch;
char delimiter[2] = "\n";
char *final;
int last_msg_complete = 0;
int res = 0;
while(1){
if (recv(number, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0){
break;
}
msg = read(number, buffer, BUFFER_SIZE-1);
if(msg < 0){
error("ERROR reading from socket");
break;
}
printf("Concatenating now %s || %s", last_msg, buffer);
strcat(full_msg, last_msg);
strcat(full_msg, buffer);
final = &full_msg[(strlen(full_msg)-1)];
last_msg_complete = strcmp(delimiter, final);
pch = strtok(full_msg, "\n");
while(pch != NULL){ last_msg = '\0';
last_msg = strdup(pch);
pch = strtok(NULL, "\n");
if ((pch != NULL) || (last_msg_complete == 0)){
res = rd_kafka_produce(rkt, partition,
RD_KAFKA_MSG_F_COPY,
last_msg, strlen(last_msg),
NULL, 0,
NULL);
if (res == -1){
fprintf(stderr,
"%% Failed to produce to topic %s "
"partition %i:%s \n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
}
if (!quiet){
printf("full_msg %s\n", full_msg);
printf("buffer %s\n", buffer);
printf("last_msg %s\n", last_msg);
}
rd_kafka_poll(rk, 0);
free(last_msg);
}
}
free(pch);
bzero(buffer, BUFFER_SIZE);
bzero(full_msg, FULL_MSG_SIZE);
}
printf("Killed thread connection %d\n", number);
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
pthread_exit(&number);
return NULL;
}
int main(){
int sock, newsock;
pthread_t thread;
int reuseaddr = 1;
struct sockaddr_in serv_addr;
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(PORT);
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1){
perror("socket creation issue");
return 1;
}
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1){
perror("socket address reuse");
return 1;
}
if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0){
perror("bind error");
return 1;
}
listen(sock, 5);
while(1){
struct sockaddr_in their_addr;
socklen_t size = sizeof(their_addr);
puts("waiting");
newsock = accept(sock, (struct sockaddr*)&their_addr, &size);
puts("done waiting");
if(newsock == -1){
perror("main loop accept error");
} else{
printf("Connected to %s on port %d\n", inet_ntoa(their_addr.sin_addr), their_addr.sin_port);
if(pthread_create(&thread, NULL, streamHandler, &newsock) != 0){
fprintf(stderr, "Could not create thread \n");
}
}
}
close(sock);
return 0;
}