Commit f89f01e0 authored by Christopher Reis's avatar Christopher Reis

Unsubscribe to wildcard added

parent 0f0cff91
...@@ -59,6 +59,9 @@ RdKafka::Consumer *consumer; ...@@ -59,6 +59,9 @@ RdKafka::Consumer *consumer;
RdKafka::Queue *rkqu; RdKafka::Queue *rkqu;
//Kafka vector objects //Kafka vector objects
//Vector variables
std::vector<std::string> subscribed_topics;
static void sigterm (int sig) { static void sigterm (int sig) {
run = false; run = false;
} }
...@@ -233,6 +236,7 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config) ...@@ -233,6 +236,7 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
for (std::vector<std::string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) { for (std::vector<std::string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) {
std::string it_str = *i; std::string it_str = *i;
GMSEC_DEBUG << "Subscribing to: " << it_str.c_str() << '\n'; GMSEC_DEBUG << "Subscribing to: " << it_str.c_str() << '\n';
subscribed_topics.push_back(it_str.c_str());
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr); sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
...@@ -258,7 +262,8 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config) ...@@ -258,7 +262,8 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
} }
} }
} else { } else {
GMSEC_DEBUG << "No Wildcard" << '\n'; GMSEC_DEBUG << "No Wildcard; Subscribing to: " << subject << '\n';
subscribed_topics.push_back(subject);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr); sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
...@@ -290,6 +295,15 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config) ...@@ -290,6 +295,15 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
void KafkaConnection::mwUnsubscribe(const char *subject) void KafkaConnection::mwUnsubscribe(const char *subject)
{ {
std::string topic_str = subject;
if (topic_str.find("*") != string::npos) {
GMSEC_DEBUG << "Found Wildcard in " << subject << '\n';
std::vector<std::string> topic_vec = list_topics(topic_str);
for (std::vector<std::string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) {
std::string it_str = *i;
GMSEC_DEBUG << "Unsubscribing to: " << it_str.c_str() << '\n';
}
}
int32_t partition = 0; int32_t partition = 0;
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')'; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')';
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment