Commit 59133c0a authored by Christopher Reis's avatar Christopher Reis

Implemented queue

parent 9e4805af
......@@ -29,6 +29,8 @@
#include <gmsec4/Config.h>
#include <gmsec4/Message.h>
#include <librdkafka/rdkafkacpp.h>
//#include <KafkaMessage.h>
......@@ -42,7 +44,7 @@ class GMSEC_KAFKA_API KafkaConnection : public gmsec::api::internal::ConnectionI
private:
bool mw_test;
std::string mwInfo;
std::string mw_brokers = "localhost";
std::string mw_brokers;
std::string mw_errstr;
std::string mw_debug;
......
......@@ -51,6 +51,9 @@ using namespace std;
static bool run = true;
static bool exit_eof = false;
int use_ccb = 0;
RdKafka::Producer *producer;
RdKafka::Consumer *consumer;
RdKafka::Queue *rkqu;
static void sigterm (int sig) {
run = false;
......@@ -96,6 +99,7 @@ void msg_consume(RdKafka::Message* message, void* opaque) {
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb (RdKafka::Message &message) {
GMSEC_INFO << "TEST" << '\n';
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
message.errstr() << '\n';
if (message.key())
......@@ -120,22 +124,22 @@ KafkaConnection::KafkaConnection(const Config& config)
{
GMSEC_DEBUG << "Connection test" << '\n';
std::string topic_str;
topic_str = "GMSEC.CONN.PUBLISH";
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("dr_cb", &ex_dr_cb, mw_errstr);
GMSEC_INFO << '\n' << config.toXML() << '\n';
if(config.getValue("server") != NULL){
mw_brokers = config.getValue("server");
}
GMSEC_DEBUG << "Brokers: " << mw_brokers.c_str() << '\n';
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, mw_errstr);
RdKafka::Conf *pub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
pub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
ExampleDeliveryReportCb ex_dr_cb;
//TODO Fix callback issues
//conf->set("dr_cb", &ex_dr_cb, mw_errstr);
producer = RdKafka::Producer::create(pub_conf, mw_errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << mw_errstr << std::endl;
exit(1);
......@@ -143,37 +147,18 @@ KafkaConnection::KafkaConnection(const Config& config)
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
}
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
std::string line = "GMSEC Pub has Connected";
/*
* Publish Message
*/
int32_t partition = RdKafka::Topic::PARTITION_UA;
RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(line.c_str()), line.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR){
std::cerr << "% Produce failed: " <<
RdKafka::err2str(resp) << std::endl;
}else{
std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
producer->poll(0);
consumer = RdKafka::Consumer::create(sub_conf, mw_errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << mw_errstr << std::endl;
exit(1);
}
std::cout << "% Created consumer " << consumer->name() << std::endl;
while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
}
rkqu = RdKafka::Queue::create(consumer);
delete topic;
delete producer;
}
KafkaConnection::~KafkaConnection()
......@@ -208,30 +193,21 @@ void KafkaConnection::mwDisconnect()
void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
{
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')';
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')' << '\n';
std::string topic_str = subject;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb;
RdKafka::Conf *sub_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
sub_conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("dr_cb", &ex_dr_cb, mw_errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, mw_errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << mw_errstr << std::endl;
exit(1);
}
std::cout << "% Created consumer " << consumer->name() << std::endl;
RdKafka::Consumer *t_consumer = RdKafka::Consumer::create(sub_conf, mw_errstr);
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, mw_errstr);
RdKafka::Topic *topic = RdKafka::Topic::create(t_consumer, topic_str, tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
......@@ -242,39 +218,40 @@ void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
*/
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
RdKafka::ErrorCode resp = t_consumer->start(topic, partition, start_offset, rkqu);
GMSEC_DEBUG << "TEST" << '\n';
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
ExampleConsumeCb ex_consume_cb;
/*
* Consume messages
*/
while (run) {
if (use_ccb) {
consumer->consume_callback(topic, partition, 1000,
&ex_consume_cb, &use_ccb);
} else {
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
msg_consume(msg, NULL);
delete msg;
}
consumer->poll(0);
}
// ExampleConsumeCb ex_consume_cb;
//
// /*
// * Consume messages
// */
// while (run) {
// if (use_ccb) {
// consumer->consume_callback(topic, partition, 1000,
// &ex_consume_cb, &use_ccb);
// } else {
// RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
// msg_consume(msg, NULL);
// delete msg;
// }
// consumer->poll(0);
// }
/*
* Stop consumer
*/
consumer->stop(topic, partition);
consumer->poll(1000);
delete topic;
delete consumer;
// consumer->stop(topic, partition);
//
// consumer->poll(1000);
//
// delete topic;
// delete consumer;
}
void KafkaConnection::mwUnsubscribe(const char *subject)
......@@ -285,29 +262,10 @@ void KafkaConnection::mwUnsubscribe(const char *subject)
void KafkaConnection::mwPublish(const Message& message, const Config& config)
{
Message test_message(message);
std::string topic_str;
topic_str = message.getSubject();
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string topic_str = message.getSubject();
std::string line = message.toXML();
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("dr_cb", &ex_dr_cb, mw_errstr);
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, mw_errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << mw_errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
......@@ -317,10 +275,6 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config)
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
}
test_message.addField("PRODUCER-NAME", producer->name().c_str());
test_message.addField("PRODUCER-NAME-BIN", (GMSEC_BIN) producer->name().c_str(),7);
std::string line = test_message.toXML();
/*
* Publish Message
*/
......@@ -336,12 +290,10 @@ void KafkaConnection::mwPublish(const Message& message, const Config& config)
while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
producer->poll(250);
}
delete topic;
delete producer;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << message.toXML() ;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)" << '\n' << message.toXML() ;
}
void KafkaConnection::mwRequest(const Message& message, std::string& id)
......@@ -357,6 +309,11 @@ void KafkaConnection::mwReply(const Message& request, const Message& reply)
void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
{
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReceive";
RdKafka::Message *msg = consumer->consume(rkqu, (int)timeout);
msg_consume(msg, NULL);
delete msg;
}
// EOF KafkaConnection.cpp
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment