Commit e3b42433 authored by Christopher Reis's avatar Christopher Reis

Cleaned up and updated libraries

Not pretty, but it compiles...Good starting point.
increment(progress)
parent 831dc816
# Copyright 2007-2016 United States Government as represented by the
# Administrator of The National Aeronautics and Space Administration.
# No copyright is claimed in the United States under Title 17, U.S. Code.
# All Rights Reserved.
# Makefile - middleware wrapper template
# Makefile - kafka middleware wrapper
GMSEC_HOME = ../..
include $(GMSEC_HOME)/config/$(GMSEC_PLATFORM)
include $(GMSEC_HOME)/config/$(GMSEC_PLATFORM)
LIBROOTNAME = gmsec_kafka
......@@ -24,8 +16,7 @@ OBJDIR = ./src
LOCAL_DEFS = -DGMSEC_LIBROOTNAME=$(LIBROOTNAME)
LOCAL_INCS = \
-Iinclude
LOCAL_INCS = -I./include
CXXFLAGS += $(LOCAL_DEFS) $(LOCAL_INCS)
......@@ -39,11 +30,11 @@ SRCDIR = src
OBJECTS = \
$(OBJDIR)/gmsec_kafka.o \
$(OBJDIR)/KafkaConnection.o \
$(OBJDIR)/KafkaMessage.o
#$(OBJDIR)/KafkaMessage.o
# default: clean library
default: library
default: clean library
library: $(BINDIR)/$(TARGET)
......@@ -55,4 +46,4 @@ $(BINDIR)/$(TARGET): $(OBJECTS)
$(DLINK) $(BASE_LDFLAGS) $(OBJECTS) $(LIBDIRS) $(LIBS) -o $@
$(OBJDIR)/%.o: $(SRCDIR)/%.cpp
$(CXX) -c $(CXXFLAGS) $< -o $@
$(CXX) -c $(API_CXXFLAGS) $< -o $@
......@@ -21,95 +21,57 @@
#include <gmsec_kafka.h>
#include <gmsec/internal/BaseConnection.h>
#include <gmsec4_defs.h>
#include <KafkaMessage.h>
#include <gmsec4/internal/ConnectionInterface.h>
#include <gmsec4/internal/UniqueFilter.h>
#include <gmsec4/Config.h>
#include <gmsec4/Message.h>
//#include <KafkaMessage.h>
/** @class VoidConnection
* This class provides a sample (dummy) implementation of the
* BaseConnection abstract class.
*/
class GMSEC_KAFKA_API KafkaConnection
: public gmsec::internal::BaseConnection
class GMSEC_KAFKA_API KafkaConnection : public gmsec::api::internal::ConnectionInterface
{
private:
bool mw_test;
public:
/** @fn VoidConnection(gmsec::Config *cfg)
* Standard constructor that requires a config
*/
KafkaConnection(gmsec::Config *cfg);
virtual ~KafkaConnection();
KafkaConnection(const gmsec::api::Config& config);
/** @fn CloneMessage( gmsec::Message *in, gmsec::Message *&out )
* This function copies a message without knowing what type it is
*/
virtual gmsec::Status CALL_TYPE CloneMessage(gmsec::Message *in, gmsec::Message *&out);
virtual ~KafkaConnection();
/** @fn GetLibraryVersion()
* Retrieve the version of the underlying middleware libraries
*/
virtual const char * CALL_TYPE GetLibraryVersion();
virtual const char* CALL_TYPE getLibraryVersion();
/** @fn GetLibraryRootName()
* Retrieve the root library name
*/
virtual const char * CALL_TYPE GetLibraryRootName()
virtual const char* CALL_TYPE getLibraryRootName()
{
return "gmsec_kafka";
}
/** @fn mwConnect()
* @brief Establish connection to the server.
*/
virtual gmsec::Status CALL_TYPE mwConnect();
/** @fn mwDisconnect()
* @brief End connection to the server.
*/
virtual gmsec::Status CALL_TYPE mwDisconnect();
/** @fn mwSubscribe(const char *subject, const gmsec::Config &config)
* @brief Subscribe to a subject without a callback
*/
virtual gmsec::Status CALL_TYPE mwSubscribe(const char *subject, const gmsec::Config &config);
/** @fn mwUnSubscribe(const char *subject)
* @brief Unsubscribe from a subject
*/
virtual gmsec::Status CALL_TYPE mwUnSubscribe(const char *subject);
/** @fn mwCreateMessage(const char *subject, GMSEC_MSG_KIND msgKind, gmsec::Message *&msg)
* @brief Create a middleware specific message.
*/
virtual gmsec::Status CALL_TYPE mwCreateMessage(const char *subject, GMSEC_MSG_KIND msgKind, gmsec::Message *&msg);
/** @fn DestroyMessage(gmsec::Message *msg)
* destroy a kafka message
*/
virtual gmsec::Status CALL_TYPE DestroyMessage(gmsec::Message *msg);
/** @fn mwPublish(gmsec::Message *msg, const gmsec::Config &config)
* @brief Send the message to the middleware.
*/
virtual gmsec::Status CALL_TYPE mwPublish(gmsec::Message *msg, const gmsec::Config &config);
/** @fn mwRequest(gmsec::Message *request, long timeout, gmsec::Message *&reply)
* @brief Send a request message to the middleware.
*/
virtual gmsec::Status CALL_TYPE mwRequest(gmsec::Message *request, std::string &id);
/** @fn mwReply(gmsec::Message *request,gmsec::Message *reply)
* @brief Send a reply message to the middleware.
*/
virtual gmsec::Status CALL_TYPE mwReply(gmsec::Message *request,gmsec::Message *reply);
/** @fn GetNextMsg(gmsec::Message *&msg, long timeout)
* pull the next message off the inbound queue
*/
virtual gmsec::Status CALL_TYPE mwGetNextMsg(gmsec::Message *&msg, long timeout);
virtual const char* getMWInfo();
virtual void CALL_TYPE mwConnect();
virtual void CALL_TYPE mwDisconnect();
virtual void CALL_TYPE mwSubscribe(const char* subject, const gmsec::api::Config& config);
virtual void CALL_TYPE mwUnsubscribe(const char* subject);
virtual void CALL_TYPE mwPublish(const gmsec::api::Message& msg, const gmsec::api::Config& config);
virtual void CALL_TYPE mwRequest(const gmsec::api::Message& request, std::string& id);
virtual void CALL_TYPE mwReply(const gmsec::api::Message& request, const gmsec::api::Message& reply);
virtual void CALL_TYPE mwReceive(gmsec::api::Message*& msg, GMSEC_I32 timeout);
};
#endif // VoidConnection_h
/*
* Copyright 2007-2016 United States Government as represented by the
* Administrator of The National Aeronautics and Space Administration.
* No copyright is claimed in the United States under Title 17, U.S. Code.
* All Rights Reserved.
*/
/* @file KafkaMessage.h
* This file provides a template for implementing a middleware wrapper.
*/
#ifndef _KafkaMessage_h_
#define _KafkaMessage_h_
#include <gmsec_kafka.h>
#include <gmsec/internal/SimpleMessage.h>
/** @class KafkaMessage
* This class implements the Message abstract base class to support
* Kafka middleware.
*/
class GMSEC_KAFKA_API KafkaMessage
: public gmsec::internal::SimpleMessage
{
public:
/** @fn KafkaMessage(subject, type)
* Standard constructor that takes a kafka message pointer
*/
KafkaMessage();
virtual ~KafkaMessage();
/** @fn getCMessagePtr()
* Access to the unlying kafka native message pointer.
*/
// CMessage * CALL_TYPE getCMessagePtr() { return NULL; }
/** @fn GetLibraryRootName()
* Retrieve the root library name.
*/
virtual const char * CALL_TYPE GetLibraryRootName()
{
return "gmsec_kafka";
}
#ifdef SUBCLASS_BASEMESSAGE
/** @fn ProcessConfigValue(const char *name, const char *value)
* Support function to add configuration value support for
* this particular middleware.
*/
virtual bool CALL_TYPE ProcessConfigValue(const char *name, const char *value);
/** @fn isValid()
* Check to see if this message has a valid native pointer
*/
virtual bool CALL_TYPE isValid();
/** @fn SetKind(GMSEC_MSG_KIND kind)
* Set the message kind (PUBLISH,REQUEST,REPLY)
*/
virtual gmsec::Status CALL_TYPE mwSetKind(GMSEC_MSG_KIND kind);
/** @fn GetKind(GMSEC_MSG_KIND &kind)
* Get the message kind (PUBLISH,REQUEST,REPLY)
*/
virtual gmsec::Status CALL_TYPE GetKind(GMSEC_MSG_KIND &kind);
/** @fn SetSubject(const char *subject)
* Set the message subject
*/
virtual gmsec::Status CALL_TYPE mwSetSubject(const char *subject);
/** @fn GetSubject(const char *&subject)
* Get the message subject
*/
virtual gmsec::Status CALL_TYPE GetSubject(const char *&subject);
/** @fn ClearFields()
* Clear all fields.
*/
virtual gmsec::Status CALL_TYPE ClearFields();
/** @fn AddField(gmsec::Field &field)
* Add a field to this message. This will overwrite an existing field
* of the same name.
*/
virtual gmsec::Status CALL_TYPE AddField(gmsec::Field &field);
/** @fn ClearField(const char *name)
* Clear this named field.
*/
virtual gmsec::Status CALL_TYPE ClearField(const char *name);
/** @fn GetField(const char *name, gmsec::Field &field)
* Get a field by name if available.
*/
virtual gmsec::Status CALL_TYPE GetField(const char *name, gmsec::Field &field);
/** @fn GetFieldCount(long &count)
* Get the number of available fields
*/
virtual gmsec::Status CALL_TYPE GetFieldCount(long &count);
/** @fn GetMSGSize(unsigned long &size)
* Get the physical message size
*/
virtual gmsec::Status CALL_TYPE GetMSGSize(unsigned long &size);
/** @fn GetFirstField(gmsec::Field &field)
* Iteration of fields, this will get the first field and reset the
* internal iteration to the first field.
*/
virtual gmsec::Status CALL_TYPE GetFirstField(gmsec::Field &field);
/** @fn GetNextField(gmsec::Field &field)
* Iteration of fields, this will get the next field and set the
* internal iteration to the next field.
*/
virtual gmsec::Status CALL_TYPE GetNextField(gmsec::Field &field);
#endif /* SUBCLASS_BASEMESSAGE */
};
#endif // KafkaMessage_h
/*
* Copyright 2007-2016 United States Government as represented by the
* Administrator of The National Aeronautics and Space Administration.
* No copyright is claimed in the United States under Title 17, U.S. Code.
* All Rights Reserved.
*/
/* @file gmsec_kafka.h
* This file provides a template for implementing a middleware wrapper.
*/
#ifndef GMSEC_KAFKA_H
#define GMSEC_KAFKA_H
#include <gmsec4/internal/ConnectionInterface.h>
#include <gmsec4/Config.h>
#include <gmsec4/Status.h>
//#include <zmq.h>
/* @file gmsec_kafka.h
* This file provides a template for implementing a middleware wrapper.
*/
#ifndef _gmsec_kafka_h_
#define _gmsec_kafka_h_
// Use an extra level of indirection to expand integer macros before concatenating them
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
#ifdef WIN32
#ifdef GMSEC_KAFKA_EXPORTS
#define GMSEC_KAFKA_API __declspec(dllexport)
#else
#define GMSEC_KAFKA_API __declspec(dllimport)
#include <windows.h>
#ifdef GMSEC_KAFKA_EXPORTS
#define GMSEC_KAFKA_API __declspec(dllexport)
#else
#define GMSEC_KAFKA_API __declspec(dllimport)
#endif
/* disable invalid STL waring for windows targets */
#pragma warning ( disable : 4251 )
#pragma warning ( disable : 4786 )
#else // All other platforms make this macro invisible
#define GMSEC_KAFKA_API
#endif
/* disable invalid STL waring for windows targets */
#pragma warning ( disable : 4251 )
#pragma warning ( disable : 4786 )
#else // All other platforms make this macro invisible
#define GMSEC_KAFKA_API
//Constants
//#define KAFKA_VERSION_STRING "gmsec_kafka" STR(KAFKA_VERSION_MAJOR) STR(KAFKA_VERSION_MINOR) STR(KAFKA_VERSION_PATCH)
#define KAFKA_REPLY_ADDRESS "KAFKA-REPLY-ADDRESS"
// GMSEC_LIBROOTNAME is defined in the building Makefile or mak/dsp file.
#ifdef GMSEC_LIBROOTNAME
// This makes a "" string from the given symbol.
#define makeString(s) (#s)
// This evaluates the symbol before making a string out of it.
#define makeStringFromValue(s) makeString(s)
// That way, I can make a string which has the value of the macro,
// as opposed to a string which has the value of the macro name.
// No more "" in Makefiles and/or mak/dsw files.
#define KAFKA_VERSION_STRING makeStringFromValue(GMSEC_LIBROOTNAME)
#else
// Or, in the case of the original 6.5 release, this.
#define KAFKA_VERSION_STRING "gmsec_kafka"
#endif
#include <gmsec_cpp.h>
#ifdef __cplusplus
extern "C"
{
#endif
/** @fn CreateConnection(Config *cfg, Connection **conn)
* This function wraps the VoidConnection constructor with the standard
* connection creation function prototype.
*
* @sa ConnectionFactory::Create(Config *cfg, Connection *&conn)
*/
GMSEC_KAFKA_API gmsec::Status
*CreateConnection(gmsec::Config *cfg, gmsec::Connection **conn);
/** @fn DestroyConnection(Connection *conn)
* This function wraps the VoidConnection destructor with the standard
* connection deletion function prototype.
*
* @sa ConnectionFactory::Destroy(Connection *conn)
*/
GMSEC_KAFKA_API void
DestroyConnection(gmsec::Connection *conn);
GMSEC_KAFKA_API void createConnection(const gmsec::api::Config* config, gmsec::api::internal::ConnectionInterface** connIf , gmsec::api::Status* status);
#ifdef __cplusplus
} // extern "C"
} // extern "C"
#endif
#endif /* gmsec_void_h */
#endif
......@@ -15,187 +15,87 @@
/* @file KafkaConnection.cpp
* This file provides a template for implementing a middleware wrapper.
*/
#include <gmsec_kafka.h>
#include <KafkaConnection.h>
#include <gmsec/internal/rawbuf.h>
#include <gmsec4/internal/InternalConnection.h>
#include <gmsec4/internal/MessageBuddy.h>
#include <gmsec4/internal/Rawbuf.h>
#include <gmsec4/Connection.h>
#include <gmsec4/Errors.h>
using namespace gmsec;
using namespace gmsec::util;
#include <gmsec4/util/Buffer.h>
#include <gmsec4/util/Condition.h>
#include <gmsec4/util/Log.h>
#include <gmsec4/util/Mutex.h>
using namespace std;
#include <gmsec_version.h>
KafkaConnection::KafkaConnection(Config *cfg)
:
BaseConnection(cfg)
{
Status result;
char raw[4000];
rawbuf buffer(raw, sizeof(raw));
ostream os(&buffer);
os << "KafkaConnection::KafkaConnection:";
if (NULL != cfg)
{
const char * key;
const char * value;
for (result = cfg->GetFirst(key, value);
GMSEC_STATUS_NO_ERROR != result.GetCode();
result = cfg->GetNext(key, value))
{
os << "\n\t(" << key << ',' << value << ')';
}
}
os << ends;
LOG_DEBUG << raw;
}
using namespace gmsec::api;
using namespace gmsec::api::internal;
using namespace gmsec::api::util;
KafkaConnection::~KafkaConnection()
{
LOG_DEBUG << "gmsec_kafka:KafkaConnection::~KafkaConnection()";
}
using namespace std;
const char *KafkaConnection::GetLibraryVersion()
KafkaConnection::KafkaConnection(const Config& config)
:
mw_test(false)
{
return "v0.2.1";
GMSEC_DEBUG << "Connection" << '\n';
}
Status KafkaConnection::mwConnect()
KafkaConnection::~KafkaConnection()
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwConnect()";
return result;
GMSEC_DEBUG << "~Connection" << '\n';
}
Status KafkaConnection::mwDisconnect()
const char* KafkaConnection::getLibraryVersion()
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwDisconnect()";
return result;
return "v0.10.1";
}
Status KafkaConnection::mwSubscribe(const char *subject, const gmsec::Config &config)
void KafkaConnection::mwConnect()
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')';
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwConnect()";
}
Status KafkaConnection::mwUnSubscribe(const char *subject)
void KafkaConnection::mwDisconnect()
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')';
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwDisconnect()";
}
Status KafkaConnection::mwCreateMessage(const char *subject,
GMSEC_MSG_KIND kind, Message *&msg)
void KafkaConnection::mwSubscribe(const char* subject, const Config& config)
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwCreateMessage("
<< (subject ? subject : "[null]")
<< ", kind=" << kind << ", msg=" << msg << ')';
{
KafkaMessage * tmp = new KafkaMessage();
tmp->SetSubject(subject);
tmp->SetKind(kind);
msg = tmp->createExternal();
}
if (!msg)
{
result.Set(
GMSEC_STATUS_MESSAGE_ERROR,
GMSEC_INVALID_MESSAGE,
"KafkaMessage object not created");
LOG_WARNING << result.Get();
}
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwCreateMessage => " << msg;
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwSubscribe(" << subject << ')';
}
#if 0
Status KafkaConnection::DestroyMessage(Message *msg)
void KafkaConnection::mwUnsubscribe(const char *subject)
{
Status result;
LOG_INFO << "gmsec_kafka:KafkaConnection::DestroyMessage(" << msg << ')';
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwUnSubscribe(" << subject << ')';
}
#endif
Status KafkaConnection::mwPublish(Message *msg, const gmsec::Config &config)
void KafkaConnection::mwPublish(const Message& message, const Config& config)
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::Publish(" << msg << ')';
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::Publish(things)";
}
Status KafkaConnection::mwRequest(Message *request, std::string &id)
void KafkaConnection::mwRequest(const Message& message, std::string& id)
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwRequest(" << request << ')';
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwRequest("<< ')';
}
Status KafkaConnection::mwReply(Message *request, Message *reply)
void KafkaConnection::mwReply(const Message& request, const Message& reply)
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwReply(request=" << request
<< ", reply=" << reply << ')';
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReply(request=" << ", reply=" << ')';
}
Status KafkaConnection::mwGetNextMsg(Message *&msg, GMSEC_I32 timeout)
void KafkaConnection::mwReceive(Message*& message, GMSEC_I32 timeout)
{
Status result;
LOG_DEBUG << "gmsec_kafka:KafkaConnection::mwGetNextMsg(msg=" << msg
<< ", timeout=" << timeout << ')';
msg = NULL;
result.Set(GMSEC_STATUS_CONNECTION_ERROR,
GMSEC_NO_MESSAGE_AVAILABLE,
"Nothing left.");
return result;
GMSEC_DEBUG << "gmsec_kafka:KafkaConnection::mwReceive";
}
// EOF KafkaConnection.cpp
/*
* Copyright 2007-2016 United States Government as represented by the
* Administrator of The National Aeronautics and Space Administration.
* No copyright is claimed in the United States under Title 17, U.S. Code.
* All Rights Reserved.
*/
/* @file KafkaMessage.cpp
* This file provides a template for implementing a middleware wrapper.
*/
#include <KafkaMessage.h>
#include <gmsec/internal/Log.h>
using namespace gmsec;
using namespace gmsec::util;
KafkaMessage::KafkaMessage()
{
LOG_VERBOSE << "KafkaMessage::KafkaMessage()";
}
KafkaMessage::~KafkaMessage()
{
LOG_VERBOSE << "KafkaMessage::~KafkaMessage()";
}
#ifdef SUBCLASS_BASEMESSAGE
bool KafkaMessage::isValid()
{
LOG_VERBOSE << "KafkaMessage::isValid()";
return (true);
}
Status KafkaMessage::SetKind(GMSEC_MSG_KIND kind)
{
Status result;
LOG_VERBOSE << "KafkaMessage::mwSetKind(" << kind << ')';
mType = (size_t) kind;
switch (kind)
{
case GMSEC_MSG_UNSET:
printf("gmsec_kafka Not Specified\n");
break;
case GMSEC_MSG_PUBLISH:
printf("gmsec_kafka Proxy - non-blocking\n");
break;
case GMSEC_MSG_REQUEST:
printf("gmsec_kafka Send()/Receive() - blocking\n");
break;
case GMSEC_MSG_REPLY:
printf("gmsec_kafka Reply() - non/un-blocking\n");
break;
default:
printf("gmsec_kafka Error\n");
result.Set(
GMSEC_STATUS_MESSAGE_ERROR,
GMSEC_UNKNOWN_MSG_TYPE,
"Neither Send, Reply, nor Proxy message type");
break;
}
return result;
}
Status KafkaMessage::GetKind(GMSEC_MSG_KIND &kind)
{
Status result;
LOG_DEBUG << "KafkaMessage::GetKind(" << &kind << ')';
kind = mType;
return result;
}
Status KafkaMessage::mwSetSubject(const char *subject)
{
Status result;
LOG_DEBUG << "KafkaMessage::mwSetSubject("
(subject ? subject : "[null]") << ')';
return result;
}
Status KafkaMessage::GetSubject(const char *&subject)
{
Status result;
LOG_DEBUG << "KafkaConnection::GetSubject(" << &subject << ')';
subject = mName;
return result;
}
bool KafkaMessage::ProcessConfigValue(const char *name, const char *value)
{
LOG_DEBUG << "KafkaConnection::ProcessConfigValue(name="
<< (name ? name : "[null]")
<< ", value=" << (value ? value : "[null]");
return true;
}
Status KafkaMessage::ClearFields()
{
Status result;
LOG_DEBUG << "KafkaConnection::ClearFields()";
return result;
}
Status KafkaMessage::ClearField(const char *name)
{
Status result;
LOG_DEBUG << "KafkaConnection::ClearField(name="
<< (name ? name : "[null]") << ')';
return result;
}
Status KafkaMessage::GetField(const char *name, Field &field)
{
Status result;
LOG_DEBUG << "KafkaConnection::GetField(name="
<< (name ? name : "[null]")
<< ", field=" << &field << ')';
return result;
}
Status KafkaMessage::GetFieldCount(GMSEC_I32 &count)
{
Status result;
LOG_DEBUG << "KafkaConnection::GetFieldCount(@count=" << &count << ')';
return result;
}
Status KafkaMessage::GetFirstField(Field &field)
{
Status result;
LOG_DEBUG << "KafkaConnection::GetFirstField(@field=" << &field << ')';
return result;
}
Status KafkaMessage::GetNextField(Field &field)
{
Status result;
LOG_DEBUG << "KafkaConnection::GetNextField(@field=" << &field << ')';
result.Set(GMSEC_STATUS_MESSAGE_ERROR,
GMSEC_FIELDS_END_REACHED,
"Nominal end-of-fields");
return result;
}
Status KafkaMessage::GetMSGSize(unsigned long &size)
{
Status result;
LOG_DEBUG << "KafkaConnection::GetMSGSize(@size=" << &size << ')';
return result;
}
#endif /* SUBCLASS_BASEMESSAGE */
// EOF KafkaMessage.cpp
/*
* Copyright 2007-2016 United States Government as represented by the
* Copyright 2007-2015 United States Government as represented by the
* Administrator of The National Aeronautics and Space Administration.
* No copyright is claimed in the United States under Title 17, U.S. Code.
* All Rights Reserved.
*/
#include "gmsec_kafka.h"
#include "KafkaConnection.h"
#include <gmsec4/util/Log.h>
using namespace gmsec::api;
using namespace gmsec::api::internal;
//using namespace gmsec_kafka;
/* @file gmsec_kafka.h
* This file provides a template for implementing a middleware wrapper.
*/
#include <gmsec_kafka.h>
#include <gmsec/internal/Log.h>
#include <gmsec/internal/StatusException.h>
#include <KafkaConnection.h>
GMSEC_KAFKA_API void createConnection(const Config* config, ConnectionInterface** connIf, Status* status)
{
using namespace gmsec;
using namespace gmsec::util;
*connIf = NULL;
GMSEC_KAFKA_API Status *CreateConnection(Config *cfg, Connection **conn)
{
static Status result;
if (status == NULL)
{
GMSEC_ERROR << "Status is NULL";
return;
}
result.ReSet();
LOG_DEBUG << "gmsec_kafka:CreateConnection(" << cfg << ", " << conn << ")";
try
{
KafkaConnection * tmp = new KafkaConnection(cfg);
*conn = tmp->createExternal();
if (config)
{
*connIf = new KafkaConnection(*config);
}
else
{
status->set(MIDDLEWARE_ERROR, OTHER_ERROR_CODE, "No Config object supplied");
}
}
catch (gmsec::internal::StatusException &se)
catch (std::exception& e)
{
result = se;
status->set(MIDDLEWARE_ERROR, OTHER_ERROR_CODE, e.what());
}
LOG_DEBUG << "gmsec_kafka: *conn=" << *conn;
return &result;
}
GMSEC_KAFKA_API void DestroyConnection(Connection *conn)
{
LOG_DEBUG << "gmsec_kafka:DestroyConnection(" << conn << ')';
if (conn)
gmsec::internal::ConnectionBuddy::destroy(conn);
}
// EOF gmsec_kafka.cpp
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment