Commit 0ca67dd6 authored by Christopher Reis's avatar Christopher Reis

Forgot to add connection.cpp

parent 061e3019
...@@ -40,6 +40,8 @@ ...@@ -40,6 +40,8 @@
#include <gmsec_version.h> #include <gmsec_version.h>
#include "rdkafkacpp.h"
//#include <librdkafka/rdkafkacpp.h>
using namespace gmsec::api; using namespace gmsec::api;
using namespace gmsec::api::internal; using namespace gmsec::api::internal;
...@@ -67,31 +69,31 @@ class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { ...@@ -67,31 +69,31 @@ class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
KafkaConnection::KafkaConnection(const Config& config) KafkaConnection::KafkaConnection(const Config& config)
: :
mw_test(false), mw_test(false),
mwInfo("") mwInfo(""),
mw_brokers("localhost"),
mw_errstr(""),
mw_debug("")
{ {
GMSEC_DEBUG << "Connection test" << '\n'; GMSEC_DEBUG << "Connection test" << '\n';
std::string brokers = "localhost";
std::string errstr;
std::string topic_str; std::string topic_str;
std::string mode;
std::string debug;
topic_str = "GMSEC.CONN.PUBLISH";
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb; ExampleDeliveryReportCb ex_dr_cb;
conf->set("metadata.broker.list", brokers, errstr); conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("dr_cb", &ex_dr_cb, errstr); conf->set("dr_cb", &ex_dr_cb, mw_errstr);
/* /*
* Create producer using accumulated global configuration. * Create producer using accumulated global configuration.
*/ */
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); RdKafka::Producer *producer = RdKafka::Producer::create(conf, mw_errstr);
if (!producer) { if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl; std::cerr << "Failed to create producer: " << mw_errstr << std::endl;
exit(1); exit(1);
} }
...@@ -101,16 +103,33 @@ KafkaConnection::KafkaConnection(const Config& config) ...@@ -101,16 +103,33 @@ KafkaConnection::KafkaConnection(const Config& config)
* Create topic handle. * Create topic handle.
*/ */
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
tconf, errstr); tconf, mw_errstr);
if (!topic) { if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl; std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1); exit(1);
} }
std::string line = "GMSEC Pub has Connected";
/* /*
* Publish Message * Publish Message
*/ */
//RdKafka::ErrorCode resp = producer->produce(topic, partition, int32_t partition = RdKafka::Topic::PARTITION_UA;
RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(line.c_str()), line.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR){
std::cerr << "% Produce failed: " <<
RdKafka::err2str(resp) << std::endl;
}else{
std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
producer->poll(0);
}
while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
}
delete topic;
delete producer;
} }
KafkaConnection::~KafkaConnection() KafkaConnection::~KafkaConnection()
...@@ -155,6 +174,60 @@ void KafkaConnection::mwUnsubscribe(const char *subject) ...@@ -155,6 +174,60 @@ void KafkaConnection::mwUnsubscribe(const char *subject)
void KafkaConnection::mwPublish(const Message& message, const Config& config) void KafkaConnection::mwPublish(const Message& message, const Config& config)
{ {
std::string topic_str;
topic_str = message.getSubject();
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("dr_cb", &ex_dr_cb, mw_errstr);
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, mw_errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << mw_errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
}
std::string line = message.toXML();
/*
* Publish Message
*/
int32_t partition = RdKafka::Topic::PARTITION_UA;
RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(line.c_str()), line.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR){
std::cerr << "% Produce failed: " <<
RdKafka::err2str(resp) << std::endl;
}else{
std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
producer->poll(0);
}
while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
}
delete topic;
delete producer;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << message.toXML() ; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << message.toXML() ;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment