Commit d7f44864 authored by Christopher Reis's avatar Christopher Reis

Connection working

Currently connects to kafka server
parent 35a37996
...@@ -20,14 +20,11 @@ LOCAL_DEFS = -DGMSEC_LIBROOTNAME=$(LIBROOTNAME) ...@@ -20,14 +20,11 @@ LOCAL_DEFS = -DGMSEC_LIBROOTNAME=$(LIBROOTNAME)
LOCAL_INCS = -I$(GMSEC_HOME)/include \ LOCAL_INCS = -I$(GMSEC_HOME)/include \
-I$(SUPDIR)/include \ -I$(SUPDIR)/include \
-I./include -I./include \
CXXFLAGS += $(LOCAL_DEFS) $(LOCAL_INCS) CXXFLAGS += $(LOCAL_DEFS) $(LOCAL_INCS)
LIBDIRS = LDFLAGS += -L$(SUPDIR)/include -lrdkafka++ -lrdkafka -lpthread -lrt -lz
LIBS = -lrdkafka -lz -lpthread -lrt
# #
SRCDIR = src SRCDIR = src
...@@ -47,7 +44,7 @@ clean: ...@@ -47,7 +44,7 @@ clean:
$(RM) $(BINDIR)/$(TARGET) $(RM) $(BINDIR)/$(TARGET)
$(BINDIR)/$(TARGET): $(OBJECTS) $(BINDIR)/$(TARGET): $(OBJECTS)
$(DLINK) $(BASE_LDFLAGS) $(OBJECTS) $(LIBDIRS) $(LIBS) -o $@ $(DLINK) $(OBJECTS) $(LDFLAGS) -o $@
$(OBJDIR)/%.o: $(SRCDIR)/%.cpp $(OBJDIR)/%.o: $(SRCDIR)/%.cpp
$(CXX) -c $(API_CXXFLAGS) $< -o $@ $(CXX) -c $(API_CXXFLAGS) $(LDFLAGS) $< -o $@
...@@ -31,9 +31,15 @@ ...@@ -31,9 +31,15 @@
#include <gmsec4/util/Log.h> #include <gmsec4/util/Log.h>
#include <gmsec4/util/Mutex.h> #include <gmsec4/util/Mutex.h>
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <gmsec_version.h> #include <gmsec_version.h>
#include <rdkafka.h>
using namespace gmsec::api; using namespace gmsec::api;
using namespace gmsec::api::internal; using namespace gmsec::api::internal;
...@@ -41,30 +47,69 @@ using namespace gmsec::api::util; ...@@ -41,30 +47,69 @@ using namespace gmsec::api::util;
using namespace std; using namespace std;
/** static bool run = true;
* Message delivery report callback. static bool exit_eof = false;
* Called once for each message.
* See rdkafka.h for more information. static void sigterm (int sig) {
*/ run = false;
// static void msg_delivered (rd_kafka_t *rk, }
// void *payload, size_t len,
// int error_code, class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
// void *opaque, void *msg_opaque) { public:
// void dr_cb (RdKafka::Message &message) {
// if (error_code) std::cout << "Message delivery for (" << message.len() << " bytes): " <<
// fprintf(stderr, "%% Message delivery failed: %s\n", message.errstr() << '\n';
// rd_kafka_err2str(error_code)); if (message.key())
// else if (!quiet) std::cout << "Key: " << *(message.key()) << ";" << '\n';
// fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len, }
// (int)len, (const char *)payload); };
// }
KafkaConnection::KafkaConnection(const Config& config) KafkaConnection::KafkaConnection(const Config& config)
: :
mw_test(false), mw_test(false),
mwInfo("") mwInfo("")
{ {
GMSEC_DEBUG << "Connection" << '\n'; GMSEC_DEBUG << "Connection test" << '\n';
std::string brokers = "localhost";
std::string errstr;
std::string topic_str;
std::string mode;
std::string debug;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("metadata.broker.list", brokers, errstr);
conf->set("dr_cb", &ex_dr_cb, errstr);
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
/*
* Publish Message
*/
//RdKafka::ErrorCode resp = producer->produce(topic, partition,
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment