Commit 1eea1b4f authored by Christopher Reis's avatar Christopher Reis

Added topic wildcarding (buggy)

parent 59133c0a
...@@ -22,7 +22,7 @@ LOCAL_INCS = -I$(GMSEC_HOME)/include \ ...@@ -22,7 +22,7 @@ LOCAL_INCS = -I$(GMSEC_HOME)/include \
-I$(SUPDIR)/include \ -I$(SUPDIR)/include \
-I./include \ -I./include \
CXXFLAGS += $(LOCAL_DEFS) $(LOCAL_INCS) CXXFLAGS += -std=gnu++11 $(LOCAL_DEFS) $(LOCAL_INCS)
LDFLAGS += -L$(SUPDIR)/include -L/usr/local/lib -lrdkafka++ -lrdkafka -lpthread -lrt -lz LDFLAGS += -L$(SUPDIR)/include -L/usr/local/lib -lrdkafka++ -lrdkafka -lpthread -lrt -lz
# #
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <cstdio> #include <cstdio>
#include <csignal> #include <csignal>
#include <cstring> #include <cstring>
#include <regex>
#include <gmsec_version.h> #include <gmsec_version.h>
...@@ -96,6 +97,30 @@ void msg_consume(RdKafka::Message* message, void* opaque) { ...@@ -96,6 +97,30 @@ void msg_consume(RdKafka::Message* message, void* opaque) {
} }
} }
std::vector<std::string> list_topics(std::string input_topic){
std::vector<std::string> str_vec;
RdKafka::Topic *topic = NULL;
class RdKafka::Metadata *metadata;
/* Fetch metadata */
RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "%% Failed to acquire metadata: " << RdKafka::err2str(err) << std::endl;
}
std::cout << metadata->topics()->size() << " topics:" << std::endl;
RdKafka::Metadata::TopicMetadataIterator it;
for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
std::string t = (*it)->topic();
std::regex re(input_topic);
if (regex_match(t,re)) {
str_vec.push_back(t);
std::cout << "Subscribing: " << t << '\n';
}
}
}
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public: public:
void dr_cb (RdKafka::Message &message) { void dr_cb (RdKafka::Message &message) {
...@@ -196,14 +221,41 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config) ...@@ -196,14 +221,41 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')' << '\n'; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')' << '\n';
std::string topic_str = subject; std::string topic_str = subject;
std::vector<std::string> topic_vec;
if (topic_str.find("*") != string::npos) {
topic_vec = list_topics(topic_str);
for (vector<string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) {
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr); sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
RdKafka::Consumer *t_consumer = RdKafka::Consumer::create(sub_conf, mw_errstr); RdKafka::Consumer *t_consumer = RdKafka::Consumer::create(sub_conf, mw_errstr);
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(t_consumer, *i, tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
}
/*
* Start consumer for topic+partition at start offset
*/
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::ErrorCode resp = t_consumer->start(topic, partition, start_offset, rkqu);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
}
} else {
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
RdKafka::Consumer *t_consumer = RdKafka::Consumer::create(sub_conf, mw_errstr);
/* /*
* Create topic handle. * Create topic handle.
*/ */
...@@ -212,46 +264,20 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config) ...@@ -212,46 +264,20 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
std::cerr << "Failed to create topic: " << mw_errstr << std::endl; std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1); exit(1);
} }
/* /*
* Start consumer for topic+partition at start offset * Start consumer for topic+partition at start offset
*/ */
int32_t partition = 0; int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::ErrorCode resp = t_consumer->start(topic, partition, start_offset, rkqu); RdKafka::ErrorCode resp = t_consumer->start(topic, partition, start_offset, rkqu);
GMSEC_DEBUG << "TEST" << '\n';
if (resp != RdKafka::ERR_NO_ERROR) { if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " << std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl; RdKafka::err2str(resp) << std::endl;
exit(1); exit(1);
} }
// ExampleConsumeCb ex_consume_cb; }
//
// /*
// * Consume messages
// */
// while (run) {
// if (use_ccb) {
// consumer->consume_callback(topic, partition, 1000,
// &ex_consume_cb, &use_ccb);
// } else {
// RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
// msg_consume(msg, NULL);
// delete msg;
// }
// consumer->poll(0);
// }
/*
* Stop consumer
*/
// consumer->stop(topic, partition);
//
// consumer->poll(1000);
//
// delete topic;
// delete consumer;
} }
void KafkaConnection::mwUnsubscribe(const char *subject) void KafkaConnection::mwUnsubscribe(const char *subject)
...@@ -310,8 +336,17 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout) ...@@ -310,8 +336,17 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
{ {
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReceive"; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReceive";
message = NULL;
RdKafka::Message *msg = consumer->consume(rkqu, (int)timeout); RdKafka::Message *msg = consumer->consume(rkqu, (int)timeout);
msg_consume(msg, NULL);
GMSEC_DEBUG << '\n' << static_cast<const char *>(msg->payload())<< '\n';
Message recv_msg(static_cast<const char *>(msg->payload()));
//Message recv_msg =static_cast<const char *>(msg->payload())
//msg_consume(msg, NULL);
delete msg; delete msg;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment