Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 122 additions & 54 deletions protobuf-c-rpc/protobuf-c-rpc-client.c
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
#include <string.h>
#include <assert.h>
#include <sys/stat.h>
#ifdef WIN32
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <WinSock2.h>
#include <Ws2tcpip.h>
#include "protobuf-c-rpc-win.h"
#else
#include <sys/un.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>
#endif
#include <stdarg.h>
#ifndef _WIN32_WCE
#include <fcntl.h>
#include <sys/stat.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "protobuf-c-rpc.h"
#include "protobuf-c-rpc-data-buffer.h"

#ifdef WIN32
// Visual C++ knows about inline and __inline, but Visual C only knows about __inline.
#define inline __inline
#endif

#define protobuf_c_rpc_assert(x) assert(x)

#undef TRUE
Expand Down Expand Up @@ -106,9 +120,14 @@ static void destroy_client_rpc (ProtobufCService *service);
static void
set_fd_nonblocking(int fd)
{
#ifndef WIN32 /*LINUX*/
int flags = fcntl (fd, F_GETFL);
protobuf_c_rpc_assert (flags >= 0);
fcntl (fd, F_SETFL, flags | O_NONBLOCK);
#else
unsigned long flags = 1UL;
ioctlsocket(fd, FIONBIO, &flags);
#endif
}

static void
Expand Down Expand Up @@ -228,14 +247,18 @@ set_state_connected (ProtobufC_RPC_Client *client)
}

static void
handle_client_fd_connect_events (int fd,
unsigned events,
void *callback_data)
handle_client_fd_connect_events (ProtobufC_RPC_FD fd,
unsigned events,
void *callback_data)
{
ProtobufC_RPC_Client *client = callback_data;
socklen_t size_int = sizeof (int);
int fd_errno = EINVAL;
#ifndef WIN32 /*LINUX*/
if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &fd_errno, &size_int) < 0)
#elif defined(WIN32)
if (getsockopt (fd, SOL_SOCKET, SO_ERROR, (char*) &fd_errno, &size_int) < 0)
#endif
{
/* Note: this behavior is vaguely hypothetically broken,
* in terms of ignoring getsockopt's error;
Expand Down Expand Up @@ -284,7 +307,11 @@ begin_connecting (ProtobufC_RPC_Client *client,
set_fd_nonblocking (client->fd);
if (connect (client->fd, address, addr_len) < 0)
{
#ifndef WIN32 /*LINUX*/
if (errno == EINPROGRESS)
#elif defined(WIN32)
if (WSAGetLastError() == WSAEWOULDBLOCK)
#endif
{
/* register interest in fd */
protobuf_c_rpc_dispatch_watch_fd (client->dispatch,
Expand Down Expand Up @@ -349,6 +376,7 @@ begin_name_lookup (ProtobufC_RPC_Client *client)
client->info.name_lookup.pending = 0;
switch (client->address_type)
{
#ifndef WIN32 /*LINUX*/
case PROTOBUF_C_RPC_ADDRESS_LOCAL:
{
struct sockaddr_un addr;
Expand All @@ -358,7 +386,7 @@ begin_name_lookup (ProtobufC_RPC_Client *client)
sizeof (addr));
return;
}

#endif
case PROTOBUF_C_RPC_ADDRESS_TCP:
{
/* parse hostname:port from client->name */
Expand Down Expand Up @@ -454,6 +482,7 @@ enqueue_request (ProtobufC_RPC_Client *client,
{
const ProtobufCServiceDescriptor *desc = client->base_service.descriptor;
const ProtobufCMethodDescriptor *method = desc->methods + method_index;
uint32_t request_id;

protobuf_c_rpc_assert (method_index < desc->n_methods);

Expand All @@ -462,22 +491,23 @@ enqueue_request (ProtobufC_RPC_Client *client,
if (client->info.connected.first_free_request_id == 0)
grow_closure_array (client);

uint32_t request_id = client->info.connected.first_free_request_id;
request_id = client->info.connected.first_free_request_id;

/* Serialize the message */
ProtobufC_RPC_Payload payload = {method_index,
request_id,
(ProtobufCMessage *)input};

client->rpc_protocol.serialize_func (desc, client->allocator,
{
ProtobufC_RPC_Payload payload = {method_index, request_id, (ProtobufCMessage *)input};
client->rpc_protocol.serialize_func (desc, client->allocator,
&client->outgoing.base, payload);
}

/* Add closure to request-tree */
Closure *cl = client->info.connected.closures + (request_id - 1);
client->info.connected.first_free_request_id = POINTER_TO_UINT (cl->closure_data);
cl->response_type = method->output;
cl->closure = closure;
cl->closure_data = closure_data;
{
Closure *cl = client->info.connected.closures + (request_id - 1);
client->info.connected.first_free_request_id = POINTER_TO_UINT (cl->closure_data);
cl->response_type = method->output;
cl->closure = closure;
cl->closure_data = closure_data;
}
}

static const ProtobufCMessageDescriptor *
Expand All @@ -494,13 +524,14 @@ get_rcvd_message_descriptor (const ProtobufC_RPC_Payload *payload, void *data)
client_failed (client, "bad request-id in response from server");
return NULL;
}

Closure *closure = client->info.connected.closures + (request_id - 1);
return closure->response_type;
{
Closure *closure = client->info.connected.closures + (request_id - 1);
return closure->response_type;
}
}

static void
handle_client_fd_events (int fd,
handle_client_fd_events (ProtobufC_RPC_FD fd,
unsigned events,
void *func_data)
{
Expand Down Expand Up @@ -569,12 +600,14 @@ handle_client_fd_events (int fd,
}

/* invoke closure */
Closure *closure = client->info.connected.closures + (payload.request_id - 1);
closure->closure (payload.message, closure->closure_data);
closure->response_type = NULL;
closure->closure = NULL;
closure->closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
client->info.connected.first_free_request_id = payload.request_id;
{
Closure *closure = client->info.connected.closures + (payload.request_id - 1);
closure->closure (payload.message, closure->closure_data);
closure->response_type = NULL;
closure->closure = NULL;
closure->closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
client->info.connected.first_free_request_id = payload.request_id;
}

/* clean up */
if (payload.message)
Expand Down Expand Up @@ -606,19 +639,22 @@ static ProtobufC_RPC_Protocol_Status client_serialize (const ProtobufCServiceDes
if (!protobuf_c_message_check (payload.message))
return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED;

size_t message_length = protobuf_c_message_get_packed_size (payload.message);
uint32_t header[3];
header[0] = uint32_to_le (payload.method_index);
header[1] = uint32_to_le (message_length);
header[2] = payload.request_id;
out_buffer->append (out_buffer, sizeof (header), (const uint8_t *) header);

size_t packed_length = protobuf_c_message_pack_to_buffer (payload.message,
{
size_t message_length = protobuf_c_message_get_packed_size (payload.message);
uint32_t header[3];
header[0] = uint32_to_le (payload.method_index);
header[1] = uint32_to_le (message_length);
header[2] = payload.request_id;
out_buffer->append (out_buffer, sizeof (header), (const uint8_t *) header);
{
size_t packed_length = protobuf_c_message_pack_to_buffer (payload.message,
out_buffer);
if (packed_length != message_length)
return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED;
if (packed_length != message_length)
return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED;

return PROTOBUF_C_RPC_PROTOCOL_STATUS_SUCCESS;
return PROTOBUF_C_RPC_PROTOCOL_STATUS_SUCCESS;
}
}
}

static ProtobufC_RPC_Protocol_Status client_deserialize (const ProtobufCServiceDescriptor *descriptor,
Expand All @@ -628,18 +664,22 @@ static ProtobufC_RPC_Protocol_Status client_deserialize (const ProtobufCServiceD
ProtobufC_RPC_Get_Descriptor get_descriptor,
void *get_descriptor_data)
{
uint32_t header[4];
uint32_t status_code;
uint32_t message_length;
ProtobufCMessage *msg;

if (!allocator || !in_buffer || !payload)
return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED;

uint32_t header[4];
if (in_buffer->size < sizeof (header))
return PROTOBUF_C_RPC_PROTOCOL_STATUS_INCOMPLETE_BUFFER;

/* try processing buffer */
protobuf_c_rpc_data_buffer_peek (in_buffer, header, sizeof (header));
uint32_t status_code = uint32_from_le (header[0]);
status_code = uint32_from_le (header[0]);
payload->method_index = uint32_from_le (header[1]);
uint32_t message_length = uint32_from_le (header[2]);
message_length = uint32_from_le (header[2]);
payload->request_id = header[3]; /* already native-endian */

if (sizeof (header) + message_length > in_buffer->size)
Expand All @@ -648,15 +688,16 @@ static ProtobufC_RPC_Protocol_Status client_deserialize (const ProtobufCServiceD
/* Discard the RPC header */
protobuf_c_rpc_data_buffer_discard (in_buffer, sizeof (header));

ProtobufCMessage *msg;
if (status_code == PROTOBUF_C_RPC_STATUS_CODE_SUCCESS)
{
uint8_t *packed_data;

/* read message and unpack */
const ProtobufCMessageDescriptor *desc = get_descriptor (payload, get_descriptor_data);
if (!desc)
return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED;

uint8_t *packed_data = allocator->alloc (allocator, message_length);
packed_data = allocator->alloc (allocator, message_length);

if (!packed_data && message_length > 0)
return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED;
Expand Down Expand Up @@ -807,7 +848,31 @@ trivial_sync_libc_resolver (ProtobufCRPCDispatch *dispatch,
struct hostent *ent;
ent = gethostbyname (name);
if (ent == NULL)
{
#ifdef WIN32
/* hstrerror isn't available on Windows.
Error mapping taken from http://linux.die.net/man/3/hstrerror */
switch (h_errno) {
case HOST_NOT_FOUND:
failed_func ("The specified host is unknown.", callback_data);
break;
case NO_ADDRESS:
failed_func ("The requested name is valid but does not have an IP address.", callback_data);
break;
case NO_RECOVERY:
failed_func ("A nonrecoverable name server error occurred.", callback_data);
break;
case TRY_AGAIN:
failed_func ("A temporary error occurred on an authoritative name server. Try again later.", callback_data);
break;
default:
failed_func (strerror (h_errno), callback_data);
break;
}
#else
failed_func (hstrerror (h_errno), callback_data);
#endif
}
else
found_func ((const uint8_t *) ent->h_addr_list[0], callback_data);
}
Expand Down Expand Up @@ -836,17 +901,20 @@ ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
rv->error_handler = error_handler;
rv->error_handler_data = "protobuf-c rpc client";
rv->info.init.idle = protobuf_c_rpc_dispatch_add_idle (dispatch, handle_init_idle, rv);
ProtobufC_RPC_Protocol default_rpc_protocol = {client_serialize, client_deserialize};
rv->rpc_protocol = default_rpc_protocol;

size_t name_len = strlen (name);
rv->name = allocator->alloc (allocator, name_len + 1);
if (!rv->name)
return NULL;
strncpy (rv->name, name, name_len);
rv->name[name_len] = '\0';

return &rv->base_service;
{
ProtobufC_RPC_Protocol default_rpc_protocol = {client_serialize, client_deserialize};
size_t name_len;

rv->rpc_protocol = default_rpc_protocol;
name_len = strlen (name);
rv->name = allocator->alloc (allocator, name_len + 1);
if (!rv->name)
return NULL;
strncpy (rv->name, name, name_len);
rv->name[name_len] = '\0';

return &rv->base_service;
}
}

protobuf_c_boolean
Expand Down
Loading