Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
'target_name': 'node_opendds',
'sources': [ 'src/node-opendds.cpp',
'src/NodeDRListener.cpp',
'src/NodePBITListener.cpp',
'src/NodeQosConversion.cpp' ],
'include_dirs': [ "<!(node -e \"require('nan')\")",
'$(ACE_ROOT)', '$(TAO_ROOT)', '$(DDS_ROOT)' ],
Expand Down
133 changes: 133 additions & 0 deletions src/NodePBITListener.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#include "NodePBITListener.h"

#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/Registered_Data_Types.h>
#include <dds/DCPS/BuiltInTopicUtils.h>

#include <nan.h>
#include <stdexcept>

namespace NodeOpenDDS {
using namespace v8;

Local<Object> copytoV8(const DDS::Time_t& src)
{
Local<Object> stru = Nan::New<Object>();
stru->Set(Nan::New<String>("sec").ToLocalChecked(), Nan::New(src.sec));
stru->Set(Nan::New<String>("nanosec").ToLocalChecked(),
Nan::New(src.nanosec));
return stru;
}

Local<Object> copytoV8(const DDS::SampleInfo& src)
{
Local<Object> stru = Nan::New<Object>();
#define INT(X) stru->Set(Nan::New<String>(#X).ToLocalChecked(), Nan::New(src.X))
INT(sample_state);
INT(view_state);
INT(instance_state);
stru->Set(Nan::New<String>("source_timestamp").ToLocalChecked(),
copytoV8(src.source_timestamp));
INT(instance_handle);
INT(publication_handle);
INT(disposed_generation_count);
INT(no_writers_generation_count);
INT(sample_rank);
INT(generation_rank);
INT(absolute_generation_rank);
#undef INT
stru->Set(Nan::New<String>("valid_data").ToLocalChecked(),
Nan::New(src.valid_data));
return stru;
}

Local<Object> toV8(const DDS::ParticipantBuiltinTopicData& src)
{
ACE_UNUSED_ARG(src);
Local<Object> stru = Nan::New<Object>();

std::string str;
for (CORBA::ULong i = 0; i < src.user_data.value.length(); ++i) {
str += src.user_data.value[i];
}
stru->Set(Nan::New<v8::String>("user_data").ToLocalChecked(), Nan::New(str).ToLocalChecked());

const v8::Local<v8::Array> tgt(Nan::New<v8::Array>(3));
for (CORBA::Long i = 0; i < 3; ++i) {
tgt->Set(Nan::New(i), Nan::New(src.key.value[i]));
}
stru->Set(Nan::New<v8::String>("key").ToLocalChecked(), tgt);

return stru;
}

NodePBITListener::NodePBITListener(const Local<Function>& callback,
const DDS::ParticipantBuiltinTopicDataSeq part_data,
const DDS::SampleInfoSeq infos,
const DDS::DataReader_var& dr)
: callback_(callback)
, part_data_(part_data)
, infos_(infos)
, dr_(dr)
, async_uv_pbit_(this)
{
uv_async_init(uv_default_loop(), &async_uv_pbit_, async_cb);
}

NodePBITListener::~NodePBITListener()
{
}

void NodePBITListener::async_cb(uv_async_t* async_uv)
{
static_cast<AsyncUvN*>(async_uv)->outer_->async();
}

void NodePBITListener::close_cb(uv_handle_t* handle_uv)
{
static_cast<AsyncUvN*>((uv_async_t*)handle_uv)->outer_->_remove_ref();
}

void NodePBITListener::shutdown()
{
_add_ref();
uv_close((uv_handle_t*)&async_uv_pbit_, close_cb);
}

void NodePBITListener::on_data_available(DDS::DataReader* dr)
{

DDS::ParticipantBuiltinTopicDataDataReader_var part_dr =
DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);

part_dr->take(part_data_, infos_, 1, DDS::NOT_READ_SAMPLE_STATE, DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);

uv_async_send(&async_uv_pbit_);
}

void NodePBITListener::async() // called from libuv event loop
{
Nan::HandleScope scope;

try {
const v8::Local<v8::Object> stru = Nan::New<v8::Object>();

Local<Value> argv[] = { copytoV8(infos_[0]), toV8(part_data_[0]) };

Local<Function> callback = Nan::New(callback_);
Nan::Callback cb(callback);
cb.Call(sizeof(argv) / sizeof(argv[0]), argv);
} catch (...) {
}
}

void NodePBITListener::reserve(CORBA::ULong)
{
}

void NodePBITListener::push_back(const DDS::SampleInfo& src, const void* sample)
{
}

}
69 changes: 69 additions & 0 deletions src/NodePBITListener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#ifndef OPENDDS_NODEPBITLISTENER_H
#define OPENDDS_NODEPBITLISTENER_H

#include <nan.h>

#include <dds/DdsDcpsSubscriptionC.h>

#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/Registered_Data_Types.h>
#include <dds/DCPS/BuiltInTopicUtils.h>

#include <dds/DCPS/V8TypeConverter.h>
#include <dds/DCPS/LocalObject.h>
#include <dds/DCPS/DataReaderImpl.h>

namespace NodeOpenDDS {

class NodePBITListener
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
, private OpenDDS::DCPS::AbstractSamples {
public:
NodePBITListener(const v8::Local<v8::Function>& callback,
const DDS::ParticipantBuiltinTopicDataSeq part_data,
const DDS::SampleInfoSeq infos, const DDS::DataReader_var& dr);
~NodePBITListener();
void shutdown();

private:
static void async_cb(uv_async_t* async_uv);
static void close_cb(uv_handle_t* handle_uv);

typedef DDS::RequestedDeadlineMissedStatus RDMStatus;
void on_requested_deadline_missed(DDS::DataReader*, const RDMStatus&) {}
typedef DDS::RequestedIncompatibleQosStatus RIQStatus;
void on_requested_incompatible_qos(DDS::DataReader*, const RIQStatus&) {}
void on_sample_rejected(DDS::DataReader*,
const DDS::SampleRejectedStatus&) {}
void on_liveliness_changed(DDS::DataReader*,
const DDS::LivelinessChangedStatus&) {}
void on_subscription_matched(DDS::DataReader*,
const DDS::SubscriptionMatchedStatus&) {}
void on_sample_lost(DDS::DataReader*, const DDS::SampleLostStatus&) {}

void on_data_available(DDS::DataReader*);

void async(); // called from libuv event loop

Nan::Persistent<v8::Function> callback_;
const DDS::DataReader_var& dr_;

DDS::ParticipantBuiltinTopicDataSeq part_data_;
DDS::SampleInfoSeq infos_;

struct AsyncUvN : uv_async_t {
explicit AsyncUvN(NodePBITListener* outer) : outer_(outer) {}
NodePBITListener* outer_;
} async_uv_pbit_;

NodePBITListener(const NodePBITListener&);
NodePBITListener& operator=(const NodePBITListener&);

void reserve(CORBA::ULong);
void push_back(const DDS::SampleInfo& src, const void* sample);

};

}

#endif
58 changes: 58 additions & 0 deletions src/node-opendds.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "NodeDRListener.h"
#include "NodePBITListener.h"
#include "NodeQosConversion.h"

#include <nan.h>
Expand All @@ -18,6 +19,7 @@
using namespace v8;
using OpenDDS::DCPS::Data_Types_Register;
using NodeOpenDDS::NodeDRListener;
using NodeOpenDDS::NodePBITListener;
using NodeOpenDDS::convertQos;

namespace {
Expand All @@ -43,6 +45,8 @@ namespace {
void delete_participant(const Nan::FunctionCallbackInfo<Value>& fci);
void subscribe(const Nan::FunctionCallbackInfo<Value>& fci);
void unsubscribe(const Nan::FunctionCallbackInfo<Value>& fci);
void subscribe_participant_topic(const Nan::FunctionCallbackInfo<Value>& fci);
void unsubscribe_participant_topic(const Nan::FunctionCallbackInfo<Value>& fci);

void initialize(const Nan::FunctionCallbackInfo<Value>& fci)
{
Expand Down Expand Up @@ -96,6 +100,8 @@ namespace {
ot->SetInternalFieldCount(1);
Nan::SetMethod(ot, "subscribe", subscribe);
Nan::SetMethod(ot, "unsubscribe", unsubscribe);
Nan::SetMethod(ot, "subscribe_participant_topic", subscribe_participant_topic);
Nan::SetMethod(ot, "unsubscribe_participant_topic", unsubscribe_participant_topic);
const Local<Object> obj = ot->NewInstance();
Nan::SetInternalFieldPointer(obj, 0, dp._retn());
fci.GetReturnValue().Set(obj);
Expand Down Expand Up @@ -248,6 +254,37 @@ namespace {
fci.GetReturnValue().Set(obj);
}

void subscribe_participant_topic(const Nan::FunctionCallbackInfo<Value>& fci) {
if (fci.Length() < 1) {
Nan::ThrowTypeError("At least 1 argument required");
fci.GetReturnValue().SetUndefined();
return;
}
if (!fci[fci.Length() - 1]->IsFunction()) {
Nan::ThrowTypeError("Last argument must be a function");
fci.GetReturnValue().SetUndefined();
return;
}
void* const internal = Nan::GetInternalFieldPointer(fci.This(), 0);
DDS::DomainParticipant* const dp =
static_cast<DDS::DomainParticipant*>(internal);

DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber() ;
DDS::DataReader_var dr =
bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC);

DDS::ParticipantBuiltinTopicDataSeq part_data;
DDS::SampleInfoSeq infos;

Local<Value> cb = fci[fci.Length() - 1];
NodePBITListener* const npbitl = new NodePBITListener(cb.As<Function>(), part_data, infos, dr);
const DDS::DataReaderListener_var listen(npbitl);

dr->set_listener(listen.in(), OpenDDS::DCPS::DEFAULT_STATUS_MASK);

fci.GetReturnValue().SetUndefined();
}

void unsubscribe(const Nan::FunctionCallbackInfo<Value>& fci)
{
if (fci.Length() < 1 || !fci[0]->IsObject()) {
Expand Down Expand Up @@ -287,6 +324,27 @@ namespace {
fci.GetReturnValue().SetUndefined();
}

void unsubscribe_participant_topic(const Nan::FunctionCallbackInfo<Value>& fci)
{
void* const internal = Nan::GetInternalFieldPointer(fci.This(), 0);
DDS::DomainParticipant* const dp =
static_cast<DDS::DomainParticipant*>(internal);

DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber() ;
DDS::DataReader_var dr =
bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC);

const DDS::DataReaderListener_var drl = dr->get_listener();
NodePBITListener* const ndrl = dynamic_cast<NodePBITListener*>(drl.in());
ndrl->shutdown();

dr = 0;
bit_subscriber->delete_contained_entities();
dp->delete_subscriber(bit_subscriber);

fci.GetReturnValue().SetUndefined();
}

void finalize(const Nan::FunctionCallbackInfo<Value>& fci)
{
if (fci.Length() < 1) {
Expand Down
6 changes: 6 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,17 @@ try {
log('Sample Info', sinfo);
if (sinfo.valid_data && sample.id === last_sample_id) {
participant.unsubscribe(reader);
participant.unsubscribe_participant_topic();
}
} catch (e) {
console.log("Error in callback: " + e);
}
});

participant.subscribe_participant_topic(function(info, participant) {
log('Received Participant', participant);
log('Received info', info);
});
} catch (e) {
console.log(e);
}
Expand Down