/* UCT hello world client / server example utility. */
#include "ucx_hello_world.h"
#include <uct/api/uct.h>
#include <assert.h>
#include <ctype.h>
typedef enum {
FUNC_AM_SHORT,
FUNC_AM_BCOPY,
FUNC_AM_ZCOPY
} func_am_t;
/*
 * Header stored immediately in front of a received payload; readers access
 * the payload as (rdesc + 1).
 */
typedef struct {
int is_uct_desc; /* 0: descriptor was malloc'ed and main() frees it; 1: descriptor sits right before the callback's data pointer */
} recv_desc_t;
/* Command-line settings filled in by parse_cmd(). */
typedef struct {
char *server_name; /* -n: server host; main() takes the client path when this is non-NULL */
uint16_t server_port; /* -p: port for the out-of-band socket (default 13337) */
func_am_t func_am_type; /* -i / -b / -z: which send routine to use (default FUNC_AM_SHORT) */
const char *dev_name; /* -d: device name, mandatory */
const char *tl_name; /* -t: transport layer name, mandatory */
long test_strlen; /* -s: test string length in bytes (default 16) */
} cmd_args_t;
/*
 * Per-interface state shared between the setup helpers and main().
 * NOTE(review): no members are visible here, yet other code in this file
 * accesses pd, worker, iface and attr through this type - the member
 * declarations appear to be missing and need to be restored.
 */
typedef struct {
} iface_info_t;
/* Arguments for a short active message, produced by am_short_params_pack(). */
typedef struct {
uint64_t header; /* leading sizeof(uint64_t) bytes of the send buffer */
char *payload; /* bytes following the header, or NULL when there are none */
size_t len; /* number of payload bytes (0 when payload is NULL) */
} am_short_args_t;
/* Source buffer description handed to am_bcopy_data_pack_cb(). */
typedef struct {
char *data; /* bytes to copy into the destination buffer */
size_t len; /* number of bytes to copy */
} am_bcopy_args_t;
/*
 * Completion context for the zero-copy send path.
 * NOTE(review): no members are visible here, yet the zero-copy code in this
 * file uses uct_comp (with .func and .count), md and memh - the member
 * declarations appear to be missing and need to be restored.
 */
typedef struct {
} zcopy_comp_t;
/*
 * Hand-off slot between callbacks and the code that waits on them: it starts
 * as NULL, the receive callback stores a recv_desc_t pointer in it, the
 * zero-copy completion callback stores a non-NULL marker, and the waiters
 * loop on `while (!desc_holder)`.
 */
static void* desc_holder = NULL;
/*
 * Translate an active-message selector into the name of the UCT routine it
 * stands for. Yields NULL when the selector matches none of the known values.
 */
static char *func_am_t_str(func_am_t func_am_type)
{
    char *routine_name = NULL;

    if (func_am_type == FUNC_AM_SHORT) {
        routine_name = "uct_ep_am_short";
    } else if (func_am_type == FUNC_AM_BCOPY) {
        routine_name = "uct_ep_am_bcopy";
    } else if (func_am_type == FUNC_AM_ZCOPY) {
        routine_name = "uct_ep_am_zcopy";
    }

    return routine_name;
}
static size_t func_am_max_size(func_am_t func_am_type,
{
switch (func_am_type) {
case FUNC_AM_SHORT:
return attr->
cap.am.max_short;
case FUNC_AM_BCOPY:
return attr->
cap.am.max_bcopy;
case FUNC_AM_ZCOPY:
return attr->
cap.am.max_zcopy;
}
return 0;
}
/*
 * Split a flat send buffer into the pieces a short active message takes.
 *
 * buf   source bytes; at least len bytes must be readable
 * len   number of valid bytes in buf
 * args  output: header receives the leading sizeof(args->header) bytes of buf
 *       (zero-padded when len is smaller than that); payload/len describe the
 *       remaining bytes, or NULL/0 when nothing follows the header
 */
void am_short_params_pack(char *buf, size_t len, am_short_args_t *args)
{
    const size_t header_len = sizeof(args->header);
    const size_t copy_len   = (len < header_len) ? len : header_len;

    /*
     * Copy byte-wise instead of dereferencing buf as uint64_t: buf carries no
     * alignment guarantee, and only the first len bytes may be read.
     */
    args->header = 0;
    memcpy(&args->header, buf, copy_len);

    if (len > header_len) {
        args->payload = buf + header_len;
        args->len     = len - header_len;
    } else {
        args->payload = NULL;
        args->len     = 0;
    }
}
/*
 * NOTE(review): this is the tail of the short active-message sender, which
 * main() calls as do_am_short(&if_info, ep, id, &cmd_args, str). The start of
 * the signature, the declaration of `status` and most of the do-loop are not
 * present in this copy of the file.
 */
const cmd_args_t *cmd_args, char *buf)
{
am_short_args_t send_args;
/* Split buf into the 64-bit header and whatever payload follows it. */
am_short_params_pack(buf, cmd_args->test_strlen, &send_args);
do {
send_args.len);
return status;
}
size_t am_bcopy_data_pack_cb(void *dest, void *arg)
{
am_bcopy_args_t *bc_args = arg;
memcpy(dest, bc_args->data, bc_args->len);
return bc_args->len;
}
/*
 * NOTE(review): this is the tail of the buffered-copy sender, which main()
 * calls as do_am_bcopy(&if_info, ep, id, &cmd_args, str). The start of the
 * signature and the body of the do-loop are not present in this copy of the
 * file.
 */
const cmd_args_t *cmd_args, char *buf)
{
am_bcopy_args_t args;
ssize_t len;
/* Describe the whole test string for the pack callback. */
args.data = buf;
args.len = cmd_args->test_strlen;
do {
/* A non-negative len is reported as UCS_OK; a negative len is returned as is. */
return (len >= 0) ?
UCS_OK : len;
}
/*
 * NOTE(review): body of the zero-copy completion callback (do_am_zcopy
 * installs it as zcopy_completion_cb). Its signature line, which declares
 * `self` and `status`, is not present in this copy of the file.
 */
{
zcopy_comp_t *comp = (zcopy_comp_t *)self;
assert((comp->uct_comp.count == 0) && (status ==
UCS_OK));
/* Any non-NULL value ends the `while (!desc_holder)` wait in the sender. */
desc_holder = (void *)0xDEADBEEF;
}
/*
 * NOTE(review): this is the tail of the zero-copy sender, which main() calls
 * as do_am_zcopy(&if_info, ep, id, &cmd_args, str). The start of the
 * signature, the declarations of `status`, `iov` and `memh`, and the bodies
 * of both loops are not present in this copy of the file, so the braces below
 * do not balance.
 */
const cmd_args_t *cmd_args, char *buf)
{
zcopy_comp_t comp;
iov.
length = cmd_args->test_strlen;
/* One outstanding operation; zcopy_completion_cb runs when it completes. */
comp.uct_comp.func = zcopy_completion_cb;
comp.uct_comp.count = 1;
comp.md = if_info->pd;
comp.memh = memh;
do {
/* Wait until the completion callback stores a non-NULL marker. */
while (!desc_holder) {
}
}
}
return status;
}
/*
 * Write the success banner to stdout: a header line, then
 * "[label] local_str sent remote_str", then a footer line, and flush.
 */
static void print_strings(const char *label, const char *local_str,
                          const char *remote_str)
{
    FILE *out = stdout;

    fputs("\n\n----- UCT TEST SUCCESS ----\n\n", out);
    fprintf(out, "[%s] %s sent %s", label, local_str, remote_str);
    fputs("\n\n---------------------------\n", out);
    fflush(out);
}
/*
 * Receive callback for the active message (main() passes
 * &cmd_args.func_am_type as `arg`).
 *
 * It prints the received data and publishes a recv_desc_t through
 * desc_holder. Two descriptor paths are visible: one reuses the memory right
 * before `data` (is_uct_desc = 1), the other copies `length` bytes into a
 * malloc'ed descriptor (is_uct_desc = 0).
 *
 * NOTE(review): the condition choosing between the two paths and the return
 * statements are not present in this copy of the file, so the braces below do
 * not balance. The malloc result is also used without a NULL check.
 */
static ucs_status_t hello_world(
void *arg,
void *data,
size_t length,
unsigned flags)
{
recv_desc_t *rdesc;
func_am_t func_am_type = *(func_am_t *)arg;
print_strings("callback", func_am_t_str(func_am_type), data);
/* Path 1: the descriptor header lives immediately before the payload. */
rdesc = (recv_desc_t *)data - 1;
rdesc->is_uct_desc = 1;
desc_holder = rdesc;
}
/* Path 2: private copy; main() releases it with free(). */
rdesc = malloc(sizeof(*rdesc) + length);
rdesc->is_uct_desc = 0;
memcpy(rdesc + 1, data, length);
desc_holder = rdesc;
}
/*
 * Open an interface for the given device / transport pair into iface_p.
 *
 * The visible code stores tl_name and dev_name in the interface parameters,
 * opens the interface with uct_iface_open(), and then branches on
 * func_am_type.
 *
 * NOTE(review): many statements of this function are not present in this copy
 * of the file - the local declarations (status, params, config), the calls
 * whose status the CHKERR_JUMP lines test, the rest of each func_am_type
 * condition, and the return / cleanup code under the labels.
 */
static ucs_status_t init_iface(
char *dev_name,
char *tl_name,
func_am_t func_am_type,
iface_info_t *iface_p)
{
params.
mode.device.tl_name = tl_name;
params.
mode.device.dev_name = dev_name;
CHKERR_JUMP(
UCS_OK != status,
"setup iface_config", error_ret);
/* Pass the address of params; the argument had been garbled to a stray character sequence. */
status =
uct_iface_open(iface_p->pd, iface_p->worker, &params, config,
&iface_p->iface);
CHKERR_JUMP(
UCS_OK != status,
"open temporary interface", error_ret);
CHKERR_JUMP(
UCS_OK != status,
"query iface", error_iface);
if ((func_am_type == FUNC_AM_SHORT) &&
}
if ((func_am_type == FUNC_AM_BCOPY) &&
}
if ((func_am_type == FUNC_AM_ZCOPY) &&
}
error_iface:
error_ret:
}
/*
 * Search the memory-domain and transport resources for the device / transport
 * named in cmd_args and, on a match, initialise iface_p through init_iface().
 *
 * Prints "No supported (dev/tl) found" to stderr when the loops finish
 * without a match.
 *
 * NOTE(review): several statements are not present in this copy of the file -
 * the declarations of status, md_resources, md_config and tl_resources, the
 * calls whose status the CHKERR_JUMP lines test, and the code under the
 * release_pd / close_pd labels - so the braces below do not balance.
 */
static ucs_status_t dev_tl_lookup(
const cmd_args_t *cmd_args,
iface_info_t *iface_p)
{
unsigned num_md_resources;
unsigned num_tl_resources;
int i;
int j;
CHKERR_JUMP(
UCS_OK != status,
"query for memory domain resources", error_ret);
/* NOTE(review): i and j are int while the resource counts are unsigned. */
for (i = 0; i < num_md_resources; ++i) {
CHKERR_JUMP(
UCS_OK != status,
"read PD config", release_pd);
status =
uct_md_open(md_resources[i].md_name, md_config, &iface_p->pd);
CHKERR_JUMP(
UCS_OK != status,
"open memory domains", release_pd);
CHKERR_JUMP(
UCS_OK != status,
"query transport resources", close_pd);
for (j = 0; j < num_tl_resources; ++j) {
/* Both the device name and the transport name must match exactly. */
if (!strcmp(cmd_args->dev_name, tl_resources[j].
dev_name) &&
!strcmp(cmd_args->tl_name, tl_resources[j].
tl_name)) {
status = init_iface(tl_resources[j].dev_name,
tl_resources[j].tl_name,
cmd_args->func_am_type, iface_p);
fprintf(stdout, "Using %s with %s.\n",
tl_resources[j].dev_name,
tl_resources[j].tl_name);
fflush(stdout);
goto release_pd;
}
}
}
}
fprintf(stderr, "No supported (dev/tl) found (%s/%s)\n",
cmd_args->dev_name, cmd_args->tl_name);
release_pd:
error_ret:
return status;
/* Placed after the return, so reachable only through goto; it jumps back to release_pd. */
close_pd:
goto release_pd;
}
/*
 * Print the command-line help text to stderr.
 *
 * Always returns -1, so callers can write `return print_err_usage();` to
 * report the usage and fail in a single statement.
 */
int print_err_usage(void)
{
    const char func_template[] = " -%c Select \"%s\" function to send the message%s\n";

    fprintf(stderr, "Usage: uct_hello_world [parameters]\n");
    fprintf(stderr, "UCT hello world client/server example utility\n");
    fprintf(stderr, "\nParameters are:\n");
    fprintf(stderr, func_template, 'i', func_am_t_str(FUNC_AM_SHORT), " (default)");
    fprintf(stderr, func_template, 'b', func_am_t_str(FUNC_AM_BCOPY), "");
    fprintf(stderr, func_template, 'z', func_am_t_str(FUNC_AM_ZCOPY), "");
    fprintf(stderr, " -d Select device name\n");
    fprintf(stderr, " -t Select transport layer\n");
    fprintf(stderr, " -n name Set node name or IP address "
            "of the server (required for client and should be ignored "
            "for server)\n");
    fprintf(stderr, " -p port Set alternative server port (default:13337)\n");
    fprintf(stderr, " -s size Set test string length (default:16)\n");
    fprintf(stderr, "\n");

    /* The function is declared int and its value is used by callers. */
    return -1;
}
/*
 * Parse the command line into *args.
 *
 * argc, argv  as received by main()
 * args        output; zeroed first, then given the defaults port 13337,
 *             FUNC_AM_SHORT and a test string length of 16
 *
 * Returns 0 on success. Returns -1 when an option value is invalid, when -h
 * or an unknown option is given, or when the mandatory device (-d) or
 * transport (-t) is missing; the usage text is printed in all but the
 * invalid-value cases.
 */
int parse_cmd(int argc, char * const argv[], cmd_args_t *args)
{
    char *end  = NULL;
    long value = 0;
    int c      = 0;
    int index  = 0;

    assert(args);
    memset(args, 0, sizeof(*args));

    /* Defaults */
    args->server_port  = 13337;
    args->func_am_type = FUNC_AM_SHORT;
    args->test_strlen  = 16;

    /* Diagnostics are printed here, so silence getopt's own messages. */
    opterr = 0;
    while ((c = getopt(argc, argv, "ibzd:t:n:p:s:h")) != -1) {
        switch (c) {
        case 'i':
            args->func_am_type = FUNC_AM_SHORT;
            break;
        case 'b':
            args->func_am_type = FUNC_AM_BCOPY;
            break;
        case 'z':
            args->func_am_type = FUNC_AM_ZCOPY;
            break;
        case 'd':
            args->dev_name = optarg;
            break;
        case 't':
            args->tl_name = optarg;
            break;
        case 'n':
            args->server_name = optarg;
            break;
        case 'p':
            /*
             * Validate as a long before narrowing: storing the value in a
             * uint16_t first would silently wrap out-of-range input.
             */
            value = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') ||
                (value <= 0) || (value > UINT16_MAX)) {
                fprintf(stderr, "Wrong server port number %s\n", optarg);
                return -1;
            }
            args->server_port = (uint16_t)value;
            break;
        case 's':
            value = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') || (value <= 0)) {
                fprintf(stderr, "Wrong string size %s\n", optarg);
                return -1;
            }
            args->test_strlen = value;
            break;
        case '?':
            /* Every option that takes a value can trigger the missing-argument case. */
            if ((optopt != 0) && (strchr("dtnps", optopt) != NULL)) {
                fprintf(stderr, "Option -%c requires an argument.\n", optopt);
            } else if (isprint(optopt)) {
                fprintf(stderr, "Unknown option `-%c'.\n", optopt);
            } else {
                fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
            }
            /* fallthrough */
        case 'h':
        default:
            print_err_usage();
            return -1;
        }
    }

    /* server_name stays NULL on the server side; never hand NULL to %s. */
    fprintf(stderr, "INFO: UCT_HELLO_WORLD AM function = %s server = %s port = %d\n",
            func_am_t_str(args->func_am_type),
            (args->server_name != NULL) ? args->server_name : "(null)",
            args->server_port);

    for (index = optind; index < argc; index++) {
        fprintf(stderr, "WARNING: Non-option argument %s\n", argv[index]);
    }

    if (args->dev_name == NULL) {
        fprintf(stderr, "WARNING: device is not set\n");
        print_err_usage();
        return -1;
    }

    if (args->tl_name == NULL) {
        fprintf(stderr, "WARNING: transport layer is not set\n");
        print_err_usage();
        return -1;
    }

    return 0;
}
/* Send exactly len bytes, retrying after short writes. Returns 0, or -1 on failure. */
static int sendrecv_send_all(int sock, const void *buf, size_t len)
{
    const char *cursor = buf;

    while (len > 0) {
        ssize_t sent = send(sock, cursor, len, 0);
        if (sent <= 0) {
            return -1;
        }
        cursor += sent;
        len    -= (size_t)sent;
    }

    return 0;
}

/* Receive exactly len bytes; a peer shutdown before that counts as failure. */
static int sendrecv_recv_all(int sock, void *buf, size_t len)
{
    char *cursor = buf;

    while (len > 0) {
        ssize_t got = recv(sock, cursor, len, 0);
        if (got <= 0) {
            return -1;
        }
        cursor += got;
        len    -= (size_t)got;
    }

    return 0;
}

/*
 * Exchange one length-prefixed buffer with the peer over a connected socket:
 * send slen followed by sbuf, then receive the peer's length and payload.
 *
 * sock  connected socket
 * sbuf  bytes to send
 * slen  number of bytes in sbuf
 * rbuf  output; on success points to a calloc'ed buffer holding the peer's
 *       payload, which the caller must free(). On failure it is set to NULL.
 *
 * Returns 0 on success, -1 on any send, receive or allocation failure.
 */
int sendrecv(int sock, const void *sbuf, size_t slen, void **rbuf)
{
    size_t rlen = 0;
    void *incoming;

    *rbuf = NULL;

    if (sendrecv_send_all(sock, &slen, sizeof(slen)) != 0) {
        fprintf(stderr, "failed to send buffer length\n");
        return -1;
    }

    if (sendrecv_send_all(sock, sbuf, slen) != 0) {
        fprintf(stderr, "failed to send buffer\n");
        return -1;
    }

    if (sendrecv_recv_all(sock, &rlen, sizeof(rlen)) != 0) {
        fprintf(stderr, "failed to receive device address length\n");
        return -1;
    }

    /* Allocate at least one byte so a zero-length payload still yields a valid buffer. */
    incoming = calloc(1, (rlen > 0) ? rlen : 1);
    if (incoming == NULL) {
        fprintf(stderr, "failed to allocate receive buffer\n");
        return -1;
    }

    if (sendrecv_recv_all(sock, incoming, rlen) != 0) {
        fprintf(stderr, "failed to receive device address\n");
        free(incoming);
        return -1;
    }

    *rbuf = incoming;
    return 0;
}
/*
 * Program entry point: parse the command line, look up the requested device
 * and transport, exchange addresses with the peer over an out-of-band socket,
 * then either send the test string (when a server name was given) or wait for
 * it and print it.
 *
 * NOTE(review): many statements are not present in this copy of the file -
 * the declarations of status, ep and the own_/peer_ address buffers, the
 * calls whose results the CHKERR_JUMP lines test, several `if` headers, the
 * cleanup calls under the out_* labels and the return statement - so the
 * braces below do not balance.
 */
int main(int argc, char **argv)
{
ucs_async_context_t *async;
cmd_args_t cmd_args;
iface_info_t if_info;
uint8_t id = 0;
int oob_sock = -1;
/* A non-zero result from parse_cmd() ends the program. */
if (parse_cmd(argc, argv, &cmd_args)) {
goto out;
}
CHKERR_JUMP(
UCS_OK != status,
"init async context", out);
CHKERR_JUMP(
UCS_OK != status,
"create worker", out_cleanup_async);
status = dev_tl_lookup(&cmd_args, &if_info);
CHKERR_JUMP(
UCS_OK != status,
"find supported device and transport",
out_destroy_worker);
CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr",
out_destroy_iface);
CHKERR_JUMP(NULL == own_iface, "allocate memory for if addr",
out_free_dev_addrs);
CHKERR_JUMP(
UCS_OK != status,
"get device address", out_free_if_addrs);
/* With a server name this process connects out; without one it accepts a connection. */
if (cmd_args.server_name) {
oob_sock = client_connect(cmd_args.server_name, cmd_args.server_port);
if (oob_sock < 0) {
goto out_free_if_addrs;
}
} else {
oob_sock = server_connect(cmd_args.server_port);
if (oob_sock < 0) {
goto out_free_if_addrs;
}
}
/* Swap device addresses with the peer over the out-of-band socket. */
status = sendrecv(oob_sock, own_dev, if_info.attr.device_addr_len,
(void **)&peer_dev);
CHKERR_JUMP(0 != status, "device exchange", out_free_dev_addrs);
CHKERR_JUMP(0 == status, "reach the peer", out_free_if_addrs);
CHKERR_JUMP(
UCS_OK != status,
"get interface address", out_free_if_addrs);
/* Swap interface addresses with the peer. */
status = sendrecv(oob_sock, own_iface, if_info.attr.iface_addr_len,
(void **)&peer_iface);
CHKERR_JUMP(0 != status, "ifaces exchange", out_free_if_addrs);
}
CHKERR_JUMP(NULL == own_ep, "allocate memory for ep addrs", out_free_if_addrs);
CHKERR_JUMP(
UCS_OK != status,
"create endpoint", out_free_ep_addrs);
CHKERR_JUMP(
UCS_OK != status,
"get endpoint address", out_free_ep);
/* Swap endpoint addresses with the peer. */
status = sendrecv(oob_sock, own_ep, if_info.attr.ep_addr_len,
(void **)&peer_ep);
CHKERR_JUMP(0 != status, "EPs exchange", out_free_ep);
barrier(oob_sock);
} else {
}
CHKERR_JUMP(
UCS_OK != status,
"connect endpoint", out_free_ep);
/* Refuse a test string longer than the selected send routine supports. */
if (cmd_args.test_strlen > func_am_max_size(cmd_args.func_am_type, &if_info.attr)) {
fprintf(stderr, "Test string is too long: %ld, max supported: %lu\n",
cmd_args.test_strlen,
func_am_max_size(cmd_args.func_am_type, &if_info.attr));
goto out_free_ep;
}
&cmd_args.func_am_type, 0);
CHKERR_JUMP(
UCS_OK != status,
"set callback", out_free_ep);
if (cmd_args.server_name) {
/* Client side: build a random test string and send it with the chosen routine. */
/* NOTE(review): the malloc result is used without a NULL check. */
char *str = (char *)malloc(cmd_args.test_strlen);
generate_random_string(str, cmd_args.test_strlen);
if (cmd_args.func_am_type == FUNC_AM_SHORT) {
status = do_am_short(&if_info, ep, id, &cmd_args, str);
} else if (cmd_args.func_am_type == FUNC_AM_BCOPY) {
status = do_am_bcopy(&if_info, ep, id, &cmd_args, str);
} else if (cmd_args.func_am_type == FUNC_AM_ZCOPY) {
status = do_am_zcopy(&if_info, ep, id, &cmd_args, str);
}
free(str);
CHKERR_JUMP(
UCS_OK != status,
"send active msg", out_free_ep);
} else {
/* Server side: wait until the receive callback publishes a descriptor. */
recv_desc_t *rdesc;
while (!desc_holder) {
}
rdesc = desc_holder;
/* The payload starts right after the descriptor header. */
print_strings("main", func_am_t_str(cmd_args.func_am_type),
(char *)(rdesc + 1));
/* Only the malloc'ed descriptor (is_uct_desc == 0) is released with free(). */
if (rdesc->is_uct_desc) {
} else {
free(rdesc);
}
}
barrier(oob_sock);
close(oob_sock);
/* Cleanup labels, in reverse order of acquisition; each label falls through to the next. */
out_free_ep:
out_free_ep_addrs:
free(own_ep);
free(peer_ep);
out_free_if_addrs:
free(own_iface);
free(peer_iface);
out_free_dev_addrs:
free(own_dev);
free(peer_dev);
out_destroy_iface:
out_destroy_worker:
out_cleanup_async:
out:
}