Commit 5f9040be authored by Christopher Reis's avatar Christopher Reis

Added error detection in Recieve

parent 34bb81bc
...@@ -330,10 +330,10 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config) ...@@ -330,10 +330,10 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config)
producer->poll(0); producer->poll(0);
} }
while (run && producer->outq_len() > 0) { // while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl; // std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(250); // producer->poll(250);
} // }
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << '\n' << message.toXML() ; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << '\n' << message.toXML() ;
} }
...@@ -351,7 +351,7 @@ void KafkaConnection::mwReply(const Message& request, const Message& reply) ...@@ -351,7 +351,7 @@ void KafkaConnection::mwReply(const Message& request, const Message& reply)
void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout) void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
{ {
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReceive"; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReceive";
static std::string MSG_END = "</MESSAGE>";
message = NULL; message = NULL;
RdKafka::Message *msg = consumer->consume(rkqu, (int)timeout); RdKafka::Message *msg = consumer->consume(rkqu, (int)timeout);
...@@ -360,9 +360,14 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout) ...@@ -360,9 +360,14 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
if(msg_str.find("Broker:") != string::npos) { if(msg_str.find("Broker:") != string::npos) {
GMSEC_INFO << msg_str.c_str(); GMSEC_INFO << msg_str.c_str();
} else { } else {
GMSEC_INFO << '\n' << msg_str.c_str(); //Error Checking
std::size_t found = msg_str.find(MSG_END);
if((found + MSG_END.length()) != msg_str.length()) {
GMSEC_DEBUG << "We have issues:" << msg_str.substr(found.c_str());
} else {
message = new Message(msg_str.c_str()); message = new Message(msg_str.c_str());
} }
}
//msg_consume(msg, NULL); //msg_consume(msg, NULL);
delete msg; delete msg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment