/*
 * UCX 1.8 - Unified Communication X
 * uct_hello_world.c
 *
 * UCT hello world client/server example utility.
 */

#include "ucx_hello_world.h"
#include <limits.h>
#include <uct/api/uct.h>
#include <assert.h>
#include <ctype.h>
/* Active-message send variant exercised by the test; func_am_t_str() maps
 * each value to the name of the matching uct_ep_am_* function. */
typedef enum {
FUNC_AM_SHORT,
FUNC_AM_BCOPY,
FUNC_AM_ZCOPY
} func_am_t;
/* Small header that sits immediately in front of a received payload.
 * hello_world() either locates it just before the data pointer handed to the
 * callback (init_iface() requests sizeof(recv_desc_t) of rx headroom) or
 * allocates it together with a private copy of the payload. */
typedef struct {
int is_uct_desc; /* 1: header precedes UCT-owned data, 0: malloc'ed copy */
} recv_desc_t;
/* Command-line settings filled in by parse_cmd(). */
typedef struct {
char *server_name; /* -n: server to connect to; NULL selects the server role */
uint16_t server_port; /* -p: port of the out-of-band socket (default 13337) */
func_am_t func_am_type; /* -i / -b / -z: active-message send variant */
const char *dev_name; /* -d: device name, mandatory */
const char *tl_name; /* -t: transport layer name, mandatory */
long test_strlen; /* -s: size of the test string in bytes (default 16) */
} cmd_args_t;
/* UCT objects shared by the test: the worker is created in main(), the memory
 * domain is opened by dev_tl_lookup() and the interface by init_iface(). */
typedef struct {
uct_iface_attr_t iface_attr; /* Interface attributes: capabilities and limitations */
uct_iface_h iface; /* Communication interface context */
uct_md_attr_t md_attr; /* Memory domain attributes: capabilities and limitations */
uct_md_h md; /* Memory domain */
uct_worker_h worker; /* Workers represent allocated resources in a communication thread */
} iface_info_t;
/* Helper data type for am_short: the test buffer split by
 * am_short_params_pack() into the pieces passed to uct_ep_am_short(). */
typedef struct {
uint64_t header; /* leading bytes of the buffer, sent as the AM header */
char *payload; /* rest of the buffer, or NULL when nothing follows the header */
size_t len; /* payload length in bytes, 0 when payload is NULL */
} am_short_args_t;
/* Helper data type for am_bcopy: the source buffer handed to the pack
 * callback am_bcopy_data_pack_cb() through its opaque argument. */
typedef struct {
char *data; /* start of the bytes to pack */
size_t len; /* number of bytes to pack */
} am_bcopy_args_t;
/* Helper data type for am_zcopy */
typedef struct {
uct_completion_t uct_comp;
uct_mem_h memh;
} zcopy_comp_t;
/* Hand-off slot polled by the progress loops: hello_world() stores the
 * received recv_desc_t here, and zcopy_completion_cb() stores a non-NULL
 * sentinel to signal that the zero-copy send has completed. */
static void* desc_holder = NULL;
/* Translate an active-message variant into the name of the UCT send function
 * it stands for. Returns NULL for a value that is not a func_am_t member. */
static char *func_am_t_str(func_am_t func_am_type)
{
    char *name = NULL;

    switch (func_am_type) {
    case FUNC_AM_SHORT:
        name = "uct_ep_am_short";
        break;
    case FUNC_AM_BCOPY:
        name = "uct_ep_am_bcopy";
        break;
    case FUNC_AM_ZCOPY:
        name = "uct_ep_am_zcopy";
        break;
    }

    return name;
}
/* Look up the largest message the interface accepts for the given
 * active-message variant, as advertised in attr->cap.am. Returns 0 for a
 * value that is not a func_am_t member. */
static size_t func_am_max_size(func_am_t func_am_type,
                               const uct_iface_attr_t *attr)
{
    size_t limit = 0;

    switch (func_am_type) {
    case FUNC_AM_SHORT:
        limit = attr->cap.am.max_short;
        break;
    case FUNC_AM_BCOPY:
        limit = attr->cap.am.max_bcopy;
        break;
    case FUNC_AM_ZCOPY:
        limit = attr->cap.am.max_zcopy;
        break;
    }

    return limit;
}
/* Helper function for am_short.
 * Splits a buffer of len bytes into the 64-bit header and the trailing
 * payload expected by uct_ep_am_short().
 *
 * buf   source bytes (at least len bytes readable)
 * len   number of valid bytes in buf
 * args  receives header, payload pointer and payload length
 *
 * The header is filled with memcpy so that a misaligned buf is fine and so
 * that no more than len bytes are ever read; when len is shorter than the
 * header the remaining header bytes are zero. */
void am_short_params_pack(char *buf, size_t len, am_short_args_t *args)
{
    size_t hdr_bytes = (len < sizeof(args->header)) ? len :
                                                      sizeof(args->header);

    args->header = 0;
    memcpy(&args->header, buf, hdr_bytes);

    if (len > sizeof(args->header)) {
        args->payload = buf + sizeof(args->header);
        args->len     = len - sizeof(args->header);
    } else {
        /* Everything fitted into the header */
        args->payload = NULL;
        args->len     = 0;
    }
}
/* Send the test buffer with uct_ep_am_short().
 * The buffer is split into header and payload by am_short_params_pack(); the
 * send is repeated, progressing the worker after every attempt, for as long
 * as the transport reports UCS_ERR_NO_RESOURCE. Returns the status of the
 * last attempt. */
ucs_status_t do_am_short(iface_info_t *if_info, uct_ep_h ep, uint8_t id,
                         const cmd_args_t *cmd_args, char *buf)
{
    am_short_args_t packed;
    ucs_status_t rc;

    am_short_params_pack(buf, cmd_args->test_strlen, &packed);

    for (;;) {
        /* Send active message to remote endpoint */
        rc = uct_ep_am_short(ep, id, packed.header, packed.payload,
                             packed.len);
        uct_worker_progress(if_info->worker);
        if (rc != UCS_ERR_NO_RESOURCE) {
            return rc;
        }
    }
}
/* Pack callback for am_bcopy */
size_t am_bcopy_data_pack_cb(void *dest, void *arg)
{
am_bcopy_args_t *bc_args = arg;
mem_type_memcpy(dest, bc_args->data, bc_args->len);
return bc_args->len;
}
/* Send the test buffer with uct_ep_am_bcopy().
 * The send is repeated, progressing the worker after every attempt, for as
 * long as the transport reports UCS_ERR_NO_RESOURCE. Returns UCS_OK when a
 * non-negative length came back, otherwise the negative value as a status. */
ucs_status_t do_am_bcopy(iface_info_t *if_info, uct_ep_h ep, uint8_t id,
                         const cmd_args_t *cmd_args, char *buf)
{
    am_bcopy_args_t pack_args;
    ssize_t packed;

    pack_args.len  = cmd_args->test_strlen;
    pack_args.data = buf;

    /* Send active message to remote endpoint */
    do {
        packed = uct_ep_am_bcopy(ep, id, am_bcopy_data_pack_cb, &pack_args, 0);
        uct_worker_progress(if_info->worker);
    } while (packed == UCS_ERR_NO_RESOURCE);

    /* Negative value is an error code */
    if (packed < 0) {
        return (ucs_status_t)packed;
    }
    return UCS_OK;
}
/* Completion callback for am_zcopy.
 * self is the uct_completion_t that is the first member of a zcopy_comp_t,
 * so it is cast back to reach the memory handle. A registered handle is
 * released here, then desc_holder is set to a non-NULL sentinel, which ends
 * the wait loop in do_am_zcopy(). */
void zcopy_completion_cb(uct_completion_t *self, ucs_status_t status)
{
zcopy_comp_t *comp = (zcopy_comp_t *)self;
/* Expected to run only once nothing is outstanding and the send succeeded */
assert((comp->uct_comp.count == 0) && (status == UCS_OK));
if (comp->memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(comp->md, comp->memh);
}
/* Any non-NULL value works; it is only tested, never dereferenced, by the sender */
desc_holder = (void *)0xDEADBEEF;
}
/* Send the test buffer with uct_ep_am_zcopy().
 *
 * When the memory domain requires a memory handle the buffer is registered
 * first. The send is repeated, progressing the worker after every attempt,
 * for as long as the transport reports UCS_ERR_NO_RESOURCE.
 *
 * - UCS_INPROGRESS: the worker is progressed until zcopy_completion_cb() has
 *   run (it releases the registration and sets desc_holder); UCS_OK is
 *   returned.
 * - any other status: the completion callback is not going to run, so the
 *   registration is released here and the status is returned unchanged.
 *
 * Returns the registration error if uct_md_mem_reg() fails. */
ucs_status_t do_am_zcopy(iface_info_t *if_info, uct_ep_h ep, uint8_t id,
                         const cmd_args_t *cmd_args, char *buf)
{
    uct_mem_h memh = UCT_MEM_HANDLE_NULL;
    ucs_status_t status;
    uct_iov_t iov;
    zcopy_comp_t comp;

    if (if_info->md_attr.cap.flags & UCT_MD_FLAG_NEED_MEMH) {
        status = uct_md_mem_reg(if_info->md, buf, cmd_args->test_strlen,
                                UCT_MD_MEM_ACCESS_RMA, &memh);
        if (status != UCS_OK) {
            return status;
        }
    }

    iov.buffer          = buf;
    iov.length          = cmd_args->test_strlen;
    iov.memh            = memh;
    iov.stride          = 0;
    iov.count           = 1;

    comp.uct_comp.func  = zcopy_completion_cb;
    comp.uct_comp.count = 1;
    comp.md             = if_info->md;
    comp.memh           = memh;

    do {
        status = uct_ep_am_zcopy(ep, id, NULL, 0, &iov, 1, 0,
                                 (uct_completion_t *)&comp);
        uct_worker_progress(if_info->worker);
    } while (status == UCS_ERR_NO_RESOURCE);

    if (status == UCS_INPROGRESS) {
        while (!desc_holder) {
            /* Explicitly progress outstanding active message request */
            uct_worker_progress(if_info->worker);
        }
        return UCS_OK;
    }

    /* Completed in place or failed: no callback will release the handle */
    if (memh != UCT_MEM_HANDLE_NULL) {
        uct_md_mem_dereg(if_info->md, memh);
    }
    return status;
}
/* Print the success banner together with the received string.
 *
 * label       tag printed in brackets (who is reporting)
 * local_str   name of the send function that was used
 * remote_str  received bytes; need not be NUL-terminated
 * length      number of valid bytes in remote_str
 *
 * At most length bytes of remote_str are printed (printing still stops at an
 * earlier NUL), so a payload without a terminator is never read past its
 * end. */
static void print_strings(const char *label, const char *local_str,
                          const char *remote_str, size_t length)
{
    /* The precision of "%.*s" is an int */
    int shown = (length > (size_t)INT_MAX) ? INT_MAX : (int)length;

    fprintf(stdout, "\n\n----- UCT TEST SUCCESS ----\n\n");
    fprintf(stdout, "[%s] %s sent %.*s", label, local_str, shown, remote_str);
    fprintf(stdout, "\n\n---------------------------\n");
    fflush(stdout);
}
/* Callback to handle receive active message */
static ucs_status_t hello_world(void *arg, void *data, size_t length,
unsigned flags)
{
func_am_t func_am_type = *(func_am_t *)arg;
recv_desc_t *rdesc;
print_strings("callback", func_am_t_str(func_am_type), data, length);
if (flags & UCT_CB_PARAM_FLAG_DESC) {
rdesc = (recv_desc_t *)data - 1;
/* Hold descriptor to release later and return UCS_INPROGRESS */
rdesc->is_uct_desc = 1;
desc_holder = rdesc;
}
/* We need to copy-out data and return UCS_OK if want to use the data
* outside the callback */
rdesc = malloc(sizeof(*rdesc) + length);
CHKERR_ACTION(rdesc == NULL, "allocate memory\n", return UCS_ERR_NO_MEMORY);
rdesc->is_uct_desc = 0;
memcpy(rdesc + 1, data, length);
desc_holder = rdesc;
return UCS_OK;
}
/* Init the transport by its name.
 *
 * dev_name      device to open the interface on
 * tl_name       transport layer to use
 * func_am_type  active-message variant the interface has to support
 * iface_p       md and worker are read; iface and iface_attr are filled in
 *
 * Opens an interface in device mode with sizeof(recv_desc_t) bytes of rx
 * headroom, enables send/receive progress and queries its attributes.
 * Returns UCS_OK when the interface supports the requested variant (AM short
 * additionally requires a non-CUDA test memory type). In every other case the
 * interface, if it was opened, is closed again, iface_p->iface is reset to
 * NULL and UCS_ERR_UNSUPPORTED is returned. */
static ucs_status_t init_iface(char *dev_name, char *tl_name,
                               func_am_t func_am_type,
                               iface_info_t *iface_p)
{
    ucs_status_t status;
    uct_iface_config_t *config; /* Defines interface configuration options */
    uct_iface_params_t params;

    /* Only the fields named in field_mask are read by uct_iface_open() */
    params.field_mask           = UCT_IFACE_PARAM_FIELD_OPEN_MODE   |
                                  UCT_IFACE_PARAM_FIELD_DEVICE      |
                                  UCT_IFACE_PARAM_FIELD_STATS_ROOT  |
                                  UCT_IFACE_PARAM_FIELD_RX_HEADROOM |
                                  UCT_IFACE_PARAM_FIELD_CPU_MASK;
    params.open_mode            = UCT_IFACE_OPEN_MODE_DEVICE;
    params.mode.device.tl_name  = tl_name;
    params.mode.device.dev_name = dev_name;
    params.stats_root           = NULL;
    params.rx_headroom          = sizeof(recv_desc_t);
    UCS_CPU_ZERO(&params.cpu_mask);

    /* Read transport-specific interface configuration */
    status = uct_md_iface_config_read(iface_p->md, tl_name, NULL, NULL, &config);
    CHKERR_JUMP(UCS_OK != status, "setup iface_config", error_ret);

    /* Open communication interface */
    assert(iface_p->iface == NULL);
    status = uct_iface_open(iface_p->md, iface_p->worker, &params, config,
                            &iface_p->iface);
    /* The configuration is not needed once the open call has returned */
    uct_config_release(config);
    CHKERR_JUMP(UCS_OK != status, "open temporary interface", error_ret);

    /* Enable progress on the interface */
    uct_iface_progress_enable(iface_p->iface,
                              UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

    /* Get interface attributes */
    status = uct_iface_query(iface_p->iface, &iface_p->iface_attr);
    CHKERR_JUMP(UCS_OK != status, "query iface", error_iface);

    /* Check if current device and transport support required active messages */
    if ((func_am_type == FUNC_AM_SHORT) &&
        (iface_p->iface_attr.cap.flags & UCT_IFACE_FLAG_AM_SHORT)) {
        if (test_mem_type != UCS_MEMORY_TYPE_CUDA) {
            return UCS_OK;
        } else {
            fprintf(stderr, "AM short protocol doesn't support CUDA memory");
        }
    }

    if ((func_am_type == FUNC_AM_BCOPY) &&
        (iface_p->iface_attr.cap.flags & UCT_IFACE_FLAG_AM_BCOPY)) {
        return UCS_OK;
    }

    if ((func_am_type == FUNC_AM_ZCOPY) &&
        (iface_p->iface_attr.cap.flags & UCT_IFACE_FLAG_AM_ZCOPY)) {
        return UCS_OK;
    }

error_iface:
    uct_iface_close(iface_p->iface);
    iface_p->iface = NULL;
error_ret:
    return UCS_ERR_UNSUPPORTED;
}
/* Find the device/transport pair named on the command line and open it.
 *
 * cmd_args  dev_name, tl_name and func_am_type select what to look for
 * iface_p   worker is read; md, md_attr, iface and iface_attr are filled in
 *
 * Walks every component and every memory domain of it, opens the memory
 * domain and scans its transport resources for an entry whose device and
 * transport names both match. The first match is checked against the test
 * memory type and handed to init_iface(); scanning of that memory domain
 * stops there whether or not it succeeded.
 *
 * Returns UCS_OK with the memory domain and interface left open in iface_p.
 * On failure every memory domain opened here is closed again and an error is
 * returned (UCS_ERR_UNSUPPORTED when nothing suitable was found). The
 * transport resource lists and the component list are always released. */
static ucs_status_t dev_tl_lookup(const cmd_args_t *cmd_args,
                                  iface_info_t *iface_p)
{
    uct_tl_resource_desc_t *tl_resources = NULL; /* Communication resource descriptor */
    unsigned num_tl_resources = 0; /* Number of transport resources resource objects created */
    uct_component_h *components;
    unsigned num_components;
    unsigned cmpt_index;
    uct_component_attr_t component_attr;
    unsigned md_index;
    unsigned tl_index;
    uct_md_config_t *md_config;
    ucs_status_t status;
    int found;

    status = uct_query_components(&components, &num_components);
    CHKERR_JUMP(UCS_OK != status, "query for components", error_ret);

    for (cmpt_index = 0; cmpt_index < num_components; ++cmpt_index) {
        /* First query: how many memory domain resources are there */
        component_attr.field_mask = UCT_COMPONENT_ATTR_FIELD_MD_RESOURCE_COUNT;
        status = uct_component_query(components[cmpt_index], &component_attr);
        CHKERR_JUMP(UCS_OK != status, "query component attributes",
                    release_component_list);

        /* Second query: fill a caller-provided array of that size */
        component_attr.field_mask   = UCT_COMPONENT_ATTR_FIELD_MD_RESOURCES;
        component_attr.md_resources = alloca(sizeof(*component_attr.md_resources) *
                                             component_attr.md_resource_count);
        status = uct_component_query(components[cmpt_index], &component_attr);
        CHKERR_JUMP(UCS_OK != status, "query for memory domain resources",
                    release_component_list);

        iface_p->iface = NULL;

        /* Iterate through memory domain resources */
        for (md_index = 0; md_index < component_attr.md_resource_count; ++md_index) {
            status = uct_md_config_read(components[cmpt_index], NULL, NULL,
                                        &md_config);
            CHKERR_JUMP(UCS_OK != status, "read MD config",
                        release_component_list);

            status = uct_md_open(components[cmpt_index],
                                 component_attr.md_resources[md_index].md_name,
                                 md_config, &iface_p->md);
            uct_config_release(md_config);
            CHKERR_JUMP(UCS_OK != status, "open memory domains",
                        release_component_list);

            status = uct_md_query(iface_p->md, &iface_p->md_attr);
            CHKERR_JUMP(UCS_OK != status, "query iface",
                        close_md);

            status = uct_md_query_tl_resources(iface_p->md, &tl_resources,
                                               &num_tl_resources);
            CHKERR_JUMP(UCS_OK != status, "query transport resources", close_md);

            /* Go through each available transport and find the proper name */
            found = 0;
            for (tl_index = 0; tl_index < num_tl_resources; ++tl_index) {
                if (strcmp(cmd_args->dev_name, tl_resources[tl_index].dev_name) ||
                    strcmp(cmd_args->tl_name, tl_resources[tl_index].tl_name)) {
                    continue;
                }

                if (!(iface_p->md_attr.cap.reg_mem_types & UCS_BIT(test_mem_type))) {
                    fprintf(stderr, "Unsupported memory type %s by "
                            UCT_TL_RESOURCE_DESC_FMT" on %s MD\n",
                            ucs_memory_type_names[test_mem_type],
                            UCT_TL_RESOURCE_DESC_ARG(&tl_resources[tl_index]),
                            component_attr.md_resources[md_index].md_name);
                    /* Must not be reported as success: no interface is open */
                    status = UCS_ERR_UNSUPPORTED;
                    break;
                }

                status = init_iface(tl_resources[tl_index].dev_name,
                                    tl_resources[tl_index].tl_name,
                                    cmd_args->func_am_type, iface_p);
                if (status == UCS_OK) {
                    fprintf(stdout, "Using "UCT_TL_RESOURCE_DESC_FMT"\n",
                            UCT_TL_RESOURCE_DESC_ARG(&tl_resources[tl_index]));
                    found = 1;
                }
                break;
            }

            uct_release_tl_resource_list(tl_resources);
            tl_resources     = NULL;
            num_tl_resources = 0;

            if (found) {
                /* Keep the memory domain open: the interface lives on it */
                goto release_component_list;
            }

            uct_md_close(iface_p->md);
        }
    }

    fprintf(stderr, "No supported (dev/tl) found (%s/%s)\n",
            cmd_args->dev_name, cmd_args->tl_name);
    status = UCS_ERR_UNSUPPORTED;

release_component_list:
    uct_release_component_list(components);
error_ret:
    return status;
close_md:
    uct_md_close(iface_p->md);
    goto release_component_list;
}
int print_err_usage()
{
const char func_template[] = " -%c Select \"%s\" function to send the message%s\n";
fprintf(stderr, "Usage: uct_hello_world [parameters]\n");
fprintf(stderr, "UCT hello world client/server example utility\n");
fprintf(stderr, "\nParameters are:\n");
fprintf(stderr, func_template, 'i', func_am_t_str(FUNC_AM_SHORT), " (default)");
fprintf(stderr, func_template, 'b', func_am_t_str(FUNC_AM_BCOPY), "");
fprintf(stderr, func_template, 'z', func_am_t_str(FUNC_AM_ZCOPY), "");
fprintf(stderr, " -d Select device name\n");
fprintf(stderr, " -t Select transport layer\n");
print_common_help();
fprintf(stderr, "\nExample:\n");
fprintf(stderr, " Server: uct_hello_world -d eth0 -t tcp\n");
fprintf(stderr, " Client: uct_hello_world -d eth0 -t tcp -n localhost\n");
}
/* Parse the command line into *args.
 *
 * argc, argv  as passed to main()
 * args        zeroed, then filled with defaults (port 13337, AM short,
 *             16-byte test string) and the values given by the options
 *
 * Returns UCS_OK on success. Returns UCS_ERR_UNSUPPORTED when an option
 * value is invalid, when -h or an unknown option is given, or when the
 * mandatory device (-d) or transport (-t) is missing; in the last three
 * cases the usage text is printed as well.
 * -m stores its result in the global test_mem_type. */
int parse_cmd(int argc, char * const argv[], cmd_args_t *args)
{
    int c = 0, arg_idx = 0;
    char *end;
    long value;

    assert(args);
    memset(args, 0, sizeof(*args));

    /* Defaults */
    args->server_port  = 13337;
    args->func_am_type = FUNC_AM_SHORT;
    args->test_strlen  = 16;

    opterr = 0;
    while ((c = getopt(argc, argv, "ibzd:t:n:p:s:m:h")) != -1) {
        switch (c) {
        case 'i':
            args->func_am_type = FUNC_AM_SHORT;
            break;
        case 'b':
            args->func_am_type = FUNC_AM_BCOPY;
            break;
        case 'z':
            args->func_am_type = FUNC_AM_ZCOPY;
            break;
        case 'd':
            args->dev_name = optarg;
            break;
        case 't':
            args->tl_name = optarg;
            break;
        case 'n':
            args->server_name = optarg;
            break;
        case 'p':
            /* Validate before narrowing to uint16_t, otherwise an
             * out-of-range value would silently wrap to another port */
            value = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') ||
                (value <= 0) || (value > UINT16_MAX)) {
                fprintf(stderr, "Wrong server port number %s\n", optarg);
                return UCS_ERR_UNSUPPORTED;
            }
            args->server_port = (uint16_t)value;
            break;
        case 's':
            value = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') || (value <= 0)) {
                fprintf(stderr, "Wrong string size %s\n", optarg);
                return UCS_ERR_UNSUPPORTED;
            }
            args->test_strlen = value;
            break;
        case 'm':
            test_mem_type = parse_mem_type(optarg);
            if (test_mem_type == UCS_MEMORY_TYPE_LAST) {
                /* NOTE(review): parse_mem_type() presumably reports the bad
                 * value itself - confirm */
                return UCS_ERR_UNSUPPORTED;
            }
            break;
        case '?':
            if ((optopt != 0) && (strchr("dtnpsm", optopt) != NULL)) {
                fprintf(stderr, "Option -%c requires an argument.\n", optopt);
            } else if (isprint (optopt)) {
                fprintf(stderr, "Unknown option `-%c'.\n", optopt);
            } else {
                fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
            }
            /* fallthrough */
        case 'h':
        default:
            print_err_usage();
            return UCS_ERR_UNSUPPORTED;
        }
    }

    /* server_name stays NULL in the server role; never pass NULL to %s */
    fprintf(stderr, "INFO: UCT_HELLO_WORLD AM function = %s server = %s port = %d\n",
            func_am_t_str(args->func_am_type),
            (args->server_name != NULL) ? args->server_name : "(null)",
            args->server_port);

    for (arg_idx = optind; arg_idx < argc; arg_idx++) {
        fprintf(stderr, "WARNING: Non-option argument %s\n", argv[arg_idx]);
    }

    if (args->dev_name == NULL) {
        fprintf(stderr, "WARNING: device is not set\n");
        print_err_usage();
        return UCS_ERR_UNSUPPORTED;
    }

    if (args->tl_name == NULL) {
        fprintf(stderr, "WARNING: transport layer is not set\n");
        print_err_usage();
        return UCS_ERR_UNSUPPORTED;
    }

    return UCS_OK;
}
/* Swap one length-prefixed buffer with the peer over a connected socket.
 * Sends slen followed by slen bytes of sbuf, then receives the peer's length
 * and that many bytes into a freshly calloc'ed buffer stored in *rbuf.
 * Returns 0 on success and -1 on any failure; *rbuf is NULL unless the
 * receive buffer was allocated.
 * The caller is responsible to free *rbuf */
int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf)
{
    size_t peer_len = 0;
    int rc          = 0;

    *rbuf = NULL;

    /* Outgoing direction: length first, payload second */
    rc = send(sock, &slen, sizeof(slen), 0);
    if ((rc < 0) || (rc != sizeof(slen))) {
        fprintf(stderr, "failed to send buffer length\n");
        return -1;
    }

    rc = send(sock, sbuf, slen, 0);
    if (rc != (int)slen) {
        fprintf(stderr, "failed to send buffer, return value %d\n", rc);
        return -1;
    }

    /* Incoming direction: refuse implausibly large announced lengths */
    rc = recv(sock, &peer_len, sizeof(peer_len), MSG_WAITALL);
    if ((rc != sizeof(peer_len)) || (peer_len > (SIZE_MAX / 2))) {
        fprintf(stderr,
                "failed to receive device address length, return value %d\n",
                rc);
        return -1;
    }

    *rbuf = calloc(1, peer_len);
    if (*rbuf == NULL) {
        fprintf(stderr, "failed to allocate receive buffer\n");
        return -1;
    }

    rc = recv(sock, *rbuf, peer_len, MSG_WAITALL);
    if (rc != (int)peer_len) {
        fprintf(stderr, "failed to receive device address, return value %d\n",
                rc);
        return -1;
    }

    return 0;
}
int main(int argc, char **argv)
{
uct_device_addr_t *peer_dev = NULL;
uct_iface_addr_t *peer_iface = NULL;
uct_ep_addr_t *own_ep = NULL;
uct_ep_addr_t *peer_ep = NULL;
uint8_t id = 0;
int oob_sock = -1; /* OOB connection socket */
ucs_status_t status = UCS_OK; /* status codes for UCS */
uct_iface_addr_t *own_iface;
uct_ep_h ep; /* Remote endpoint */
ucs_async_context_t *async; /* Async event context manages
times and fd notifications */
cmd_args_t cmd_args;
iface_info_t if_info;
uct_ep_params_t ep_params;
int res;
/* Parse the command line */
if (parse_cmd(argc, argv, &cmd_args)) {
goto out;
}
/* Initialize context
* It is better to use different contexts for different workers */
status = ucs_async_context_create(UCS_ASYNC_MODE_THREAD_SPINLOCK, &async);
CHKERR_JUMP(UCS_OK != status, "init async context", out);
/* Create a worker object */
status = uct_worker_create(async, UCS_THREAD_MODE_SINGLE, &if_info.worker);
CHKERR_JUMP(UCS_OK != status, "create worker", out_cleanup_async);
/* Search for the desired transport */
status = dev_tl_lookup(&cmd_args, &if_info);
CHKERR_JUMP(UCS_OK != status, "find supported device and transport",
out_destroy_worker);
own_dev = (uct_device_addr_t*)calloc(1, if_info.iface_attr.device_addr_len);
CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr",
out_destroy_iface);
own_iface = (uct_iface_addr_t*)calloc(1, if_info.iface_attr.iface_addr_len);
CHKERR_JUMP(NULL == own_iface, "allocate memory for if addr",
out_free_dev_addrs);
/* Get device address */
status = uct_iface_get_device_address(if_info.iface, own_dev);
CHKERR_JUMP(UCS_OK != status, "get device address", out_free_if_addrs);
if (cmd_args.server_name) {
oob_sock = client_connect(cmd_args.server_name, cmd_args.server_port);
} else {
oob_sock = server_connect(cmd_args.server_port);
}
CHKERR_ACTION(oob_sock < 0, "OOB connect",
status = UCS_ERR_IO_ERROR; goto out_close_oob_sock);
res = sendrecv(oob_sock, own_dev, if_info.iface_attr.device_addr_len,
(void **)&peer_dev);
CHKERR_ACTION(0 != res, "device exchange",
status = UCS_ERR_NO_MESSAGE; goto out_close_oob_sock);
status = (ucs_status_t)uct_iface_is_reachable(if_info.iface, peer_dev, NULL);
CHKERR_JUMP(0 == status, "reach the peer", out_close_oob_sock);
/* Get interface address */
if (if_info.iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
status = uct_iface_get_address(if_info.iface, own_iface);
CHKERR_JUMP(UCS_OK != status, "get interface address",
out_close_oob_sock);
status = (ucs_status_t)sendrecv(oob_sock, own_iface, if_info.iface_attr.iface_addr_len,
(void **)&peer_iface);
CHKERR_JUMP(0 != status, "ifaces exchange", out_close_oob_sock);
}
ep_params.iface = if_info.iface;
if (if_info.iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
own_ep = (uct_ep_addr_t*)calloc(1, if_info.iface_attr.ep_addr_len);
CHKERR_ACTION(NULL == own_ep, "allocate memory for ep addrs",
status = UCS_ERR_NO_MEMORY; goto out_close_oob_sock);
/* Create new endpoint */
status = uct_ep_create(&ep_params, &ep);
CHKERR_JUMP(UCS_OK != status, "create endpoint", out_free_ep_addrs);
/* Get endpoint address */
status = uct_ep_get_address(ep, own_ep);
CHKERR_JUMP(UCS_OK != status, "get endpoint address", out_free_ep);
status = (ucs_status_t)sendrecv(oob_sock, own_ep, if_info.iface_attr.ep_addr_len,
(void **)&peer_ep);
CHKERR_JUMP(0 != status, "EPs exchange", out_free_ep);
/* Connect endpoint to a remote endpoint */
status = uct_ep_connect_to_ep(ep, peer_dev, peer_ep);
if (barrier(oob_sock)) {
status = UCS_ERR_IO_ERROR;
goto out_free_ep;
}
} else if (if_info.iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
/* Create an endpoint which is connected to a remote interface */
ep_params.dev_addr = peer_dev;
ep_params.iface_addr = peer_iface;
status = uct_ep_create(&ep_params, &ep);
CHKERR_JUMP(UCS_OK != status, "create endpoint", out_free_ep_addrs);
} else {
goto out_free_ep_addrs;
}
if (cmd_args.test_strlen > func_am_max_size(cmd_args.func_am_type, &if_info.iface_attr)) {
fprintf(stderr, "Test string is too long: %ld, max supported: %lu\n",
cmd_args.test_strlen,
func_am_max_size(cmd_args.func_am_type, &if_info.iface_attr));
goto out_free_ep;
}
/* Set active message handler */
status = uct_iface_set_am_handler(if_info.iface, id, hello_world,
&cmd_args.func_am_type, 0);
CHKERR_JUMP(UCS_OK != status, "set callback", out_free_ep);
if (cmd_args.server_name) {
char *str = (char *)mem_type_malloc(cmd_args.test_strlen);
CHKERR_ACTION(str == NULL, "allocate memory",
status = UCS_ERR_NO_MEMORY; goto out_free_ep);
res = generate_test_string(str, cmd_args.test_strlen);
CHKERR_ACTION(res < 0, "generate test string",
status = UCS_ERR_NO_MEMORY; goto out_free_ep);
/* Send active message to remote endpoint */
if (cmd_args.func_am_type == FUNC_AM_SHORT) {
status = do_am_short(&if_info, ep, id, &cmd_args, str);
} else if (cmd_args.func_am_type == FUNC_AM_BCOPY) {
status = do_am_bcopy(&if_info, ep, id, &cmd_args, str);
} else if (cmd_args.func_am_type == FUNC_AM_ZCOPY) {
status = do_am_zcopy(&if_info, ep, id, &cmd_args, str);
}
mem_type_free(str);
CHKERR_JUMP(UCS_OK != status, "send active msg", out_free_ep);
} else {
recv_desc_t *rdesc;
while (desc_holder == NULL) {
/* Explicitly progress any outstanding active message requests */
uct_worker_progress(if_info.worker);
}
rdesc = desc_holder;
print_strings("main", func_am_t_str(cmd_args.func_am_type),
(char *)(rdesc + 1), cmd_args.test_strlen);
if (rdesc->is_uct_desc) {
/* Release descriptor because callback returns UCS_INPROGRESS */
} else {
free(rdesc);
}
}
if (barrier(oob_sock)) {
status = UCS_ERR_IO_ERROR;
}
out_free_ep:
out_free_ep_addrs:
free(own_ep);
free(peer_ep);
out_close_oob_sock:
close(oob_sock);
out_free_if_addrs:
free(own_iface);
free(peer_iface);
out_free_dev_addrs:
free(own_dev);
free(peer_dev);
out_destroy_iface:
uct_iface_close(if_info.iface);
uct_md_close(if_info.md);
out_destroy_worker:
uct_worker_destroy(if_info.worker);
out_cleanup_async:
out:
return (status == UCS_ERR_UNSUPPORTED) ? UCS_OK : status;
}