Commit cc364bd8 authored by Christopher Reis's avatar Christopher Reis

added Kafka Library

parent 78910c07
...@@ -14,9 +14,13 @@ TARGET = lib$(LIBROOTNAME).$(SHLIB_EXT) ...@@ -14,9 +14,13 @@ TARGET = lib$(LIBROOTNAME).$(SHLIB_EXT)
OBJDIR = ./src OBJDIR = ./src
SUPDIR = $(SUPPORT)/kafka
LOCAL_DEFS = -DGMSEC_LIBROOTNAME=$(LIBROOTNAME) LOCAL_DEFS = -DGMSEC_LIBROOTNAME=$(LIBROOTNAME)
LOCAL_INCS = -I./include LOCAL_INCS = -I$(GMSEC_HOME)/include \
-I$(SUPDIR)/include \
-I./include
CXXFLAGS += $(LOCAL_DEFS) $(LOCAL_INCS) CXXFLAGS += $(LOCAL_DEFS) $(LOCAL_INCS)
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include <gmsec_version.h> #include <gmsec_version.h>
#include <rdkafka.h>
using namespace gmsec::api; using namespace gmsec::api;
using namespace gmsec::api::internal; using namespace gmsec::api::internal;
...@@ -40,6 +41,23 @@ using namespace gmsec::api::util; ...@@ -40,6 +41,23 @@ using namespace gmsec::api::util;
using namespace std; using namespace std;
/**
* Message delivery report callback.
* Called once for each message.
* See rdkafka.h for more information.
*/
static void msg_delivered (rd_kafka_t *rk,
void *payload, size_t len,
int error_code,
void *opaque, void *msg_opaque) {
if (error_code)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(error_code));
else if (!quiet)
fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len,
(int)len, (const char *)payload);
}
KafkaConnection::KafkaConnection(const Config& config) KafkaConnection::KafkaConnection(const Config& config)
: :
...@@ -47,6 +65,7 @@ KafkaConnection::KafkaConnection(const Config& config) ...@@ -47,6 +65,7 @@ KafkaConnection::KafkaConnection(const Config& config)
mwInfo("") mwInfo("")
{ {
GMSEC_DEBUG << "Connection" << '\n'; GMSEC_DEBUG << "Connection" << '\n';
} }
KafkaConnection::~KafkaConnection() KafkaConnection::~KafkaConnection()
...@@ -91,7 +110,7 @@ void KafkaConnection::mwUnsubscribe(const char *subject) ...@@ -91,7 +110,7 @@ void KafkaConnection::mwUnsubscribe(const char *subject)
void KafkaConnection::mwPublish(const Message& message, const Config& config) void KafkaConnection::mwPublish(const Message& message, const Config& config)
{ {
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)"; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << message.toXML() ;
} }
void KafkaConnection::mwRequest(const Message& message, std::string& id) void KafkaConnection::mwRequest(const Message& message, std::string& id)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment