Commit 40883ff0 authored by Christopher Reis's avatar Christopher Reis

Added Subscription

parent 0ca67dd6
...@@ -51,11 +51,48 @@ using namespace std; ...@@ -51,11 +51,48 @@ using namespace std;
static bool run = true; static bool run = true;
static bool exit_eof = false; static bool exit_eof = false;
int use_ccb = 0;
static void sigterm (int sig) { static void sigterm (int sig) {
run = false; run = false;
} }
void msg_consume(RdKafka::Message* message, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
printf("%.*s\n",
static_cast<int>(message->len()),
static_cast<const char *>(message->payload()));
break;
case RdKafka::ERR__PARTITION_EOF:
/* Last message */
if (exit_eof) {
run = false;
}
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
}
}
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public: public:
void dr_cb (RdKafka::Message &message) { void dr_cb (RdKafka::Message &message) {
...@@ -66,6 +103,13 @@ class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { ...@@ -66,6 +103,13 @@ class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
} }
}; };
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
void consume_cb (RdKafka::Message &msg, void *opaque) {
msg_consume(&msg, opaque);
}
};
KafkaConnection::KafkaConnection(const Config& config) KafkaConnection::KafkaConnection(const Config& config)
: :
mw_test(false), mw_test(false),
...@@ -165,6 +209,72 @@ void KafkaConnection::mwDisconnect() ...@@ -165,6 +209,72 @@ void KafkaConnection::mwDisconnect()
void KafkaConnection::mwSubscribe(const char* subject, const Config& config) void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
{ {
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')'; GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')';
std::string topic_str = subject;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("metadata.broker.list", mw_brokers, mw_errstr);
conf->set("dr_cb", &ex_dr_cb, mw_errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, mw_errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << mw_errstr << std::endl;
exit(1);
}
std::cout << "% Created consumer " << consumer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, mw_errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << mw_errstr << std::endl;
exit(1);
}
/*
* Start consumer for topic+partition at start offset
*/
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
ExampleConsumeCb ex_consume_cb;
/*
* Consume messages
*/
while (run) {
if (use_ccb) {
consumer->consume_callback(topic, partition, 1000,
&ex_consume_cb, &use_ccb);
} else {
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
msg_consume(msg, NULL);
delete msg;
}
consumer->poll(0);
}
/*
* Stop consumer
*/
consumer->stop(topic, partition);
consumer->poll(1000);
delete topic;
delete consumer;
} }
void KafkaConnection::mwUnsubscribe(const char *subject) void KafkaConnection::mwUnsubscribe(const char *subject)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment