UCP hello world client / server example utility.
#ifndef HAVE_CONFIG_H
# define HAVE_CONFIG_H
#endif
#include "ucx_hello_world.h"
#include <ucp/api/ucp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <assert.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <ctype.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
struct msg {
uint64_t data_len;
};
struct ucx_context {
int completed;
};
enum ucp_test_mode_t {
TEST_MODE_PROBE,
TEST_MODE_WAIT,
TEST_MODE_EVENTFD
} ucp_test_mode = TEST_MODE_PROBE;
static struct err_handling {
int failure;
} err_handling_opt;
static uint16_t server_port = 13337;
static long test_string_length = 16;
static const ucp_tag_t tag_mask = UINT64_MAX;
static size_t local_addr_len;
static size_t peer_addr_len;
static ucs_status_t parse_cmd(
int argc,
char *
const argv[],
char **server_name);
static void set_msg_data_len(struct msg *msg, uint64_t data_len)
{
mem_type_memcpy(&msg->data_len, &data_len, sizeof(data_len));
}
static void request_init(void *request)
{
struct ucx_context *ctx = (struct ucx_context *) request;
ctx->completed = 0;
}
static void send_handler(
void *request,
ucs_status_t status)
{
struct ucx_context *context = (struct ucx_context *) request;
context->completed = 1;
printf("[0x%x] send handler called with status %d (%s)\n",
(unsigned int)pthread_self(), status, ucs_status_string(status));
}
{
printf("[0x%x] failure handler called with status %d (%s)\n",
(unsigned int)pthread_self(), status, ucs_status_string(status));
*arg_status = status;
}
static void recv_handler(
void *request,
ucs_status_t status,
{
struct ucx_context *context = (struct ucx_context *) request;
context->completed = 1;
printf("[0x%x] receive handler called with status %d (%s), length %lu\n",
(unsigned int)pthread_self(), status, ucs_status_string(status),
}
static void wait(
ucp_worker_h ucp_worker,
struct ucx_context *context)
{
while (context->completed == 0) {
}
}
{
int err = 0;
int epoll_fd_local = 0;
int epoll_fd = 0;
struct epoll_event ev;
ev.data.u64 = 0;
CHKERR_JUMP(
UCS_OK != status,
"ucp_worker_get_efd", err);
epoll_fd_local = epoll_create(1);
ev.data.fd = epoll_fd;
ev.events = EPOLLIN;
err = epoll_ctl(epoll_fd_local, EPOLL_CTL_ADD, epoll_fd, &ev);
CHKERR_JUMP(err < 0, "add original socket to the new epoll\n", err_fd);
goto err_fd;
}
CHKERR_JUMP(status !=
UCS_OK,
"ucp_worker_arm\n", err_fd);
do {
err = epoll_wait(epoll_fd_local, &ev, 1, -1);
} while ((err == -1) && (errno == EINTR));
err_fd:
close(epoll_fd_local);
err:
return ret;
}
{
struct msg *msg = 0;
struct ucx_context *request = 0;
size_t msg_len = 0;
int ret = -1;
char *str;
ep_params.
err_mode = err_handling_opt.ucp_err_mode;
CHKERR_JUMP(status !=
UCS_OK,
"ucp_ep_create\n", err);
msg_len = sizeof(*msg) + local_addr_len;
msg = malloc(msg_len);
CHKERR_JUMP(msg == NULL, "allocate memory\n", err_ep);
memset(msg, 0, msg_len);
msg->data_len = local_addr_len;
memcpy(msg + 1, local_addr, local_addr_len);
send_handler);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to send UCX address message\n");
free(msg);
goto err_ep;
} else if (UCS_PTR_IS_PTR(request)) {
wait(ucp_worker, request);
request->completed = 0;
}
free(msg);
if (err_handling_opt.failure) {
fprintf(stderr, "Emulating unexpected failure on client side\n");
raise(SIGKILL);
}
for (;;) {
if (msg_tag != NULL) {
break;
continue;
}
if (ucp_test_mode == TEST_MODE_WAIT) {
CHKERR_JUMP(status !=
UCS_OK,
"ucp_worker_wait\n", err_ep);
} else if (ucp_test_mode == TEST_MODE_EVENTFD) {
status = test_poll_wait(ucp_worker);
CHKERR_JUMP(status !=
UCS_OK,
"test_poll_wait\n", err_ep);
}
}
msg = mem_type_malloc(info_tag.
length);
CHKERR_JUMP(msg == NULL, "allocate memory\n", err_ep);
recv_handler);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to receive UCX data message (%u)\n",
UCS_PTR_STATUS(request));
free(msg);
goto err_ep;
} else {
assert(UCS_PTR_IS_PTR(request));
wait(ucp_worker, request);
request->completed = 0;
printf("UCX data message was received\n");
}
str = calloc(1, test_string_length);
if (str != NULL) {
mem_type_memcpy(str, msg + 1, test_string_length);
printf("\n\n----- UCP TEST SUCCESS ----\n\n");
printf("%s", str);
printf("\n\n---------------------------\n\n");
free(str);
} else {
fprintf(stderr, "Memory allocation failed\n");
goto err_ep;
}
mem_type_free(msg);
ret = 0;
err_ep:
err:
return ret;
}
static void flush_callback(
void *request,
ucs_status_t status)
{
}
{
void *request;
if (request == NULL) {
} else if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
} else {
do {
return status;
}
}
{
struct msg *msg = 0;
struct ucx_context *request = 0;
size_t msg_len = 0;
int ret;
do {
} while (msg_tag == NULL);
msg = malloc(info_tag.
length);
CHKERR_ACTION(msg == NULL, "allocate memory\n", ret = -1; goto err);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to receive UCX address message (%s)\n",
ucs_status_string(UCS_PTR_STATUS(request)));
free(msg);
ret = -1;
goto err;
} else {
assert(UCS_PTR_IS_PTR(request));
wait(ucp_worker, request);
request->completed = 0;
printf("UCX address message was received\n");
}
peer_addr_len = msg->data_len;
peer_addr = malloc(peer_addr_len);
if (peer_addr == NULL) {
fprintf(stderr, "unable to allocate memory for peer address\n");
free(msg);
ret = -1;
goto err;
}
memcpy(peer_addr, msg + 1, peer_addr_len);
free(msg);
ep_params.
err_mode = err_handling_opt.ucp_err_mode;
CHKERR_ACTION(status !=
UCS_OK,
"ucp_ep_create\n", ret = -1;
goto err);
msg_len = sizeof(*msg) + test_string_length;
msg = mem_type_malloc(msg_len);
CHKERR_ACTION(msg == NULL, "allocate memory\n", ret = -1; goto err_ep);
mem_type_memset(msg, 0, msg_len);
set_msg_data_len(msg, msg_len - sizeof(*msg));
ret = generate_test_string((char *)(msg + 1), test_string_length);
CHKERR_JUMP(ret < 0, "generate test string", err_free_mem_type_msg);
send_handler);
if (UCS_PTR_IS_ERR(request)) {
fprintf(stderr, "unable to send UCX data message\n");
ret = -1;
goto err_free_mem_type_msg;
} else if (UCS_PTR_IS_PTR(request)) {
printf("UCX data message was scheduled for send\n");
wait(ucp_worker, request);
request->completed = 0;
}
status = flush_ep(ucp_worker, client_ep);
printf("flush_ep completed with status %d (%s)\n",
status, ucs_status_string(status));
ret = 0;
err_free_mem_type_msg:
mem_type_free(msg);
err_ep:
err:
return ret;
}
static int run_test(
const char *client_target_name,
ucp_worker_h ucp_worker)
{
if (client_target_name != NULL) {
return run_ucx_client(ucp_worker);
} else {
return run_ucx_server(ucp_worker);
}
}
int main(int argc, char **argv)
{
uint64_t addr_len = 0;
char *client_target_name = NULL;
int oob_sock = -1;
int ret = -1;
memset(&worker_params, 0, sizeof(worker_params));
status = parse_cmd(argc, argv, &client_target_name);
CHKERR_JUMP(status !=
UCS_OK,
"parse_cmd\n", err);
CHKERR_JUMP(status !=
UCS_OK,
"ucp_config_read\n", err);
if (ucp_test_mode == TEST_MODE_WAIT || ucp_test_mode == TEST_MODE_EVENTFD) {
}
CHKERR_JUMP(status !=
UCS_OK,
"ucp_init\n", err);
CHKERR_JUMP(status !=
UCS_OK,
"ucp_worker_create\n", err_cleanup);
CHKERR_JUMP(status !=
UCS_OK,
"ucp_worker_get_address\n", err_worker);
printf("[0x%x] local address length: %lu\n",
(unsigned int)pthread_self(), local_addr_len);
if (client_target_name) {
peer_addr_len = local_addr_len;
oob_sock = client_connect(client_target_name, server_port);
CHKERR_JUMP(oob_sock < 0, "client_connect\n", err_addr);
ret = recv(oob_sock, &addr_len, sizeof(addr_len), MSG_WAITALL);
CHKERR_JUMP_RETVAL(ret != (int)sizeof(addr_len),
"receive address length\n", err_addr, ret);
peer_addr_len = addr_len;
peer_addr = malloc(peer_addr_len);
CHKERR_JUMP(!peer_addr, "allocate memory\n", err_addr);
ret = recv(oob_sock, peer_addr, peer_addr_len, MSG_WAITALL);
CHKERR_JUMP_RETVAL(ret != (int)peer_addr_len,
"receive address\n", err_peer_addr, ret);
} else {
oob_sock = server_connect(server_port);
CHKERR_JUMP(oob_sock < 0, "server_connect\n", err_peer_addr);
addr_len = local_addr_len;
ret = send(oob_sock, &addr_len, sizeof(addr_len), 0);
CHKERR_JUMP_RETVAL(ret != (int)sizeof(addr_len),
"send address length\n", err_peer_addr, ret);
ret = send(oob_sock, local_addr, local_addr_len, 0);
CHKERR_JUMP_RETVAL(ret != (int)local_addr_len, "send address\n",
err_peer_addr, ret);
}
ret = run_test(client_target_name, ucp_worker);
if (!ret && !err_handling_opt.failure) {
ret = barrier(oob_sock);
}
close(oob_sock);
err_peer_addr:
free(peer_addr);
err_addr:
err_worker:
err_cleanup:
err:
return ret;
}
ucs_status_t parse_cmd(
int argc,
char *
const argv[],
char **server_name)
{
int c = 0, index = 0;
opterr = 0;
err_handling_opt.failure = 0;
while ((c = getopt(argc, argv, "wfben:p:s:m:h")) != -1) {
switch (c) {
case 'w':
ucp_test_mode = TEST_MODE_WAIT;
break;
case 'f':
ucp_test_mode = TEST_MODE_EVENTFD;
break;
case 'b':
ucp_test_mode = TEST_MODE_PROBE;
break;
case 'e':
err_handling_opt.failure = 1;
break;
case 'n':
*server_name = optarg;
break;
case 'p':
server_port = atoi(optarg);
if (server_port <= 0) {
fprintf(stderr, "Wrong server port number %d\n", server_port);
}
break;
case 's':
test_string_length = atol(optarg);
if (test_string_length <= 0) {
fprintf(stderr, "Wrong string size %ld\n", test_string_length);
}
break;
case 'm':
test_mem_type = parse_mem_type(optarg);
if (test_mem_type == UCS_MEMORY_TYPE_LAST) {
}
break;
case '?':
if (optopt == 's') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else if (isprint (optopt)) {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
} else {
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
case 'h':
default:
fprintf(stderr, "Usage: ucp_hello_world [parameters]\n");
fprintf(stderr, "UCP hello world client/server example utility\n");
fprintf(stderr, "\nParameters are:\n");
fprintf(stderr, " -w Select test mode \"wait\" to test "
"ucp_worker_wait function\n");
fprintf(stderr, " -f Select test mode \"event fd\" to test "
"ucp_worker_get_efd function with later poll\n");
fprintf(stderr, " -b Select test mode \"busy polling\" to test "
"ucp_tag_probe_nb and ucp_worker_progress (default)\n");
fprintf(stderr, " -e Emulate unexpected failure on server side"
"and handle an error on client side with enabled "
"UCP_ERR_HANDLING_MODE_PEER\n");
print_common_help();
fprintf(stderr, "\n");
}
}
fprintf(stderr, "INFO: UCP_HELLO_WORLD mode = %d server = %s port = %d\n",
ucp_test_mode, *server_name, server_port);
for (index = optind; index < argc; index++) {
fprintf(stderr, "WARNING: Non-option argument %s\n", argv[index]);
}
}