Commit 08ba272c authored by Christopher Reis's avatar Christopher Reis

Added Request function

parent 60d15b65
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <gmsec4/internal/InternalConnection.h> #include <gmsec4/internal/InternalConnection.h>
#include <gmsec4/internal/MessageBuddy.h> #include <gmsec4/internal/MessageBuddy.h>
#include <gmsec4/internal/Rawbuf.h> #include <gmsec4/internal/Rawbuf.h>
#include <gmsec4/internal/SystemUtil.h>
#include <gmsec4/Connection.h> #include <gmsec4/Connection.h>
#include <gmsec4/Errors.h> #include <gmsec4/Errors.h>
...@@ -43,6 +44,9 @@ using namespace gmsec::api::util; ...@@ -43,6 +44,9 @@ using namespace gmsec::api::util;
using namespace std; using namespace std;
// Constants
#define TOPIC_PREFIX ""
static bool run = true; static bool run = true;
static bool exit_eof = false; static bool exit_eof = false;
int use_ccb = 0; int use_ccb = 0;
...@@ -60,9 +64,9 @@ static void sigterm (int sig) { ...@@ -60,9 +64,9 @@ static void sigterm (int sig) {
run = false; run = false;
} }
const char msg_clean(){ const char* msg_clean(){
//TODO clean messages to prevent program crashes //TODO clean messages to prevent program crashes
return std::string("Test"); return std::string("Test").c_str();
} }
void msg_consume(RdKafka::Message* message, void* opaque) { void msg_consume(RdKafka::Message* message, void* opaque) {
...@@ -145,8 +149,19 @@ class ExampleConsumeCb : public RdKafka::ConsumeCb { ...@@ -145,8 +149,19 @@ class ExampleConsumeCb : public RdKafka::ConsumeCb {
} }
}; };
std::string KafkaConnection::generateUniqueId(long id)
{
std::ostringstream strm;
strm << getExternal().getID() << "_" << SystemUtil::getProcessID() << "_" << ++uniquecounter << "_" << id;
std::string topic = TOPIC_PREFIX;
topic.append(strm.str());
return topic;
}
KafkaConnection::KafkaConnection(const Config& config) KafkaConnection::KafkaConnection(const Config& config)
: :
requestCounter(0),
uniquecounter(0),
mw_test(false), mw_test(false),
mwInfo(""), mwInfo(""),
mw_brokers("localhost"), mw_brokers("localhost"),
...@@ -292,17 +307,19 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config) ...@@ -292,17 +307,19 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
void KafkaConnection::mwUnsubscribe(const char *subject) void KafkaConnection::mwUnsubscribe(const char *subject)
{ {
int32_t partition = 0;
std::string topic_str = subject; std::string topic_str = subject;
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (topic_str.find("*") != string::npos) { if (topic_str.find("*") != string::npos) {
GMSEC_DEBUG << "Found Wildcard in " << subject << '\n'; GMSEC_DEBUG << "Found Wildcard in " << subject << '\n';
std::vector<std::string> topic_vec = list_topics(topic_str); std::vector<std::string> topic_vec = list_topics(topic_str);
for (std::vector<std::string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) { for (std::vector<std::string>::const_iterator i = topic_vec.begin(); i != topic_vec.end(); ++i) {
std::string it_str = *i; std::string it_str = *i;
GMSEC_DEBUG << "Unsubscribing to: " << it_str.c_str() << '\n'; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << it_str.c_str() << ')';
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, it_str, tconf, mw_errstr);
consumer->stop(topic, partition);
} }
} }
int32_t partition = 0;
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')'; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')';
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, subject, tconf, mw_errstr); RdKafka::Topic *topic = RdKafka::Topic::create(consumer, subject, tconf, mw_errstr);
if (!topic) { if (!topic) {
...@@ -349,9 +366,17 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config) ...@@ -349,9 +366,17 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config)
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << '\n' << message.toXML() ; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << '\n' << message.toXML() ;
} }
void KafkaConnection::mwRequest(const Message& message, std::string& id) void KafkaConnection::mwRequest(const Message& request, std::string& id)
{ {
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwRequest(" << ')'; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwRequest(" << ')';
id = generateUniqueId(++requestCounter);
MessageBuddy::getInternal(request).addField(REPLY_UNIQUE_ID_FIELD, id.c_str());
MessageBuddy::getInternal(request).getDetails().setBoolean(OPT_REQ_RESP, true);
mwPublish(request, getExternal().getConfig());
GMSEC_DEBUG << "[Request sent successfully: " << request.getSubject() << "]";
} }
void KafkaConnection::mwReply(const Message& request, const Message& reply) void KafkaConnection::mwReply(const Message& request, const Message& reply)
...@@ -371,7 +396,7 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout) ...@@ -371,7 +396,7 @@ void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
if(msg_str.find("Broker:") != string::npos) { if(msg_str.find("Broker:") != string::npos) {
GMSEC_INFO << msg_str.c_str(); GMSEC_INFO << msg_str.c_str();
} else { } else {
GMSEC_INFO << '\n' << msg_str.c_str(); GMSEC_DEBUG << '\n' << msg_str.c_str();
message = new Message(msg_str.c_str()); message = new Message(msg_str.c_str());
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment