Commit 34bb81bc authored by Christopher Reis's avatar Christopher Reis

Added receive function

parent 81e3b29a
......@@ -52,9 +52,12 @@ using namespace std;
static bool run = true;
static bool exit_eof = false;
int use_ccb = 0;
//Kafka base objects
RdKafka::Producer *producer;
RdKafka::Consumer *consumer;
RdKafka::Queue *rkqu;
//Kafka vector objects
static void sigterm (int sig) {
run = false;
......@@ -119,6 +122,7 @@ std::vector<std::string> list_topics(std::string input_topic){
}
}
return str_vec;
}
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
......@@ -227,7 +231,7 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
GMSEC_DEBUG << "Found Wildcard in " << topic_str.c_str() << '\n';
topic_vec = list_topics(topic_str);
for (std::vector<std::string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) {
std::string it_str = *i; //ERROR WITH ITERATOR
std::string it_str = *i;
GMSEC_DEBUG << "Subscribing to: " << it_str.c_str() << '\n';
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
......@@ -236,7 +240,7 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(t_consumer, *i, tconf, mw_errstr);
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, it_str, tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
......@@ -246,13 +250,12 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
*/
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::ErrorCode resp = t_consumer->start(topic, partition, start_offset, rkqu);
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset, rkqu);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
}
} else {
GMSEC_DEBUG << "No Wildcard" << '\n';
......@@ -280,13 +283,22 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
exit(1);
}
}
}
void KafkaConnection::mwUnsubscribe(const char *subject)
{
int32_t partition = 0;
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')';
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, subject, tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
}
consumer->stop(topic, partition);
}
void KafkaConnection::mwPublish(const Message& message, const Config& config)
......@@ -328,7 +340,7 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config)
void KafkaConnection::mwRequest(const Message& message, std::string& id)
{
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwRequest("<< ')';
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwRequest(" << ')';
}
void KafkaConnection::mwReply(const Message& request, const Message& reply)
......@@ -343,12 +355,14 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
message = NULL;
RdKafka::Message *msg = consumer->consume(rkqu, (int)timeout);
std::string msg_str = static_cast<const char *>(msg->payload());
GMSEC_DEBUG << '\n' << static_cast<const char *>(msg->payload())<< '\n';
Message recv_msg(static_cast<const char *>(msg->payload()));
//Message recv_msg =static_cast<const char *>(msg->payload())
if(msg_str.find("Broker:") != string::npos) {
GMSEC_INFO << msg_str.c_str();
} else {
GMSEC_INFO << '\n' << msg_str.c_str();
message = new Message(msg_str.c_str());
}
//msg_consume(msg, NULL);
delete msg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment