/* UCT hello world client / server example utility. */
#include "ucx_hello_world.h"
#include <limits.h>
#include <uct/api/uct.h>
#include <assert.h>
#include <ctype.h>
typedef enum {
FUNC_AM_SHORT,
FUNC_AM_BCOPY,
FUNC_AM_ZCOPY
} func_am_t;
/*
 * Small header that sits immediately in front of a received payload; the
 * payload itself is reached as (rdesc + 1).
 */
typedef struct {
/* 1 when the header lives directly in front of the callback's data pointer,
 * 0 when the whole buffer was malloc()ed by the receive callback (main()
 * releases it with free() in that case). */
int is_uct_desc;
} recv_desc_t;
/* Command-line settings filled in by parse_cmd(). */
typedef struct {
char *server_name; /* -n; when set main() connects as a client, otherwise it acts as the server */
uint16_t server_port; /* -p; parse_cmd() defaults it to 13337 */
func_am_t func_am_type; /* -i / -b / -z; defaults to FUNC_AM_SHORT */
const char *dev_name; /* -d; required */
const char *tl_name; /* -t; required */
long test_strlen; /* -s; size of the test message, defaults to 16 */
} cmd_args_t;
/*
 * Per-interface state passed between the setup, send and cleanup code.
 * NOTE(review): no members are visible in this excerpt, yet the rest of the
 * file accesses md, worker, iface, iface_attr and md_attr through this type;
 * the member list appears to be missing — confirm against the full file.
 */
typedef struct {
} iface_info_t;
/* A message split into the pieces filled in by am_short_params_pack(). */
typedef struct {
uint64_t header; /* leading sizeof(uint64_t) bytes of the message */
char *payload; /* bytes following the header, or NULL when there are none */
size_t len; /* number of payload bytes (0 when payload is NULL) */
} am_short_args_t;
/* Source buffer description consumed by am_bcopy_data_pack_cb(). */
typedef struct {
char *data; /* bytes to copy into the destination buffer */
size_t len; /* number of bytes to copy; also the callback's return value */
} am_bcopy_args_t;
/*
 * Completion context for the zero-copy send path.
 * NOTE(review): no members are visible in this excerpt, yet the zcopy code
 * reads and writes uct_comp, md and memh through this type; the member list
 * appears to be missing — confirm against the full file.
 */
typedef struct {
} zcopy_comp_t;
/*
 * Hand-off slot between callbacks and the code that waits on them:
 *  - hello_world() stores the received descriptor here and main() polls until
 *    it becomes non-NULL;
 *  - the zcopy completion callback stores a 0xDEADBEEF sentinel here, which
 *    the zcopy send path waits for.
 */
static void* desc_holder = NULL;
/*
 * Translate an AM send flavor into the name of the UCT function that
 * implements it.
 *
 * Returns a string literal, or NULL when func_am_type is not one of the
 * func_am_t enumerators.
 */
static char *func_am_t_str(func_am_t func_am_type)
{
    char *name = NULL;

    if (func_am_type == FUNC_AM_SHORT) {
        name = "uct_ep_am_short";
    } else if (func_am_type == FUNC_AM_BCOPY) {
        name = "uct_ep_am_bcopy";
    } else if (func_am_type == FUNC_AM_ZCOPY) {
        name = "uct_ep_am_zcopy";
    }

    return name;
}
static size_t func_am_max_size(func_am_t func_am_type,
{
switch (func_am_type) {
case FUNC_AM_SHORT:
return attr->
cap.am.max_short;
case FUNC_AM_BCOPY:
return attr->
cap.am.max_bcopy;
case FUNC_AM_ZCOPY:
return attr->
cap.am.max_zcopy;
}
return 0;
}
/*
 * Split a flat message buffer into the pieces a short active message needs:
 * a 64-bit header followed by an optional payload.
 *
 * buf   message bytes (not modified)
 * len   number of valid bytes in buf
 * args  receives the header, the payload pointer (into buf) and payload size
 *
 * The header is taken from the first sizeof(args->header) bytes of buf. When
 * the message is shorter than that, only the available bytes are copied and
 * the rest of the header is zero, so buf is never read past len. Anything
 * after the header becomes the payload; otherwise payload is NULL / len 0.
 */
void am_short_params_pack(char *buf, size_t len, am_short_args_t *args)
{
    const size_t header_size = sizeof(args->header);
    const size_t header_len  = (len < header_size) ? len : header_size;

    /* memcpy instead of *(uint64_t *)buf: no alignment or aliasing
     * assumptions about buf, and no read beyond the bytes actually present. */
    args->header = 0;
    if (header_len > 0) {
        memcpy(&args->header, buf, header_len);
    }

    if (len > header_size) {
        args->payload = buf + header_size;
        args->len     = len - header_size;
    } else {
        args->payload = NULL;
        args->len     = 0;
    }
}
/*
 * NOTE(review): the first line of this definition (return type, name and
 * leading parameters) is not present in this excerpt. main() calls
 * do_am_short(&if_info, ep, id, &cmd_args, str); presumably this is the tail
 * of that function — confirm against the full file.
 */
const cmd_args_t *cmd_args, char *buf)
{
am_short_args_t send_args;
/* Split buf into the 64-bit header and the payload that follows it. */
am_short_params_pack(buf, cmd_args->test_strlen, &send_args);
/* NOTE(review): the send call and the loop condition appear truncated here;
 * only the last argument line of the call is left. */
do {
send_args.len);
return status;
}
size_t am_bcopy_data_pack_cb(void *dest, void *arg)
{
am_bcopy_args_t *bc_args = arg;
mem_type_memcpy(dest, bc_args->data, bc_args->len);
return bc_args->len;
}
/*
 * NOTE(review): the first line of this definition is not present in this
 * excerpt. main() calls do_am_bcopy(&if_info, ep, id, &cmd_args, str);
 * presumably this is the tail of that function — confirm against the full
 * file.
 */
const cmd_args_t *cmd_args, char *buf)
{
am_bcopy_args_t args;
ssize_t len;
/* Describe the source bytes for am_bcopy_data_pack_cb(). */
args.data = buf;
args.len = cmd_args->test_strlen;
/* NOTE(review): the loop body, its condition and the return statement appear
 * to be missing from this excerpt. */
do {
}
/*
 * NOTE(review): the signature of this definition is not present in this
 * excerpt. The body uses parameters named self and status, and the zcopy send
 * path installs zcopy_completion_cb as its completion function; presumably
 * this is that callback — confirm against the full file.
 */
{
zcopy_comp_t *comp = (zcopy_comp_t *)self;
/* The completion is expected to fire once, successfully. */
assert((comp->uct_comp.count == 0) && (status ==
UCS_OK));
/* NOTE(review): the body of this branch appears to be missing. */
if (comp->memh != UCT_MEM_HANDLE_NULL) {
}
/* Non-NULL sentinel: ends the `while (!desc_holder)` wait in the sender. */
desc_holder = (void *)0xDEADBEEF;
}
/*
 * NOTE(review): the first line of this definition and several statements in
 * its body are not present in this excerpt. main() calls
 * do_am_zcopy(&if_info, ep, id, &cmd_args, str); presumably this is the tail
 * of that function — confirm against the full file.
 */
const cmd_args_t *cmd_args, char *buf)
{
zcopy_comp_t comp;
/* NOTE(review): the `if` matching this `else` is missing from the excerpt. */
} else {
memh = UCT_MEM_HANDLE_NULL;
}
iov.
length = cmd_args->test_strlen;
/* Completion context handed to zcopy_completion_cb; count starts at 1. */
comp.uct_comp.func = zcopy_completion_cb;
comp.uct_comp.count = 1;
comp.md = if_info->md;
comp.memh = memh;
do {
/* Wait until the completion callback publishes its sentinel. */
while (!desc_holder) {
}
}
}
return status;
}
/*
 * Print the "test success" banner together with the received message.
 *
 * label       tag identifying the caller (e.g. "callback" or "main")
 * local_str   name of the send function in use
 * remote_str  received bytes; need not be NUL-terminated
 * length      number of valid bytes at remote_str
 *
 * At most `length` bytes of remote_str are printed (printing also stops at
 * the first NUL). The payload comes from the peer, so it must not be trusted
 * to carry a terminator.
 */
static void print_strings(const char *label, const char *local_str,
                          const char *remote_str, size_t length)
{
    /* printf precision is an int; clamp oversized lengths. */
    const int shown = (length > (size_t)INT_MAX) ? INT_MAX : (int)length;

    fprintf(stdout, "\n\n----- UCT TEST SUCCESS ----\n\n");
    fprintf(stdout, "[%s] %s sent %.*s", label, local_str, shown, remote_str);
    fprintf(stdout, "\n\n---------------------------\n");
    fflush(stdout);
}
static ucs_status_t hello_world(
void *arg,
void *data,
size_t length,
unsigned flags)
{
func_am_t func_am_type = *(func_am_t *)arg;
recv_desc_t *rdesc;
print_strings("callback", func_am_t_str(func_am_type), data, length);
rdesc = (recv_desc_t *)data - 1;
rdesc->is_uct_desc = 1;
desc_holder = rdesc;
}
rdesc = malloc(sizeof(*rdesc) + length);
rdesc->is_uct_desc = 0;
memcpy(rdesc + 1, data, length);
desc_holder = rdesc;
}
/*
 * Open the interface for the given device / transport pair on iface_p and
 * check that it can carry the selected AM send flavor.
 *
 * dev_name      device to open
 * tl_name       transport layer to open
 * func_am_type  send flavor the interface must support
 * iface_p       interface state; iface_p->iface must be NULL on entry and is
 *               reset to NULL on the error_iface path
 *
 * Fix applied: the third argument of uct_iface_open() had been corrupted into
 * the mis-encoded token "¶ms"; it is restored to &params.
 *
 * NOTE(review): many lines of this function are missing from this excerpt
 * (local declarations, the calls whose results the CHKERR_JUMP lines test,
 * the capability conditions, and the return statements). Those gaps are left
 * exactly as found — confirm against the full file.
 */
static ucs_status_t init_iface(
char *dev_name,
char *tl_name,
func_am_t func_am_type,
iface_info_t *iface_p)
{
/* Select the transport and device to open. */
params.
mode.device.tl_name = tl_name;
params.
mode.device.dev_name = dev_name;
CHKERR_JUMP(
UCS_OK != status,
"setup iface_config", error_ret);
/* A previously opened interface must not be overwritten. */
assert(iface_p->iface == NULL);
status =
uct_iface_open(iface_p->md, iface_p->worker, &params, config,
&iface_p->iface);
CHKERR_JUMP(
UCS_OK != status,
"open temporary interface", error_ret);
CHKERR_JUMP(
UCS_OK != status,
"query iface", error_iface);
/* NOTE(review): the remainder of each condition below is truncated. */
if ((func_am_type == FUNC_AM_SHORT) &&
if (test_mem_type != UCS_MEMORY_TYPE_CUDA) {
} else {
fprintf(stderr, "AM short protocol doesn't support CUDA memory");
}
}
if ((func_am_type == FUNC_AM_BCOPY) &&
}
if ((func_am_type == FUNC_AM_ZCOPY) &&
}
error_iface:
iface_p->iface = NULL;
error_ret:
}
/*
 * Search the available components for the device / transport pair requested
 * on the command line and initialize iface_p for it through init_iface().
 *
 * cmd_args  parsed command line (dev_name, tl_name, func_am_type are read)
 * iface_p   interface state to fill in
 *
 * Returns the value left in `status`.
 *
 * NOTE(review): many lines of this function are missing from this excerpt
 * (several local declarations, the calls whose results the CHKERR_JUMP lines
 * test, an inner loop header and parts of the cleanup code). The comments
 * below describe only what is visible — confirm against the full file.
 */
static ucs_status_t dev_tl_lookup(
const cmd_args_t *cmd_args,
iface_info_t *iface_p)
{
unsigned num_tl_resources = 0;
unsigned num_components;
unsigned cmpt_index;
unsigned md_index;
unsigned tl_index;
CHKERR_JUMP(
UCS_OK != status,
"query for components", error_ret);
/* Walk every component reported by the query above. */
for (cmpt_index = 0; cmpt_index < num_components; ++cmpt_index) {
CHKERR_JUMP(
UCS_OK != status,
"query component attributes",
release_component_list);
CHKERR_JUMP(
UCS_OK != status,
"query for memory domain resources",
release_component_list);
iface_p->iface = NULL;
&md_config);
CHKERR_JUMP(
UCS_OK != status,
"read MD config",
release_component_list);
md_config, &iface_p->md);
CHKERR_JUMP(
UCS_OK != status,
"open memory domains",
release_component_list);
CHKERR_JUMP(
UCS_OK != status,
"query iface",
close_md);
&num_tl_resources);
CHKERR_JUMP(
UCS_OK != status,
"query transport resources", close_md);
/* Look for the transport resource matching both requested names. */
for (tl_index = 0; tl_index < num_tl_resources; ++tl_index) {
if (!strcmp(cmd_args->dev_name, tl_resources[tl_index].dev_name) &&
!strcmp(cmd_args->tl_name, tl_resources[tl_index].tl_name)) {
/* Skip a match whose memory domain cannot register the test memory type. */
if (!(iface_p->md_attr.cap.reg_mem_types & UCS_BIT(test_mem_type))) {
fprintf(stderr, "Unsupported memory type %s by "
UCT_TL_RESOURCE_DESC_FMT" on %s MD\n",
ucs_memory_type_names[test_mem_type],
UCT_TL_RESOURCE_DESC_ARG(&tl_resources[tl_index]),
break;
}
status = init_iface(tl_resources[tl_index].dev_name,
tl_resources[tl_index].tl_name,
cmd_args->func_am_type, iface_p);
break;
}
fprintf(stdout, "Using "UCT_TL_RESOURCE_DESC_FMT"\n",
UCT_TL_RESOURCE_DESC_ARG(&tl_resources[tl_index]));
goto release_tl_resources;
}
}
release_tl_resources:
(tl_index < num_tl_resources)) {
goto release_component_list;
}
tl_resources = NULL;
num_tl_resources = 0;
}
}
/* Reached when no component offered the requested pair. */
fprintf(stderr, "No supported (dev/tl) found (%s/%s)\n",
cmd_args->dev_name, cmd_args->tl_name);
release_component_list:
error_ret:
return status;
close_md:
goto release_component_list;
}
/*
 * Print the command-line help to stderr.
 *
 * Always returns a non-zero value, so callers can write
 * `return print_err_usage();` to report the failure to their own caller.
 * (As found, the function fell off its end without returning, which leaves
 * the value used by those callers undefined.)
 */
int print_err_usage(void)
{
    const char func_template[] = "  -%c      Select \"%s\" function to send the message%s\n";

    fprintf(stderr, "Usage: uct_hello_world [parameters]\n");
    fprintf(stderr, "UCT hello world client/server example utility\n");
    fprintf(stderr, "\nParameters are:\n");
    fprintf(stderr, func_template, 'i', func_am_t_str(FUNC_AM_SHORT), " (default)");
    fprintf(stderr, func_template, 'b', func_am_t_str(FUNC_AM_BCOPY), "");
    fprintf(stderr, func_template, 'z', func_am_t_str(FUNC_AM_ZCOPY), "");
    fprintf(stderr, "  -d      Select device name\n");
    fprintf(stderr, "  -t      Select transport layer\n");
    print_common_help();
    fprintf(stderr, "\nExample:\n");
    fprintf(stderr, "  Server: uct_hello_world -d eth0 -t tcp\n");
    fprintf(stderr, "  Client: uct_hello_world -d eth0 -t tcp -n localhost\n");

    return -1;
}
/*
 * Parse the command line into *args.
 *
 * argc, argv  as passed to main()
 * args        receives the settings; it is zeroed first and then given the
 *             defaults port 13337, FUNC_AM_SHORT and a 16-byte test string
 *
 * Options: -i / -b / -z pick the send function, -d device, -t transport,
 * -n server name (client mode), -p port, -s test string size, -m memory type,
 * -h help.
 *
 * Returns 0 on success. Returns a non-zero value when an option is unknown or
 * lacks its argument, when -h is given, when a port / size / memory type is
 * invalid, or when the mandatory -d / -t options are absent.
 */
int parse_cmd(int argc, char * const argv[], cmd_args_t *args)
{
    /* Options that take an argument; lets the '?' case tell "missing
     * argument" apart from "unknown option". */
    static const char opts_with_arg[] = "dtnpsm";
    char *end = NULL;
    long value;
    int c = 0, i = 0;

    assert(args);
    memset(args, 0, sizeof(*args));

    args->server_port  = 13337;
    args->func_am_type = FUNC_AM_SHORT;
    args->test_strlen  = 16;

    /* getopt()'s own diagnostics are disabled; the '?' case reports errors. */
    opterr = 0;
    while ((c = getopt(argc, argv, "ibzd:t:n:p:s:m:h")) != -1) {
        switch (c) {
        case 'i':
            args->func_am_type = FUNC_AM_SHORT;
            break;
        case 'b':
            args->func_am_type = FUNC_AM_BCOPY;
            break;
        case 'z':
            args->func_am_type = FUNC_AM_ZCOPY;
            break;
        case 'd':
            args->dev_name = optarg;
            break;
        case 't':
            args->tl_name = optarg;
            break;
        case 'n':
            args->server_name = optarg;
            break;
        case 'p':
            /* Validate before narrowing to uint16_t: converting first would
             * silently wrap out-of-range values into a "valid" port. */
            value = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') || (value <= 0) ||
                (value > UINT16_MAX)) {
                fprintf(stderr, "Wrong server port number %s\n", optarg);
                return -1;
            }
            args->server_port = (uint16_t)value;
            break;
        case 's':
            /* strtol() saturates at LONG_MAX on overflow; reject that too. */
            value = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') || (value <= 0) ||
                (value == LONG_MAX)) {
                fprintf(stderr, "Wrong string size %s\n", optarg);
                return -1;
            }
            args->test_strlen = value;
            break;
        case 'm':
            test_mem_type = parse_mem_type(optarg);
            if (test_mem_type == UCS_MEMORY_TYPE_LAST) {
                /* NOTE(review): parse_mem_type() is presumably responsible
                 * for reporting the bad value — confirm. */
                return -1;
            }
            break;
        case '?':
            if ((optopt != 0) && (strchr(opts_with_arg, optopt) != NULL)) {
                fprintf(stderr, "Option -%c requires an argument.\n", optopt);
            } else if (isprint((unsigned char)optopt)) {
                fprintf(stderr, "Unknown option `-%c'.\n", optopt);
            } else {
                fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
            }
            /* Fall through */
        case 'h':
        default:
            return print_err_usage();
        }
    }

    /* server_name stays NULL in server mode; never hand NULL to %s. */
    fprintf(stderr, "INFO: UCT_HELLO_WORLD AM function = %s server = %s port = %d\n",
            func_am_t_str(args->func_am_type),
            (args->server_name != NULL) ? args->server_name : "(null)",
            args->server_port);

    for (i = optind; i < argc; i++) {
        fprintf(stderr, "WARNING: Non-option argument %s\n", argv[i]);
    }

    if (args->dev_name == NULL) {
        fprintf(stderr, "WARNING: device is not set\n");
        return print_err_usage();
    }

    if (args->tl_name == NULL) {
        fprintf(stderr, "WARNING: transport layer is not set\n");
        return print_err_usage();
    }

    return 0;
}
/*
 * Send exactly len bytes from buf over sock, continuing after partial sends.
 * Returns len on success and -1 as soon as send() fails or makes no progress.
 */
static ssize_t sendrecv_send_all(int sock, const void *buf, size_t len)
{
    const char *cursor = buf;
    size_t left        = len;

    while (left > 0) {
        ssize_t sent = send(sock, cursor, left, 0);
        if (sent <= 0) {
            return -1;
        }
        cursor += sent;
        left   -= (size_t)sent;
    }

    return (ssize_t)len;
}

/*
 * Exchange one length-prefixed buffer with the peer on sock: send slen
 * followed by the slen bytes at sbuf, then receive the peer's length and
 * payload.
 *
 * sock  connected socket
 * sbuf  bytes to send
 * slen  number of bytes at sbuf
 * rbuf  on success receives a newly allocated buffer holding the peer's
 *       payload, which the caller must free(); on failure it is set to NULL
 *       and nothing is left allocated
 *
 * Returns 0 on success and -1 on any send, receive or allocation failure. A
 * peer length above SIZE_MAX / 2 is rejected.
 */
int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf)
{
    size_t rlen = 0;
    ssize_t ret;
    void *reply;

    *rbuf = NULL;

    ret = sendrecv_send_all(sock, &slen, sizeof(slen));
    if (ret != (ssize_t)sizeof(slen)) {
        fprintf(stderr, "failed to send buffer length\n");
        return -1;
    }

    ret = sendrecv_send_all(sock, sbuf, slen);
    if (ret != (ssize_t)slen) {
        fprintf(stderr, "failed to send buffer, return value %d\n", (int)ret);
        return -1;
    }

    ret = recv(sock, &rlen, sizeof(rlen), MSG_WAITALL);
    if ((ret != (ssize_t)sizeof(rlen)) || (rlen > (SIZE_MAX / 2))) {
        fprintf(stderr,
                "failed to receive device address length, return value %d\n",
                (int)ret);
        return -1;
    }

    /* calloc(1, 0) may legitimately return NULL; always request >= 1 byte so
     * an empty reply is not mistaken for an allocation failure. */
    reply = calloc(1, (rlen != 0) ? rlen : 1);
    if (reply == NULL) {
        fprintf(stderr, "failed to allocate receive buffer\n");
        return -1;
    }

    ret = recv(sock, reply, rlen, MSG_WAITALL);
    if (ret != (ssize_t)rlen) {
        fprintf(stderr, "failed to receive device address, return value %d\n",
                (int)ret);
        free(reply);
        return -1;
    }

    *rbuf = reply;
    return 0;
}
/*
 * Program entry point: parse the command line, look up the requested
 * device / transport, exchange addresses with the peer over an out-of-band
 * socket, then either send the test message (client, when -n was given) or
 * wait for it and print it (server).
 *
 * NOTE(review): many lines of this function are missing from this excerpt
 * (several local declarations, the calls whose results the CHKERR_JUMP lines
 * test, some branch headers, the statements under most cleanup labels and the
 * return statement). The comments below describe only what is visible —
 * confirm against the full file.
 */
int main(int argc, char **argv)
{
uint8_t id = 0;
int oob_sock = -1;
ucs_async_context_t *async;
cmd_args_t cmd_args;
iface_info_t if_info;
int res;
/* A non-zero result means the command line was rejected. */
if (parse_cmd(argc, argv, &cmd_args)) {
goto out;
}
CHKERR_JUMP(
UCS_OK != status,
"init async context", out);
CHKERR_JUMP(
UCS_OK != status,
"create worker", out_cleanup_async);
/* Find and open the interface matching -d / -t. */
status = dev_tl_lookup(&cmd_args, &if_info);
CHKERR_JUMP(
UCS_OK != status,
"find supported device and transport",
out_destroy_worker);
CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr",
out_destroy_iface);
CHKERR_JUMP(NULL == own_iface, "allocate memory for if addr",
out_free_dev_addrs);
CHKERR_JUMP(
UCS_OK != status,
"get device address", out_free_if_addrs);
/* With a server name we are the client; otherwise we listen as the server. */
if (cmd_args.server_name) {
oob_sock = client_connect(cmd_args.server_name, cmd_args.server_port);
} else {
oob_sock = server_connect(cmd_args.server_port);
}
CHKERR_ACTION(oob_sock < 0, "OOB connect",
res = sendrecv(oob_sock, own_dev, if_info.iface_attr.device_addr_len,
(void **)&peer_dev);
CHKERR_ACTION(0 != res, "device exchange",
CHKERR_JUMP(0 == status, "reach the peer", out_close_oob_sock);
CHKERR_JUMP(
UCS_OK != status,
"get interface address",
out_close_oob_sock);
status = (
ucs_status_t)sendrecv(oob_sock, own_iface, if_info.iface_attr.iface_addr_len,
(void **)&peer_iface);
CHKERR_JUMP(0 != status, "ifaces exchange", out_close_oob_sock);
}
ep_params.
iface = if_info.iface;
own_ep = (
uct_ep_addr_t*)calloc(1, if_info.iface_attr.ep_addr_len);
CHKERR_ACTION(NULL == own_ep, "allocate memory for ep addrs",
CHKERR_JUMP(
UCS_OK != status,
"create endpoint", out_free_ep_addrs);
CHKERR_JUMP(
UCS_OK != status,
"get endpoint address", out_free_ep);
status = (
ucs_status_t)sendrecv(oob_sock, own_ep, if_info.iface_attr.ep_addr_len,
(void **)&peer_ep);
CHKERR_JUMP(0 != status, "EPs exchange", out_free_ep);
if (barrier(oob_sock)) {
goto out_free_ep;
}
CHKERR_JUMP(
UCS_OK != status,
"create endpoint", out_free_ep_addrs);
} else {
goto out_free_ep_addrs;
}
/* Refuse a test string larger than the chosen send function can carry. */
if (cmd_args.test_strlen > func_am_max_size(cmd_args.func_am_type, &if_info.iface_attr)) {
fprintf(stderr, "Test string is too long: %ld, max supported: %lu\n",
cmd_args.test_strlen,
func_am_max_size(cmd_args.func_am_type, &if_info.iface_attr));
goto out_free_ep;
}
&cmd_args.func_am_type, 0);
CHKERR_JUMP(
UCS_OK != status,
"set callback", out_free_ep);
if (cmd_args.server_name) {
/* Client: build the test string and send it with the selected function. */
char *str = (char *)mem_type_malloc(cmd_args.test_strlen);
CHKERR_ACTION(str == NULL, "allocate memory",
res = generate_test_string(str, cmd_args.test_strlen);
CHKERR_ACTION(res < 0, "generate test string",
if (cmd_args.func_am_type == FUNC_AM_SHORT) {
status = do_am_short(&if_info, ep, id, &cmd_args, str);
} else if (cmd_args.func_am_type == FUNC_AM_BCOPY) {
status = do_am_bcopy(&if_info, ep, id, &cmd_args, str);
} else if (cmd_args.func_am_type == FUNC_AM_ZCOPY) {
status = do_am_zcopy(&if_info, ep, id, &cmd_args, str);
}
mem_type_free(str);
CHKERR_JUMP(
UCS_OK != status,
"send active msg", out_free_ep);
} else {
/* Server: wait until the receive callback publishes a descriptor. */
recv_desc_t *rdesc;
while (desc_holder == NULL) {
}
rdesc = desc_holder;
/* The payload follows the recv_desc_t header. */
print_strings("main", func_am_t_str(cmd_args.func_am_type),
(char *)(rdesc + 1), cmd_args.test_strlen);
/* Only the malloc()ed variant is released with free(). */
if (rdesc->is_uct_desc) {
} else {
free(rdesc);
}
}
if (barrier(oob_sock)) {
}
/* Cleanup labels, entered in reverse order of acquisition. */
out_free_ep:
out_free_ep_addrs:
free(own_ep);
free(peer_ep);
out_close_oob_sock:
close(oob_sock);
out_free_if_addrs:
free(own_iface);
free(peer_iface);
out_free_dev_addrs:
free(own_dev);
free(peer_dev);
out_destroy_iface:
out_destroy_worker:
out_cleanup_async:
out:
}