diff --git a/binding.gyp b/binding.gyp index cb3b6e6..c989bdb 100644 --- a/binding.gyp +++ b/binding.gyp @@ -38,6 +38,7 @@ 'target_name': 'node_opendds', 'sources': [ 'src/node-opendds.cpp', 'src/NodeDRListener.cpp', + 'src/NodePBITListener.cpp', 'src/NodeQosConversion.cpp' ], 'include_dirs': [ " +#include +#include + +#include +#include + +namespace NodeOpenDDS { +using namespace v8; + +Local copytoV8(const DDS::Time_t& src) +{ + Local stru = Nan::New(); + stru->Set(Nan::New("sec").ToLocalChecked(), Nan::New(src.sec)); + stru->Set(Nan::New("nanosec").ToLocalChecked(), + Nan::New(src.nanosec)); + return stru; +} + +Local copytoV8(const DDS::SampleInfo& src) +{ + Local stru = Nan::New(); +#define INT(X) stru->Set(Nan::New(#X).ToLocalChecked(), Nan::New(src.X)) + INT(sample_state); + INT(view_state); + INT(instance_state); + stru->Set(Nan::New("source_timestamp").ToLocalChecked(), + copytoV8(src.source_timestamp)); + INT(instance_handle); + INT(publication_handle); + INT(disposed_generation_count); + INT(no_writers_generation_count); + INT(sample_rank); + INT(generation_rank); + INT(absolute_generation_rank); +#undef INT + stru->Set(Nan::New("valid_data").ToLocalChecked(), + Nan::New(src.valid_data)); + return stru; +} + +Local toV8(const DDS::ParticipantBuiltinTopicData& src) +{ + ACE_UNUSED_ARG(src); + Local stru = Nan::New(); + + std::string str; + for (CORBA::ULong i = 0; i < src.user_data.value.length(); ++i) { + str += src.user_data.value[i]; + } + stru->Set(Nan::New("user_data").ToLocalChecked(), Nan::New(str).ToLocalChecked()); + + const v8::Local tgt(Nan::New(3)); + for (CORBA::Long i = 0; i < 3; ++i) { + tgt->Set(Nan::New(i), Nan::New(src.key.value[i])); + } + stru->Set(Nan::New("key").ToLocalChecked(), tgt); + + return stru; +} + +NodePBITListener::NodePBITListener(const Local& callback, + const DDS::ParticipantBuiltinTopicDataSeq part_data, + const DDS::SampleInfoSeq infos, + const DDS::DataReader_var& dr) + : callback_(callback) + , part_data_(part_data) + , infos_(infos) + , dr_(dr) + , async_uv_pbit_(this) +{ + uv_async_init(uv_default_loop(), &async_uv_pbit_, async_cb); +} + +NodePBITListener::~NodePBITListener() +{ +} + +void NodePBITListener::async_cb(uv_async_t* async_uv) +{ + static_cast(async_uv)->outer_->async(); +} + +void NodePBITListener::close_cb(uv_handle_t* handle_uv) +{ + static_cast((uv_async_t*)handle_uv)->outer_->_remove_ref(); +} + +void NodePBITListener::shutdown() +{ + _add_ref(); + uv_close((uv_handle_t*)&async_uv_pbit_, close_cb); +} + +void NodePBITListener::on_data_available(DDS::DataReader* dr) +{ + + DDS::ParticipantBuiltinTopicDataDataReader_var part_dr = + DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr); + + part_dr->take(part_data_, infos_, 1, DDS::NOT_READ_SAMPLE_STATE, DDS::ANY_VIEW_STATE, + DDS::ANY_INSTANCE_STATE); + + uv_async_send(&async_uv_pbit_); +} + +void NodePBITListener::async() // called from libuv event loop +{ + Nan::HandleScope scope; + + try { + const v8::Local stru = Nan::New(); + + Local argv[] = { copytoV8(infos_[0]), toV8(part_data_[0]) }; + + Local callback = Nan::New(callback_); + Nan::Callback cb(callback); + cb.Call(sizeof(argv) / sizeof(argv[0]), argv); + } catch (...) { + } +} + +void NodePBITListener::reserve(CORBA::ULong) +{ +} + +void NodePBITListener::push_back(const DDS::SampleInfo& src, const void* sample) +{ +} + +} diff --git a/src/NodePBITListener.h b/src/NodePBITListener.h new file mode 100644 index 0000000..9db9385 --- /dev/null +++ b/src/NodePBITListener.h @@ -0,0 +1,69 @@ +#ifndef OPENDDS_NODEPBITLISTENER_H +#define OPENDDS_NODEPBITLISTENER_H + +#include + +#include + +#include +#include +#include + +#include +#include +#include + +namespace NodeOpenDDS { + + class NodePBITListener + : public virtual OpenDDS::DCPS::LocalObject + , private OpenDDS::DCPS::AbstractSamples { + public: + NodePBITListener(const v8::Local& callback, + const DDS::ParticipantBuiltinTopicDataSeq part_data, + const DDS::SampleInfoSeq infos, const DDS::DataReader_var& dr); + ~NodePBITListener(); + void shutdown(); + + private: + static void async_cb(uv_async_t* async_uv); + static void close_cb(uv_handle_t* handle_uv); + + typedef DDS::RequestedDeadlineMissedStatus RDMStatus; + void on_requested_deadline_missed(DDS::DataReader*, const RDMStatus&) {} + typedef DDS::RequestedIncompatibleQosStatus RIQStatus; + void on_requested_incompatible_qos(DDS::DataReader*, const RIQStatus&) {} + void on_sample_rejected(DDS::DataReader*, + const DDS::SampleRejectedStatus&) {} + void on_liveliness_changed(DDS::DataReader*, + const DDS::LivelinessChangedStatus&) {} + void on_subscription_matched(DDS::DataReader*, + const DDS::SubscriptionMatchedStatus&) {} + void on_sample_lost(DDS::DataReader*, const DDS::SampleLostStatus&) {} + + void on_data_available(DDS::DataReader*); + + void async(); // called from libuv event loop + + Nan::Persistent callback_; + const DDS::DataReader_var& dr_; + + DDS::ParticipantBuiltinTopicDataSeq part_data_; + DDS::SampleInfoSeq infos_; + + struct AsyncUvN : uv_async_t { + explicit AsyncUvN(NodePBITListener* outer) : outer_(outer) {} + NodePBITListener* outer_; + } async_uv_pbit_; + + NodePBITListener(const NodePBITListener&); + NodePBITListener& operator=(const NodePBITListener&); + + void reserve(CORBA::ULong); + void push_back(const DDS::SampleInfo& src, const void* sample); + + }; + +} + +#endif diff --git a/src/node-opendds.cpp b/src/node-opendds.cpp index bc55f77..affbf85 100644 --- a/src/node-opendds.cpp +++ b/src/node-opendds.cpp @@ -1,4 +1,5 @@ #include "NodeDRListener.h" +#include "NodePBITListener.h" #include "NodeQosConversion.h" #include @@ -18,6 +19,7 @@ using namespace v8; using OpenDDS::DCPS::Data_Types_Register; using NodeOpenDDS::NodeDRListener; +using NodeOpenDDS::NodePBITListener; using NodeOpenDDS::convertQos; namespace { @@ -43,6 +45,8 @@ namespace { void delete_participant(const Nan::FunctionCallbackInfo& fci); void subscribe(const Nan::FunctionCallbackInfo& fci); void unsubscribe(const Nan::FunctionCallbackInfo& fci); + void subscribe_participant_topic(const Nan::FunctionCallbackInfo& fci); + void unsubscribe_participant_topic(const Nan::FunctionCallbackInfo& fci); void initialize(const Nan::FunctionCallbackInfo& fci) { @@ -96,6 +100,8 @@ namespace { ot->SetInternalFieldCount(1); Nan::SetMethod(ot, "subscribe", subscribe); Nan::SetMethod(ot, "unsubscribe", unsubscribe); + Nan::SetMethod(ot, "subscribe_participant_topic", subscribe_participant_topic); + Nan::SetMethod(ot, "unsubscribe_participant_topic", unsubscribe_participant_topic); const Local obj = ot->NewInstance(); Nan::SetInternalFieldPointer(obj, 0, dp._retn()); fci.GetReturnValue().Set(obj); @@ -248,6 +254,37 @@ namespace { fci.GetReturnValue().Set(obj); } + void subscribe_participant_topic(const Nan::FunctionCallbackInfo& fci) { + if (fci.Length() < 1) { + Nan::ThrowTypeError("At least 1 argument required"); + fci.GetReturnValue().SetUndefined(); + return; + } + if (!fci[fci.Length() - 1]->IsFunction()) { + Nan::ThrowTypeError("Last argument must be a function"); + fci.GetReturnValue().SetUndefined(); + return; + } + void* const internal = Nan::GetInternalFieldPointer(fci.This(), 0); + DDS::DomainParticipant* const dp = + static_cast(internal); + + DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber() ; + DDS::DataReader_var dr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); + + DDS::ParticipantBuiltinTopicDataSeq part_data; + DDS::SampleInfoSeq infos; + + Local cb = fci[fci.Length() - 1]; + NodePBITListener* const npbitl = new NodePBITListener(cb.As(), part_data, infos, dr); + const DDS::DataReaderListener_var listen(npbitl); + + dr->set_listener(listen.in(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); + + fci.GetReturnValue().SetUndefined(); + } + void unsubscribe(const Nan::FunctionCallbackInfo& fci) { if (fci.Length() < 1 || !fci[0]->IsObject()) { @@ -287,6 +324,27 @@ namespace { fci.GetReturnValue().SetUndefined(); } + void unsubscribe_participant_topic(const Nan::FunctionCallbackInfo& fci) + { + void* const internal = Nan::GetInternalFieldPointer(fci.This(), 0); + DDS::DomainParticipant* const dp = + static_cast(internal); + + DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber() ; + DDS::DataReader_var dr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); + + const DDS::DataReaderListener_var drl = dr->get_listener(); + NodePBITListener* const ndrl = dynamic_cast(drl.in()); + ndrl->shutdown(); + + dr = 0; + bit_subscriber->delete_contained_entities(); + dp->delete_subscriber(bit_subscriber); + + fci.GetReturnValue().SetUndefined(); + } + void finalize(const Nan::FunctionCallbackInfo& fci) { if (fci.Length() < 1) { diff --git a/test/test.js b/test/test.js index e7b567f..e4b74b8 100644 --- a/test/test.js +++ b/test/test.js @@ -68,11 +68,17 @@ try { log('Sample Info', sinfo); if (sinfo.valid_data && sample.id === last_sample_id) { participant.unsubscribe(reader); + participant.unsubscribe_participant_topic(); } } catch (e) { console.log("Error in callback: " + e); } }); + + participant.subscribe_participant_topic(function(info, participant) { + log('Received Participant', participant); + log('Received info', info); + }); } catch (e) { console.log(e); }