Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@ endif

WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
INCPATH = -I./src -I$(THIRD_PATH)/include
<<<<<<< HEAD
CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
LDFLAGS += $(THIRD_LIB) -lpthread
OS := $(shell uname -s)
ifeq ($(OS),Linux)
LFLAGS += -lrt
endif
=======
CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH) $(EXTRA_CFLAGS)
ifeq ($(USE_S3), 1)
CFLAGS += -DUSE_S3=1
endif
LDFLAGS = $(EXTRA_LDFLAGS) $(THIRD_LIB) -lpthread # -lrt
>>>>>>> upstream/master

PS_LIB = build/libps.a
PS_MAIN = build/libpsmain.a
Expand Down
4 changes: 2 additions & 2 deletions src/system/dashboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace PS {

bool NodeIDCmp::operator()(const NodeID& a, const NodeID& b) {
bool NodeIDCmp::operator()(const NodeID& a, const NodeID& b) const {
string a_primary, a_secondary;
splitNodeID(a, a_primary, a_secondary);
string b_primary, b_secondary;
Expand All @@ -17,7 +17,7 @@ bool NodeIDCmp::operator()(const NodeID& a, const NodeID& b) {
}
}

void NodeIDCmp::splitNodeID(const NodeID& in, string& primary, string& secondary) {
void NodeIDCmp::splitNodeID(const NodeID& in, string& primary, string& secondary) const {
size_t tailing_alpha_idx = in.find_last_not_of("0123456789");
if (std::string::npos == tailing_alpha_idx) {
primary = in;
Expand Down
4 changes: 2 additions & 2 deletions src/system/dashboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace PS {

struct NodeIDCmp {
void splitNodeID(const NodeID& in, string& primary, string& secondary);
bool operator()(const NodeID& a, const NodeID& b);
void splitNodeID(const NodeID& in, string& primary, string& secondary) const;
bool operator()(const NodeID& a, const NodeID& b) const;
};

class Dashboard {
Expand Down
54 changes: 54 additions & 0 deletions src/system/postoffice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,59 @@ void Postoffice::Recv() {
}
}

<<<<<<< HEAD
void Postoffice::manageNode(Task& tk) {
CHECK(tk.has_mng_node());
auto& mng = tk.mng_node();
switch (mng.cmd()) {
case ManageNode::CONNECT: {
CHECK(IamScheduler());
CHECK_EQ(mng.node_size(), 1);
// first add this node into app
Task add = tk;
add.set_customer(CHECK_NOTNULL(app_)->name());
add.mutable_mng_node()->set_cmd(ManageNode::ADD);
manageNode(add);
// create the app in this node
Task task;
task.set_request(true);
task.set_customer(app_->name());
task.set_type(Task::MANAGE);
task.set_time(1);
task.mutable_mng_app()->set_cmd(ManageApp::ADD);
task.mutable_mng_app()->set_conf(app_conf_);
app_->port(mng.node(0).id())->submit(task);
// check if all nodes are connected
if (yp().num_workers() >= FLAGS_num_workers &&
yp().num_servers() >= FLAGS_num_servers) {
nodes_are_ready_.set_value();
}
tk.set_customer(app_->name()); // otherwise the remote node doesn't know
// how to find the according customer
break;
}
case ManageNode::ADD:
case ManageNode::UPDATE: {
auto obj = yp().customer(tk.customer());
CHECK(obj) << "customer [" << tk.customer() << "] doesn't exists";
for (int i = 0; i < mng.node_size(); ++i) {
auto node = mng.node(i);
yp().addNode(node);
obj->exec().add(node);
for (auto c : yp().children(obj->name())) {
auto child = yp().customer(c);
if (child) child->exec().add(node);
}
}
break;
}
case ManageNode::REPLACE: {
break;
}
case ManageNode::REMOVE: {
break;
}
=======
bool Postoffice::Process(Message* msg) {
if (!msg->task.request()) manager_.AddResponse(msg);
// process this message
Expand All @@ -126,6 +179,7 @@ bool Postoffice::Process(Message* msg) {
int id = msg->task.customer_id();
// let the executor to delete "msg"
manager_.customer(id)->executor()->Accept(msg);
>>>>>>> upstream/master
}
return true;
}
Expand Down
57 changes: 57 additions & 0 deletions src/system/yellow_pages.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include "util/common.h"
#include "system/proto/node.pb.h"
#include "system/van.h"

namespace PS {

class Customer;
typedef shared_ptr<Customer> CustomerPtr;
class NodeGroup;

// maintain inforamations about nodes and customers
class YellowPages {
public:
YellowPages() { }
~YellowPages();
void init() { van_.init(); }

// manage customers
void addCustomer(Customer* obj);
// ask the system to delete the customer
void depositCustomer(const string& name);
void removeCustomer(const string& name);
Customer* customer(const string& name);

void addRelation(const string& child, const string& parent) {
relations_[parent].push_back(child);
}
const std::vector<string>& children(const string& parent) {
return relations_[parent];
}

// manage nodes
void addNode(const Node& node);
void removeNode(const Node& node);

int num_workers() { return num_workers_; }
int num_servers() { return num_servers_; }
std::vector<Node> nodes();

Van& van() { return van_; }

private:
DISALLOW_COPY_AND_ASSIGN(YellowPages);
int num_workers_ = 0;
int num_servers_ = 0;

std::map<NodeID, Node> nodes_;
std::map<string, std::pair<Customer*, bool>> customers_;

// parent vs children
std::unordered_map<string, std::vector<string>> relations_;
Van van_;
};

} // namespace PS