/**
 * UCX 1.14
 * Unified Communication X
 * ucp_hello_world.c
 *
 * UCP hello world client / server example utility.
 */

#ifndef HAVE_CONFIG_H
# define HAVE_CONFIG_H /* Force using config.h, so test would fail if header
actually tries to use it */
#endif
/*
* UCP hello world client / server example utility
* -----------------------------------------------
*
* Server side:
*
* ./ucp_hello_world
*
* Client side:
*
* ./ucp_hello_world -n <server host name>
*
* Notes:
*
* - Client acquires Server UCX address via TCP socket
*
*
* Author:
*
* Ilya Nelkenbaum <ilya@nelkenbaum.com>
* Sergey Shalnov <sergeysh@mellanox.com> 7-June-2016
*/
#include "hello_world_util.h"
#include "ucp_util.h"
#include <ucp/api/ucp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <assert.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> /* getopt */
#include <pthread.h> /* pthread_self */
#include <errno.h> /* errno */
#include <time.h>
#include <signal.h> /* raise */
struct msg {
uint64_t data_len;
};
struct ucx_context {
int completed;
};
enum ucp_test_mode_t {
TEST_MODE_PROBE,
TEST_MODE_WAIT,
TEST_MODE_EVENTFD
} ucp_test_mode = TEST_MODE_PROBE;
typedef enum {
FAILURE_MODE_NONE,
FAILURE_MODE_SEND, /* fail send operation on server */
FAILURE_MODE_RECV, /* fail receive operation on client */
FAILURE_MODE_KEEPALIVE /* fail without communication on client */
} failure_mode_t;
static struct err_handling {
failure_mode_t failure_mode;
} err_handling_opt;
static ucs_status_t ep_status = UCS_OK;
static uint16_t server_port = 13337;
static sa_family_t ai_family = AF_INET;
static long test_string_length = 16;
static const ucp_tag_t tag = 0x1337a880u;
static const ucp_tag_t tag_mask = UINT64_MAX;
static const char *addr_msg_str = "UCX address message";
static const char *data_msg_str = "UCX data message";
static int print_config = 0;
static ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name);
/*
 * Store `data_len` in the header of `msg`.
 *
 * The write goes through mem_type_memcpy() rather than a plain assignment,
 * presumably because `msg` may have been allocated with mem_type_malloc() in
 * the memory type selected with -m.
 */
static void set_msg_data_len(struct msg *msg, uint64_t data_len)
{
    const uint64_t header_value = data_len;

    mem_type_memcpy(&msg->data_len, &header_value, sizeof(msg->data_len));
}
/*
 * Request-initialization hook registered with UCP (ucp_params.request_init).
 * Marks a freshly allocated request as not yet completed.
 */
static void request_init(void *request)
{
    ((struct ucx_context *)request)->completed = 0;
}
/*
 * Send-completion callback.
 *
 * Flags the request as completed so ucx_wait() stops progressing, then logs
 * which message finished. `ctx` is the send's user_data: the message
 * description string.
 */
static void send_handler(void *request, ucs_status_t status, void *ctx)
{
    const char *msg_name = ctx;

    ((struct ucx_context *)request)->completed = 1;

    printf("[0x%x] send handler called for \"%s\" with status %d (%s)\n",
           (unsigned int)pthread_self(), msg_name, status,
           ucs_status_string(status));
}
/*
 * Endpoint error callback.
 *
 * Logs the failure and stores `status` into the ucs_status_t that `arg`
 * points to. NOTE(review): the endpoints are created with
 * err_handler.arg = NULL and user_data = &ep_status, so this presumably
 * relies on UCP passing the endpoint user_data as `arg`; confirm against the
 * ucp_ep_params_t documentation.
 */
static void failure_handler(void *arg, ucp_ep_h ep, ucs_status_t status)
{
    printf("[0x%x] failure handler called with status %d (%s)\n",
           (unsigned int)pthread_self(), status, ucs_status_string(status));

    *(ucs_status_t *)arg = status;
}
/*
 * Tag-receive completion callback.
 *
 * Flags the request as completed so ucx_wait() stops progressing, and logs
 * the status together with the received length.
 */
static void recv_handler(void *request, ucs_status_t status,
                         const ucp_tag_recv_info_t *info, void *user_data)
{
    struct ucx_context *context = (struct ucx_context *)request;

    (void)user_data; /* no per-operation user data is registered */

    context->completed = 1;

    /* info->length is a size_t: %zu is the matching conversion (%lu is only
     * correct where size_t happens to be unsigned long) */
    printf("[0x%x] receive handler called with status %d (%s), length %zu\n",
           (unsigned int)pthread_self(), status, ucs_status_string(status),
           info->length);
}
/*
 * Finish a non-blocking UCP operation and report its outcome.
 *
 * `request` is the value returned by a *_nbx call and is one of:
 *  - an error pointer: its status is returned;
 *  - a real request: the worker is progressed until the completion callback
 *    sets `completed`, after which the request is reset, checked and freed;
 *  - anything else (NULL): the operation already completed, UCS_OK.
 *
 * On success prints "finish to <op_str> <data_str>"; on failure prints a
 * diagnostic to stderr. Returns the final status.
 */
static ucs_status_t ucx_wait(ucp_worker_h ucp_worker, struct ucx_context *request,
                             const char *op_str, const char *data_str)
{
    ucs_status_t result = UCS_OK;

    if (UCS_PTR_IS_ERR(request)) {
        result = UCS_PTR_STATUS(request);
    } else if (UCS_PTR_IS_PTR(request)) {
        /* the completion callback flips `completed` from inside progress */
        while (request->completed == 0) {
            ucp_worker_progress(ucp_worker);
        }

        request->completed = 0;
        result             = ucp_request_check_status(request);
        ucp_request_free(request);
    }

    if (result == UCS_OK) {
        printf("finish to %s %s\n", op_str, data_str);
    } else {
        fprintf(stderr, "unable to %s %s (%s)\n", op_str, data_str,
                ucs_status_string(result));
    }

    return result;
}
/*
 * Sleep until the UCP worker has events to progress (TEST_MODE_EVENTFD).
 *
 * The worker's event fd (ucp_worker_get_efd) is registered in a private
 * epoll instance. ucp_worker_arm() must succeed before sleeping; if it
 * returns UCS_ERR_BUSY, events are already pending and no sleep is needed.
 *
 * Returns UCS_OK when the caller should progress the worker again, or an
 * error status when the wait could not be set up or epoll_wait() failed.
 */
static ucs_status_t test_poll_wait(ucp_worker_h ucp_worker)
{
    ucs_status_t ret = UCS_ERR_NO_MESSAGE; /* returned on every error path */
    ucs_status_t status;
    struct epoll_event ev;
    int epoll_fd_local;
    int epoll_fd = 0;
    int rc;

    memset(&ev, 0, sizeof(ev));

    status = ucp_worker_get_efd(ucp_worker, &epoll_fd);
    CHKERR_JUMP(UCS_OK != status, "ucp_worker_get_efd", err);

    /* Wait on the worker fd through our own epoll instance. Only this local
     * instance is closed below; epoll_fd is left to the worker. */
    epoll_fd_local = epoll_create(1);
    CHKERR_JUMP(epoll_fd_local < 0, "create epoll instance\n", err);

    ev.data.fd = epoll_fd;
    ev.events  = EPOLLIN;
    rc = epoll_ctl(epoll_fd_local, EPOLL_CTL_ADD, epoll_fd, &ev);
    CHKERR_JUMP(rc < 0, "add original socket to the new epoll\n", err_fd);

    /* Need to prepare ucp_worker before epoll_wait */
    status = ucp_worker_arm(ucp_worker);
    if (status == UCS_ERR_BUSY) { /* some events have arrived already */
        ret = UCS_OK;
        goto err_fd;
    }
    CHKERR_JUMP(status != UCS_OK, "ucp_worker_arm\n", err_fd);

    do {
        rc = epoll_wait(epoll_fd_local, &ev, 1, -1);
    } while ((rc == -1) && (errno == EINTR));
    /* A persistent failure must not be reported as "events ready", or the
     * caller would spin calling us again */
    CHKERR_JUMP(rc < 0, "epoll_wait\n", err_fd);

    ret = UCS_OK;

err_fd:
    close(epoll_fd_local);
err:
    return ret;
}
/*
 * Close `ucp_ep` via ep_close().
 *
 * When peer error handling (UCP_ERR_HANDLING_MODE_PEER, set by -e) is
 * enabled, the close is forced with UCP_EP_CLOSE_FLAG_FORCE, presumably
 * because the peer may already be gone. Otherwise no flags are passed.
 */
static void ep_close_err_mode(ucp_worker_h ucp_worker, ucp_ep_h ucp_ep)
{
    const uint64_t close_flags =
            (err_handling_opt.ucp_err_mode == UCP_ERR_HANDLING_MODE_PEER) ?
            UCP_EP_CLOSE_FLAG_FORCE : 0;

    ep_close(ucp_worker, ucp_ep, close_flags);
}
/*
 * Client side of the example.
 *
 * 1. Creates an endpoint to the server's worker address, which was obtained
 *    over the out-of-band socket.
 * 2. Sends the client's own worker address to the server in a tagged
 *    message.
 * 3. Waits for the server's reply, busy-polling or sleeping according to
 *    ucp_test_mode.
 * 4. Receives the reply and prints the test string it carries.
 *
 * Depending on err_handling_opt.failure_mode it may kill itself with SIGKILL
 * to emulate a peer failure.
 *
 * Returns 0 on success and -1 on any failure.
 */
static int run_ucx_client(ucp_worker_h ucp_worker,
                          ucp_address_t *local_addr, size_t local_addr_len,
                          ucp_address_t *peer_addr, size_t peer_addr_len)
{
    struct msg *msg = NULL;
    size_t msg_len  = 0;
    int ret         = -1;
    ucp_request_param_t send_param, recv_param;
    ucp_tag_recv_info_t info_tag;
    ucp_tag_message_h msg_tag;
    ucs_status_t status;
    ucp_ep_h server_ep;
    ucp_ep_params_t ep_params;
    struct ucx_context *request;
    size_t payload_len;
    char *str;

    (void)peer_addr_len; /* ucp_ep_create() only takes the address pointer */

    /* Send client UCX address to server. UCP reads only the fields flagged
     * in field_mask, so the mask must be set explicitly. */
    memset(&ep_params, 0, sizeof(ep_params));
    ep_params.field_mask      = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
                                UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
                                UCP_EP_PARAM_FIELD_ERR_HANDLER |
                                UCP_EP_PARAM_FIELD_USER_DATA;
    ep_params.address         = peer_addr;
    ep_params.err_mode        = err_handling_opt.ucp_err_mode;
    ep_params.err_handler.cb  = failure_handler;
    ep_params.err_handler.arg = NULL;
    ep_params.user_data       = &ep_status;

    status = ucp_ep_create(ucp_worker, &ep_params, &server_ep);
    CHKERR_JUMP(status != UCS_OK, "ucp_ep_create\n", err);

    msg_len = sizeof(*msg) + local_addr_len;
    msg     = malloc(msg_len);
    CHKERR_JUMP(msg == NULL, "allocate memory\n", err_ep);
    memset(msg, 0, msg_len);

    msg->data_len = local_addr_len;
    memcpy(msg + 1, local_addr, local_addr_len);

    /* Without UCP_OP_ATTR_FIELD_CALLBACK the callback is never invoked and
     * ucx_wait() would spin forever waiting for `completed` */
    memset(&send_param, 0, sizeof(send_param));
    send_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
                              UCP_OP_ATTR_FIELD_USER_DATA;
    send_param.cb.send      = send_handler;
    send_param.user_data    = (void*)addr_msg_str;

    request = ucp_tag_send_nbx(server_ep, msg, msg_len, tag, &send_param);
    status  = ucx_wait(ucp_worker, request, "send", addr_msg_str);
    free(msg);
    msg = NULL;
    if (status != UCS_OK) {
        goto err_ep;
    }

    if (err_handling_opt.failure_mode == FAILURE_MODE_RECV) {
        fprintf(stderr, "Emulating failure before receive operation on client side\n");
        raise(SIGKILL);
    }

    /* Receive test string from server */
    for (;;) {
        CHKERR_JUMP(ep_status != UCS_OK, "receive data: EP disconnected\n", err_ep);

        /* Probing incoming events in non-block mode */
        msg_tag = ucp_tag_probe_nb(ucp_worker, tag, tag_mask, 1, &info_tag);
        if (msg_tag != NULL) {
            /* Message arrived */
            break;
        } else if (ucp_worker_progress(ucp_worker)) {
            /* Some events were polled; try again without going to sleep */
            continue;
        }

        /* If we got here, ucp_worker_progress() returned 0, so we can sleep.
         * The following blocking methods poll the worker's internal file
         * descriptor so the CPU idles instead of spinning.
         */
        if (ucp_test_mode == TEST_MODE_WAIT) {
            /* Polling incoming events */
            status = ucp_worker_wait(ucp_worker);
            CHKERR_JUMP(status != UCS_OK, "ucp_worker_wait\n", err_ep);
        } else if (ucp_test_mode == TEST_MODE_EVENTFD) {
            status = test_poll_wait(ucp_worker);
            CHKERR_JUMP(status != UCS_OK, "test_poll_wait\n", err_ep);
        }
    }

    if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) {
        fprintf(stderr, "Emulating unexpected failure after receive completion "
                        "on client side, server should detect error by "
                        "keepalive mechanism\n");
        raise(SIGKILL);
    }

    msg = mem_type_malloc(info_tag.length);
    CHKERR_JUMP(msg == NULL, "allocate memory\n", err_ep);

    memset(&recv_param, 0, sizeof(recv_param));
    recv_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
                              UCP_OP_ATTR_FIELD_DATATYPE;
    recv_param.datatype     = ucp_dt_make_contig(1);
    recv_param.cb.recv      = recv_handler;

    request = ucp_tag_msg_recv_nbx(ucp_worker, msg, info_tag.length, msg_tag,
                                   &recv_param);
    status  = ucx_wait(ucp_worker, request, "receive", data_msg_str);
    if (status != UCS_OK) {
        mem_type_free(msg);
        goto err_ep;
    }

    /* Never read past the bytes actually received (the server may have been
     * started with a different -s), and reserve one extra zeroed byte so the
     * printed string is always NUL-terminated. */
    if (info_tag.length < sizeof(*msg)) {
        fprintf(stderr, "received message is too short (%zu bytes)\n",
                info_tag.length);
        goto err_msg;
    }

    payload_len = info_tag.length - sizeof(*msg);
    if (payload_len > (size_t)test_string_length) {
        payload_len = (size_t)test_string_length;
    }

    str = calloc(1, payload_len + 1);
    if (str == NULL) {
        fprintf(stderr, "Memory allocation failed\n");
        ret = -1;
        goto err_msg;
    }

    mem_type_memcpy(str, msg + 1, payload_len);
    printf("\n\n----- UCP TEST SUCCESS ----\n\n");
    printf("%s", str);
    printf("\n\n---------------------------\n\n");
    free(str);
    ret = 0;

err_msg:
    mem_type_free(msg);
err_ep:
    ep_close_err_mode(ucp_worker, server_ep);
err:
    return ret;
}
/*
 * Flush all outstanding operations on `ep` and wait for completion.
 *
 * ucp_ep_flush_nbx() returns NULL when nothing was pending, or an error
 * pointer, or a request. A request is completed by progressing `worker`
 * until ucp_request_check_status() no longer reports UCS_INPROGRESS; no
 * callback is registered, so nothing else drives it.
 *
 * Returns the final flush status.
 */
static ucs_status_t flush_ep(ucp_worker_h worker, ucp_ep_h ep)
{
    ucp_request_param_t param;
    ucs_status_t status;
    void *request;

    memset(&param, 0, sizeof(param));
    param.op_attr_mask = 0; /* no callback, no user data */

    request = ucp_ep_flush_nbx(ep, &param);
    if (request == NULL) {
        return UCS_OK;
    } else if (UCS_PTR_IS_ERR(request)) {
        return UCS_PTR_STATUS(request);
    }

    /* Without progressing the worker the flush request can never complete */
    do {
        ucp_worker_progress(worker);
        status = ucp_request_check_status(request);
    } while (status == UCS_INPROGRESS);

    ucp_request_free(request);
    return status;
}
/*
 * Server side of the example.
 *
 * 1. Busy-polls for the client's tagged address message and receives it.
 * 2. Creates an endpoint back to the client from that address.
 * 3. Sends the client a generated test string of test_string_length bytes,
 *    allocated in test_mem_type memory.
 * 4. Flushes the endpoint.
 *
 * Depending on err_handling_opt.failure_mode it also emulates a failure or
 * waits for the peer failure to be reported through ep_status.
 *
 * Returns 0 on success, or when an expected emulated peer failure was
 * observed, and -1 otherwise.
 */
static int run_ucx_server(ucp_worker_h ucp_worker)
{
    struct msg *msg             = NULL;
    struct ucx_context *request = NULL;
    size_t msg_len              = 0;
    ucp_request_param_t send_param, recv_param;
    ucp_tag_recv_info_t info_tag;
    ucp_tag_message_h msg_tag;
    ucs_status_t status;
    ucp_ep_h client_ep;
    ucp_ep_params_t ep_params;
    ucp_address_t *peer_addr;
    size_t peer_addr_len;
    int ret;

    /* Receive client UCX address */
    do {
        /* Progressing before probe to update the state */
        ucp_worker_progress(ucp_worker);

        /* Probing incoming events in non-block mode */
        msg_tag = ucp_tag_probe_nb(ucp_worker, tag, tag_mask, 1, &info_tag);
    } while (msg_tag == NULL);

    msg = malloc(info_tag.length);
    CHKERR_ACTION(msg == NULL, "allocate memory\n", ret = -1; goto err);

    /* The callback flag is required, otherwise ucx_wait() never returns */
    memset(&recv_param, 0, sizeof(recv_param));
    recv_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
                              UCP_OP_ATTR_FIELD_DATATYPE;
    recv_param.datatype     = ucp_dt_make_contig(1);
    recv_param.cb.recv      = recv_handler;

    request = ucp_tag_msg_recv_nbx(ucp_worker, msg, info_tag.length,
                                   msg_tag, &recv_param);
    status  = ucx_wait(ucp_worker, request, "receive", addr_msg_str);
    if (status != UCS_OK) {
        free(msg);
        ret = -1;
        goto err;
    }

    if (err_handling_opt.failure_mode == FAILURE_MODE_SEND) {
        fprintf(stderr, "Emulating unexpected failure on server side, client "
                        "should detect error by keepalive mechanism\n");
        free(msg);
        raise(SIGKILL);
        exit(1);
    }

    /* The header arrives from the network: make sure the advertised address
     * length fits in the bytes actually received before copying it out */
    if ((info_tag.length < sizeof(*msg)) ||
        (msg->data_len > info_tag.length - sizeof(*msg))) {
        fprintf(stderr, "received malformed address message\n");
        free(msg);
        ret = -1;
        goto err;
    }

    peer_addr_len = msg->data_len;
    peer_addr     = malloc(peer_addr_len);
    if (peer_addr == NULL) {
        fprintf(stderr, "unable to allocate memory for peer address\n");
        free(msg);
        ret = -1;
        goto err;
    }

    memcpy(peer_addr, msg + 1, peer_addr_len);
    free(msg);
    msg = NULL;

    /* Send test string to client. UCP reads only the fields flagged in
     * field_mask. */
    memset(&ep_params, 0, sizeof(ep_params));
    ep_params.field_mask      = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
                                UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
                                UCP_EP_PARAM_FIELD_ERR_HANDLER |
                                UCP_EP_PARAM_FIELD_USER_DATA;
    ep_params.address         = peer_addr;
    ep_params.err_mode        = err_handling_opt.ucp_err_mode;
    ep_params.err_handler.cb  = failure_handler;
    ep_params.err_handler.arg = NULL;
    ep_params.user_data       = &ep_status;

    status = ucp_ep_create(ucp_worker, &ep_params, &client_ep);
    /* ucp_ep_create() unpacks the remote address, so our copy is no longer
     * needed; previously it was leaked on every path */
    free(peer_addr);
    /* If peer failure testing was requested, it could be possible that UCP EP
     * couldn't be created; in this case set `ret = 0` to report success */
    ret = (err_handling_opt.failure_mode != FAILURE_MODE_NONE) ? 0 : -1;
    CHKERR_ACTION(status != UCS_OK, "ucp_ep_create\n", goto err);

    msg_len = sizeof(*msg) + test_string_length;
    msg     = mem_type_malloc(msg_len);
    CHKERR_ACTION(msg == NULL, "allocate memory\n", ret = -1; goto err_ep);
    mem_type_memset(msg, 0, msg_len);

    set_msg_data_len(msg, msg_len - sizeof(*msg));
    ret = generate_test_string((char *)(msg + 1), test_string_length);
    CHKERR_JUMP(ret < 0, "generate test string", err_free_mem_type_msg);

    if (err_handling_opt.failure_mode == FAILURE_MODE_RECV) {
        /* Sleep for small amount of time to ensure that client was killed
         * and peer failure handling is covered */
        sleep(5);
    }

    ucp_worker_progress(ucp_worker);

    memset(&send_param, 0, sizeof(send_param));
    send_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
                              UCP_OP_ATTR_FIELD_USER_DATA |
                              UCP_OP_ATTR_FIELD_MEMORY_TYPE;
    send_param.cb.send      = send_handler;
    send_param.user_data    = (void*)data_msg_str;
    send_param.memory_type  = test_mem_type;

    request = ucp_tag_send_nbx(client_ep, msg, msg_len, tag, &send_param);
    status  = ucx_wait(ucp_worker, request, "send", data_msg_str);
    if (status != UCS_OK) {
        if (err_handling_opt.failure_mode == FAILURE_MODE_NONE) {
            /* No failure was expected: a failed send is a real error. Do not
             * wait for failure_handler, which may never run here. */
            ret = -1;
        } else {
            /* If peer failure testing was requested, set `ret = 0` to report
             * success from the application */
            ret = 0;

            /* Make sure that failure_handler was called */
            while (ep_status == UCS_OK) {
                ucp_worker_progress(ucp_worker);
            }
        }
        goto err_free_mem_type_msg;
    }

    if (err_handling_opt.failure_mode == FAILURE_MODE_KEEPALIVE) {
        fprintf(stderr, "Waiting for client is terminated\n");
        while (ep_status == UCS_OK) {
            ucp_worker_progress(ucp_worker);
        }
    }

    status = flush_ep(ucp_worker, client_ep);
    printf("flush_ep completed with status %d (%s)\n",
           status, ucs_status_string(status));

    ret = 0;

err_free_mem_type_msg:
    mem_type_free(msg);
err_ep:
    ep_close_err_mode(ucp_worker, client_ep);
err:
    return ret;
}
static void progress_worker(void *arg)
{
}
int main(int argc, char **argv)
{
/* UCP temporary vars */
ucp_worker_attr_t worker_attr;
ucp_worker_params_t worker_params;
ucp_config_t *config;
ucs_status_t status;
/* UCP handler objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
/* OOB connection vars */
uint64_t local_addr_len = 0;
ucp_address_t *local_addr = NULL;
uint64_t peer_addr_len = 0;
ucp_address_t *peer_addr = NULL;
char *client_target_name = NULL;
int oob_sock = -1;
int ret = -1;
memset(&ucp_params, 0, sizeof(ucp_params));
memset(&worker_attr, 0, sizeof(worker_attr));
memset(&worker_params, 0, sizeof(worker_params));
/* Parse the command line */
status = parse_cmd(argc, argv, &client_target_name);
CHKERR_JUMP(status != UCS_OK, "parse_cmd\n", err);
/* UCP initialization */
status = ucp_config_read(NULL, NULL, &config);
CHKERR_JUMP(status != UCS_OK, "ucp_config_read\n", err);
ucp_params.features = UCP_FEATURE_TAG;
if (ucp_test_mode == TEST_MODE_WAIT || ucp_test_mode == TEST_MODE_EVENTFD) {
}
ucp_params.request_size = sizeof(struct ucx_context);
ucp_params.request_init = request_init;
status = ucp_init(&ucp_params, config, &ucp_context);
if (print_config) {
ucp_config_print(config, stdout, NULL, UCS_CONFIG_PRINT_CONFIG);
}
CHKERR_JUMP(status != UCS_OK, "ucp_init\n", err);
status = ucp_worker_create(ucp_context, &worker_params, &ucp_worker);
CHKERR_JUMP(status != UCS_OK, "ucp_worker_create\n", err_cleanup);
status = ucp_worker_query(ucp_worker, &worker_attr);
CHKERR_JUMP(status != UCS_OK, "ucp_worker_query\n", err_worker);
local_addr_len = worker_attr.address_length;
local_addr = worker_attr.address;
printf("[0x%x] local address length: %lu\n",
(unsigned int)pthread_self(), local_addr_len);
/* OOB connection establishment */
if (client_target_name != NULL) {
oob_sock = connect_common(client_target_name, server_port, ai_family);
CHKERR_JUMP(oob_sock < 0, "client_connect\n", err_addr);
ret = recv(oob_sock, &peer_addr_len, sizeof(peer_addr_len), MSG_WAITALL);
CHKERR_JUMP_RETVAL(ret != (int)sizeof(peer_addr_len),
"receive address length\n", err_addr, ret);
peer_addr = malloc(peer_addr_len);
CHKERR_JUMP(!peer_addr, "allocate memory\n", err_addr);
ret = recv(oob_sock, peer_addr, peer_addr_len, MSG_WAITALL);
CHKERR_JUMP_RETVAL(ret != (int)peer_addr_len,
"receive address\n", err_peer_addr, ret);
} else {
oob_sock = connect_common(NULL, server_port, ai_family);
CHKERR_JUMP(oob_sock < 0, "server_connect\n", err_peer_addr);
ret = send(oob_sock, &local_addr_len, sizeof(local_addr_len), 0);
CHKERR_JUMP_RETVAL(ret != (int)sizeof(local_addr_len),
"send address length\n", err_peer_addr, ret);
ret = send(oob_sock, local_addr, local_addr_len, 0);
CHKERR_JUMP_RETVAL(ret != (int)local_addr_len, "send address\n",
err_peer_addr, ret);
}
if (client_target_name != NULL) {
ret = run_ucx_client(ucp_worker,
local_addr, local_addr_len,
peer_addr, peer_addr_len);
} else {
ret = run_ucx_server(ucp_worker);
}
if (!ret && (err_handling_opt.failure_mode == FAILURE_MODE_NONE)) {
/* Make sure remote is disconnected before destroying local worker */
ret = barrier(oob_sock, progress_worker, ucp_worker);
}
close(oob_sock);
err_peer_addr:
free(peer_addr);
err_addr:
ucp_worker_release_address(ucp_worker, local_addr);
err_worker:
ucp_worker_destroy(ucp_worker);
err_cleanup:
ucp_cleanup(ucp_context);
err:
return ret;
}
/*
 * Print the command-line help to stderr, followed by the options shared by
 * all hello-world examples (print_common_help()).
 *
 * Declared with (void) so it is a real prototype and an accidental argument
 * is diagnosed by the compiler; `()` declares a function with unspecified
 * parameters in C.
 */
static void print_usage(void)
{
    fprintf(stderr, "Usage: ucp_hello_world [parameters]\n");
    fprintf(stderr, "UCP hello world client/server example utility\n");
    fprintf(stderr, "\nParameters are:\n");
    fprintf(stderr, " -w Select test mode \"wait\" to test "
            "ucp_worker_wait function\n");
    fprintf(stderr, " -f Select test mode \"event fd\" to test "
            "ucp_worker_get_efd function with later poll\n");
    fprintf(stderr, " -b Select test mode \"busy polling\" to test "
            "ucp_tag_probe_nb and ucp_worker_progress (default)\n");
    fprintf(stderr, " -n <name> Set node name or IP address "
            "of the server (required for client and should be ignored "
            "for server)\n");
    fprintf(stderr, " -e <type> Emulate unexpected failure and handle an "
            "error with enabled UCP_ERR_HANDLING_MODE_PEER\n");
    fprintf(stderr, " send - send failure on server side "
            "before send initiated\n");
    fprintf(stderr, " recv - receive failure on client side "
            "before receive completed\n");
    fprintf(stderr, " keepalive - keepalive failure on client side "
            "after communication completed\n");
    fprintf(stderr, " -c Print UCP configuration\n");
    print_common_help();
    fprintf(stderr, "\n");
}
ucs_status_t parse_cmd(int argc, char * const argv[], char **server_name)
{
int c = 0, idx = 0;
err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_NONE;
err_handling_opt.failure_mode = FAILURE_MODE_NONE;
while ((c = getopt(argc, argv, "wfb6e:n:p:s:m:ch")) != -1) {
switch (c) {
case 'w':
ucp_test_mode = TEST_MODE_WAIT;
break;
case 'f':
ucp_test_mode = TEST_MODE_EVENTFD;
break;
case 'b':
ucp_test_mode = TEST_MODE_PROBE;
break;
case 'e':
err_handling_opt.ucp_err_mode = UCP_ERR_HANDLING_MODE_PEER;
if (!strcmp(optarg, "recv")) {
err_handling_opt.failure_mode = FAILURE_MODE_RECV;
} else if (!strcmp(optarg, "send")) {
err_handling_opt.failure_mode = FAILURE_MODE_SEND;
} else if (!strcmp(optarg, "keepalive")) {
err_handling_opt.failure_mode = FAILURE_MODE_KEEPALIVE;
} else {
print_usage();
}
break;
case 'n':
*server_name = optarg;
break;
case '6':
ai_family = AF_INET6;
break;
case 'p':
server_port = atoi(optarg);
if (server_port <= 0) {
fprintf(stderr, "Wrong server port number %d\n", server_port);
}
break;
case 's':
test_string_length = atol(optarg);
if (test_string_length < 0) {
fprintf(stderr, "Wrong string size %ld\n", test_string_length);
}
break;
case 'm':
test_mem_type = parse_mem_type(optarg);
if (test_mem_type == UCS_MEMORY_TYPE_LAST) {
}
break;
case 'c':
print_config = 1;
break;
case 'h':
default:
print_usage();
}
}
fprintf(stderr, "INFO: UCP_HELLO_WORLD mode = %d server = %s port = %d, pid = %d\n",
ucp_test_mode, *server_name, server_port, getpid());
for (idx = optind; idx < argc; idx++) {
fprintf(stderr, "WARNING: Non-option argument %s\n", argv[idx]);
}
return UCS_OK;
}