UCX  1.10
Unified Communication X
ucp_client_server.c

UCP client / server example using different APIs (tag, stream, am) utility.

/*
* UCP client - server example utility
* -----------------------------------------------
*
* Server side:
*
* ./ucp_client_server
*
* Client side:
*
* ./ucp_client_server -a <server-ip>
*
* Notes:
*
* - The server will listen to incoming connection requests on INADDR_ANY.
* - The client needs to pass the IP address of the server side to connect to
* as an argument to the test.
* - Currently, the passed IP needs to be an IPoIB or a RoCE address.
* - The port which the server side would listen on can be modified with the
* '-p' option and should be used on both sides. The default port to use is
* 13337.
*/
#include <ucp/api/ucp.h>
#include <string.h> /* memset */
#include <arpa/inet.h> /* inet_addr */
#include <unistd.h> /* getopt */
#include <stdlib.h> /* atoi */
#define TEST_STRING_LEN sizeof(test_message)
#define DEFAULT_PORT 13337
#define IP_STRING_LEN 50
#define PORT_STRING_LEN 8
#define TAG 0xCAFE
#define COMM_TYPE_DEFAULT "STREAM"
#define PRINT_INTERVAL 2000
#define DEFAULT_NUM_ITERATIONS 1
#define TEST_AM_ID 0
const char test_message[] = "UCX Client-Server Hello World";
static uint16_t server_port = DEFAULT_PORT;
static int num_iterations = DEFAULT_NUM_ITERATIONS;
typedef enum {
CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(1),
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(2),
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_STREAM
} send_recv_type_t;
typedef struct ucx_server_ctx {
volatile ucp_conn_request_h conn_request;
ucp_listener_h listener;
} ucx_server_ctx_t;
typedef struct test_req {
int complete;
} test_req_t;
static struct {
volatile int complete;
int is_rndv;
void *desc;
void *recv_buf;
} am_data_desc = {0, 0, NULL, NULL};
static void usage(void);
static void tag_recv_cb(void *request, ucs_status_t status,
const ucp_tag_recv_info_t *info, void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void
stream_recv_cb(void *request, ucs_status_t status, size_t length,
void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void am_recv_cb(void *request, ucs_status_t status, size_t length,
void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void send_cb(void *request, ucs_status_t status, void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void err_cb(void *arg, ucp_ep_h ep, ucs_status_t status)
{
printf("error handling callback was invoked with status %d (%s)\n",
status, ucs_status_string(status));
}
void set_listen_addr(const char *address_str, struct sockaddr_in *listen_addr)
{
/* The server will listen on INADDR_ANY */
memset(listen_addr, 0, sizeof(struct sockaddr_in));
listen_addr->sin_family = AF_INET;
listen_addr->sin_addr.s_addr = (address_str) ? inet_addr(address_str) : INADDR_ANY;
listen_addr->sin_port = htons(server_port);
}
void set_connect_addr(const char *address_str, struct sockaddr_in *connect_addr)
{
memset(connect_addr, 0, sizeof(struct sockaddr_in));
connect_addr->sin_family = AF_INET;
connect_addr->sin_addr.s_addr = inet_addr(address_str);
connect_addr->sin_port = htons(server_port);
}
static ucs_status_t start_client(ucp_worker_h ucp_worker, const char *ip,
ucp_ep_h *client_ep)
{
ucp_ep_params_t ep_params;
struct sockaddr_in connect_addr;
ucs_status_t status;
set_connect_addr(ip, &connect_addr);
/*
* Endpoint field mask bits:
* UCP_EP_PARAM_FIELD_FLAGS - Use the value of the 'flags' field.
* UCP_EP_PARAM_FIELD_SOCK_ADDR - Use a remote sockaddr to connect
* to the remote peer.
* UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE - Error handling mode - this flag
* is temporarily required since the
* endpoint will be closed with
* UCP_EP_CLOSE_MODE_FORCE which
* requires this mode.
* Once UCP_EP_CLOSE_MODE_FORCE is
* removed, the error handling mode
* will be removed.
*/
ep_params.err_handler.cb = err_cb;
ep_params.err_handler.arg = NULL;
ep_params.sockaddr.addr = (struct sockaddr*)&connect_addr;
ep_params.sockaddr.addrlen = sizeof(connect_addr);
status = ucp_ep_create(ucp_worker, &ep_params, client_ep);
if (status != UCS_OK) {
fprintf(stderr, "failed to connect to %s (%s)\n", ip, ucs_status_string(status));
}
return status;
}
static void print_result(int is_server, char *recv_message, int current_iter)
{
if (is_server) {
printf("Server: iteration #%d\n", (current_iter + 1));
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
printf("%s", recv_message);
printf("\n\n------------------------------\n\n");
} else {
printf("Client: iteration #%d\n", (current_iter + 1));
printf("\n\n-----------------------------------------\n\n");
printf("Client sent message: \n%s.\nlength: %ld\n",
test_message, TEST_STRING_LEN);
printf("\n-----------------------------------------\n\n");
}
}
static ucs_status_t request_wait(ucp_worker_h ucp_worker, void *request,
test_req_t *ctx)
{
ucs_status_t status;
/* if operation was completed immediately */
if (request == NULL) {
return UCS_OK;
}
if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}
while (ctx->complete == 0) {
ucp_worker_progress(ucp_worker);
}
status = ucp_request_check_status(request);
ucp_request_free(request);
return status;
}
static int request_finalize(ucp_worker_h ucp_worker, test_req_t *request,
test_req_t *ctx, int is_server,
char *recv_message, int current_iter)
{
ucs_status_t status;
int ret = 0;
status = request_wait(ucp_worker, request, ctx);
if (status != UCS_OK) {
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send", ucs_status_string(status));
return -1;
}
/* Print the output of the first, last and every PRINT_INTERVAL iteration */
if ((current_iter == 0) || (current_iter == (num_iterations - 1)) ||
!((current_iter + 1) % (PRINT_INTERVAL))) {
print_result(is_server, recv_message, current_iter);
}
return ret;
}
static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
char recv_message[TEST_STRING_LEN]= "";
test_req_t *request;
size_t length;
test_req_t ctx;
ctx.complete = 0;
param.user_data = &ctx;
if (!is_server) {
/* Client sends a message to the server using the stream API */
param.cb.send = send_cb;
request = ucp_stream_send_nbx(ep, test_message, TEST_STRING_LEN,
&param);
} else {
/* Server receives a message from the client using the stream API */
param.cb.recv_stream = stream_recv_cb;
request = ucp_stream_recv_nbx(ep, &recv_message,
TEST_STRING_LEN,
&length, &param);
}
return request_finalize(ucp_worker, request, &ctx, is_server,
recv_message, current_iter);
}
static int send_recv_tag(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
char recv_message[TEST_STRING_LEN]= "";
void *request;
test_req_t ctx;
ctx.complete = 0;
param.user_data = &ctx;
if (!is_server) {
/* Client sends a message to the server using the Tag-Matching API */
param.cb.send = send_cb;
request = ucp_tag_send_nbx(ep, test_message, TEST_STRING_LEN,
TAG, &param);
} else {
/* Server receives a message from the client using the Tag-Matching API */
param.cb.recv = tag_recv_cb;
request = ucp_tag_recv_nbx(ucp_worker, &recv_message,
TEST_STRING_LEN, TAG, 0, &param);
}
return request_finalize(ucp_worker, request, &ctx, is_server, recv_message,
current_iter);
}
ucs_status_t ucp_am_data_cb(void *arg, const void *header, size_t header_length,
void *data, size_t length,
const ucp_am_recv_param_t *param)
{
if (length != TEST_STRING_LEN) {
fprintf(stderr, "received wrong data length %ld (expected %ld)",
length, TEST_STRING_LEN);
goto out;
}
if ((header != NULL) || (header_length != 0)) {
fprintf(stderr, "received unexpected header, length %ld", header_length);
}
/* Rendezvous request arrived, data contains an internal UCX descriptor,
* which has to be passed to ucp_am_recv_data_nbx function to confirm
* data transfer.
*/
am_data_desc.is_rndv = 1;
am_data_desc.desc = data;
}
/* Message delivered with eager protocol, data should be available
* immediately
*/
am_data_desc.is_rndv = 0;
memcpy(am_data_desc.recv_buf, data, length);
out:
am_data_desc.complete = 1;
return UCS_OK;
}
static int send_recv_am(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
char recv_message[TEST_STRING_LEN] = "";
test_req_t *request;
test_req_t ctx;
am_data_desc.recv_buf = recv_message;
ctx.complete = 0;
params.user_data = &ctx;
if (is_server) {
while (!am_data_desc.complete) {
ucp_worker_progress(ucp_worker);
}
am_data_desc.complete = 0;
if (am_data_desc.is_rndv) {
/* Rendezvous request has arrived, need to invoke receive operation
* to confirm data transfer from the sender to the "recv_message"
* buffer. */
params.cb.recv_am = am_recv_cb,
request = ucp_am_recv_data_nbx(ucp_worker,
am_data_desc.desc,
&recv_message,
TEST_STRING_LEN,
&params);
} else {
/* Data has arrived eagerly and is ready for use, no need to
* initiate receive operation. */
request = NULL;
}
} else {
/* Client sends a message to the server using the AM API */
params.cb.send = (ucp_send_nbx_callback_t)send_cb,
request = ucp_am_send_nbx(ep, TEST_AM_ID, NULL, 0ul,
test_message, TEST_STRING_LEN,
&params);
}
return request_finalize(ucp_worker, request, &ctx, is_server, recv_message,
current_iter);
}
static void ep_close(ucp_worker_h ucp_worker, ucp_ep_h ep)
{
ucs_status_t status;
void *close_req;
close_req = ucp_ep_close_nbx(ep, &param);
if (UCS_PTR_IS_PTR(close_req)) {
do {
ucp_worker_progress(ucp_worker);
status = ucp_request_check_status(close_req);
} while (status == UCS_INPROGRESS);
ucp_request_free(close_req);
} else if (UCS_PTR_STATUS(close_req) != UCS_OK) {
fprintf(stderr, "failed to close ep %p\n", (void*)ep);
}
}
static void usage()
{
fprintf(stderr, "Usage: ucp_client_server [parameters]\n");
fprintf(stderr, "UCP client-server example utility\n");
fprintf(stderr, "\nParameters are:\n");
fprintf(stderr, " -a Set IP address of the server "
"(required for client and should not be specified "
"for the server)\n");
fprintf(stderr, " -l Set IP address where server listens "
"(If not specified, server uses INADDR_ANY; "
"Irrelevant at client)\n");
fprintf(stderr, " -p Port number to listen/connect to (default = %d). "
"0 on the server side means select a random port and print it\n",
DEFAULT_PORT);
fprintf(stderr, " -c Communication type for the client and server. "
" Valid values are:\n"
" 'stream' : Stream API\n"
" 'tag' : Tag API\n"
" 'am' : AM API\n"
" If not specified, %s API will be used.\n", COMM_TYPE_DEFAULT);
fprintf(stderr, " -i Number of iterations to run. Client and server must "
"have the same value. (default = %d).\n",
num_iterations);
fprintf(stderr, "\n");
}
static int parse_cmd(int argc, char *const argv[], char **server_addr,
char **listen_addr, send_recv_type_t *send_recv_type)
{
int c = 0;
int port;
opterr = 0;
while ((c = getopt(argc, argv, "a:l:p:c:i:")) != -1) {
switch (c) {
case 'a':
*server_addr = optarg;
break;
case 'c':
if (!strcasecmp(optarg, "stream")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_STREAM;
} else if (!strcasecmp(optarg, "tag")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_TAG;
} else if (!strcasecmp(optarg, "am")) {
/* TODO: uncomment below when AM API is fully supported.
* *send_recv_type = CLIENT_SERVER_SEND_RECV_AM; */
fprintf(stderr, "AM API is not fully supported yet\n");
return -1;
} else {
fprintf(stderr, "Wrong communication type %s. "
"Using %s as default\n", optarg, COMM_TYPE_DEFAULT);
*send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
}
break;
case 'l':
*listen_addr = optarg;
break;
case 'p':
port = atoi(optarg);
if ((port < 0) || (port > UINT16_MAX)) {
fprintf(stderr, "Wrong server port number %d\n", port);
return -1;
}
server_port = port;
break;
case 'i':
num_iterations = atoi(optarg);
break;
default:
usage();
return -1;
}
}
return 0;
}
static char* sockaddr_get_ip_str(const struct sockaddr_storage *sock_addr,
char *ip_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
inet_ntop(AF_INET, &addr_in.sin_addr, ip_str, max_size);
return ip_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
inet_ntop(AF_INET6, &addr_in6.sin6_addr, ip_str, max_size);
return ip_str;
default:
return "Invalid address family";
}
}
static char* sockaddr_get_port_str(const struct sockaddr_storage *sock_addr,
char *port_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
snprintf(port_str, max_size, "%d", ntohs(addr_in.sin_port));
return port_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
snprintf(port_str, max_size, "%d", ntohs(addr_in6.sin6_port));
return port_str;
default:
return "Invalid address family";
}
}
static int client_server_communication(ucp_worker_h worker, ucp_ep_h ep,
send_recv_type_t send_recv_type,
int is_server, int current_iter)
{
int ret;
switch (send_recv_type) {
case CLIENT_SERVER_SEND_RECV_STREAM:
/* Client-Server communication via Stream API */
ret = send_recv_stream(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_TAG:
/* Client-Server communication via Tag-Matching API */
ret = send_recv_tag(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_AM:
/* Client-Server communication via AM API. */
ret = send_recv_am(worker, ep, is_server, current_iter);
break;
default:
fprintf(stderr, "unknown send-recv type %d\n", send_recv_type);
return -1;
}
return ret;
}
static int init_worker(ucp_context_h ucp_context, ucp_worker_h *ucp_worker)
{
ucp_worker_params_t worker_params;
ucs_status_t status;
int ret = 0;
memset(&worker_params, 0, sizeof(worker_params));
status = ucp_worker_create(ucp_context, &worker_params, ucp_worker);
if (status != UCS_OK) {
fprintf(stderr, "failed to ucp_worker_create (%s)\n", ucs_status_string(status));
ret = -1;
}
return ret;
}
static void server_conn_handle_cb(ucp_conn_request_h conn_request, void *arg)
{
ucx_server_ctx_t *context = arg;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
ucs_status_t status;
status = ucp_conn_request_query(conn_request, &attr);
if (status == UCS_OK) {
printf("Server received a connection request from client at address %s:%s\n",
sockaddr_get_ip_str(&attr.client_address, ip_str, sizeof(ip_str)),
sockaddr_get_port_str(&attr.client_address, port_str, sizeof(port_str)));
} else if (status != UCS_ERR_UNSUPPORTED) {
fprintf(stderr, "failed to query the connection request (%s)\n",
ucs_status_string(status));
}
if (context->conn_request == NULL) {
context->conn_request = conn_request;
} else {
/* The server is already handling a connection request from a client,
* reject this new one */
printf("Rejecting a connection request. "
"Only one client at a time is supported.\n");
status = ucp_listener_reject(context->listener, conn_request);
if (status != UCS_OK) {
fprintf(stderr, "server failed to reject a connection request: (%s)\n",
ucs_status_string(status));
}
}
}
static ucs_status_t server_create_ep(ucp_worker_h data_worker,
ucp_conn_request_h conn_request,
ucp_ep_h *server_ep)
{
ucp_ep_params_t ep_params;
ucs_status_t status;
/* Server creates an ep to the client on the data worker.
* This is not the worker the listener was created on.
* The client side should have initiated the connection, leading
* to this ep's creation */
ep_params.conn_request = conn_request;
ep_params.err_handler.cb = err_cb;
ep_params.err_handler.arg = NULL;
status = ucp_ep_create(data_worker, &ep_params, server_ep);
if (status != UCS_OK) {
fprintf(stderr, "failed to create an endpoint on the server: (%s)\n",
ucs_status_string(status));
}
return status;
}
static ucs_status_t start_server(ucp_worker_h ucp_worker,
ucx_server_ctx_t *context,
ucp_listener_h *listener_p, const char *ip)
{
struct sockaddr_in listen_addr;
ucs_status_t status;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
set_listen_addr(ip, &listen_addr);
params.field_mask = UCP_LISTENER_PARAM_FIELD_SOCK_ADDR |
params.sockaddr.addr = (const struct sockaddr*)&listen_addr;
params.sockaddr.addrlen = sizeof(listen_addr);
params.conn_handler.cb = server_conn_handle_cb;
params.conn_handler.arg = context;
/* Create a listener on the server side to listen on the given address.*/
status = ucp_listener_create(ucp_worker, &params, listener_p);
if (status != UCS_OK) {
fprintf(stderr, "failed to listen (%s)\n", ucs_status_string(status));
goto out;
}
/* Query the created listener to get the port it is listening on. */
status = ucp_listener_query(*listener_p, &attr);
if (status != UCS_OK) {
fprintf(stderr, "failed to query the listener (%s)\n",
ucs_status_string(status));
ucp_listener_destroy(*listener_p);
goto out;
}
fprintf(stderr, "server is listening on IP %s port %s\n",
sockaddr_get_ip_str(&attr.sockaddr, ip_str, IP_STRING_LEN),
sockaddr_get_port_str(&attr.sockaddr, port_str, PORT_STRING_LEN));
printf("Waiting for connection...\n");
out:
return status;
}
static int client_server_do_work(ucp_worker_h ucp_worker, ucp_ep_h ep,
send_recv_type_t send_recv_type, int is_server)
{
int i, ret = 0;
for (i = 0; i < num_iterations; i++) {
ret = client_server_communication(ucp_worker, ep, send_recv_type,
is_server, i);
if (ret != 0) {
fprintf(stderr, "%s failed on iteration #%d\n",
(is_server ? "server": "client"), i + 1);
goto out;
}
}
out:
return ret;
}
static int run_server(ucp_context_h ucp_context, ucp_worker_h ucp_worker,
char *listen_addr, send_recv_type_t send_recv_type)
{
ucx_server_ctx_t context;
ucp_worker_h ucp_data_worker;
ucp_ep_h server_ep;
ucs_status_t status;
int ret;
/* Create a data worker (to be used for data exchange between the server
* and the client after the connection between them was established) */
ret = init_worker(ucp_context, &ucp_data_worker);
if (ret != 0) {
goto err;
}
if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) {
/* Initialize Active Message data handler */
param.id = TEST_AM_ID;
param.cb = ucp_am_data_cb;
param.arg = ucp_data_worker; /* not used in our callback */
status = ucp_worker_set_am_recv_handler(ucp_data_worker,
&param);
if (status != UCS_OK) {
ret = -1;
goto err_worker;
}
}
/* Initialize the server's context. */
context.conn_request = NULL;
/* Create a listener on the worker created at first. The 'connection
* worker' - used for connection establishment between client and server.
* This listener will stay open for listening to incoming connection
* requests from the client */
status = start_server(ucp_worker, &context, &context.listener, listen_addr);
if (status != UCS_OK) {
ret = -1;
goto err_worker;
}
/* Server is always up listening */
while (1) {
/* Wait for the server to receive a connection request from the client.
* If there are multiple clients for which the server's connection request
* callback is invoked, i.e. several clients are trying to connect in
* parallel, the server will handle only the first one and reject the rest */
while (context.conn_request == NULL) {
ucp_worker_progress(ucp_worker);
}
/* Server creates an ep to the client on the data worker.
* This is not the worker the listener was created on.
* The client side should have initiated the connection, leading
* to this ep's creation */
status = server_create_ep(ucp_data_worker, context.conn_request,
&server_ep);
if (status != UCS_OK) {
ret = -1;
goto err_listener;
}
/* The server waits for all the iterations to complete before moving on
* to the next client */
ret = client_server_do_work(ucp_data_worker, server_ep, send_recv_type,
1);
if (ret != 0) {
goto err_ep;
}
/* Close the endpoint to the client */
ep_close(ucp_data_worker, server_ep);
/* Reinitialize the server's context to be used for the next client */
context.conn_request = NULL;
printf("Waiting for connection...\n");
}
err_ep:
ep_close(ucp_data_worker, server_ep);
err_listener:
ucp_listener_destroy(context.listener);
err_worker:
ucp_worker_destroy(ucp_data_worker);
err:
return ret;
}
static int run_client(ucp_worker_h ucp_worker, char *server_addr,
send_recv_type_t send_recv_type)
{
ucp_ep_h client_ep;
ucs_status_t status;
int ret;
status = start_client(ucp_worker, server_addr, &client_ep);
if (status != UCS_OK) {
fprintf(stderr, "failed to start client (%s)\n", ucs_status_string(status));
ret = -1;
goto out;
}
ret = client_server_do_work(ucp_worker, client_ep, send_recv_type, 0);
/* Close the endpoint to the server */
ep_close(ucp_worker, client_ep);
out:
return ret;
}
static int init_context(ucp_context_h *ucp_context, ucp_worker_h *ucp_worker,
send_recv_type_t send_recv_type)
{
/* UCP objects */
ucs_status_t status;
int ret = 0;
memset(&ucp_params, 0, sizeof(ucp_params));
/* UCP initialization */
if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) {
} else if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) {
} else {
}
status = ucp_init(&ucp_params, NULL, ucp_context);
if (status != UCS_OK) {
fprintf(stderr, "failed to ucp_init (%s)\n", ucs_status_string(status));
ret = -1;
goto err;
}
ret = init_worker(*ucp_context, ucp_worker);
if (ret != 0) {
goto err_cleanup;
}
return ret;
err_cleanup:
ucp_cleanup(*ucp_context);
err:
return ret;
}
int main(int argc, char **argv)
{
send_recv_type_t send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
char *server_addr = NULL;
char *listen_addr = NULL;
int ret;
/* UCP objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
ret = parse_cmd(argc, argv, &server_addr, &listen_addr, &send_recv_type);
if (ret != 0) {
goto err;
}
/* Initialize the UCX required objects */
ret = init_context(&ucp_context, &ucp_worker, send_recv_type);
if (ret != 0) {
goto err;
}
/* Client-Server initialization */
if (server_addr == NULL) {
/* Server side */
ret = run_server(ucp_context, ucp_worker, listen_addr, send_recv_type);
} else {
/* Client side */
ret = run_client(ucp_worker, server_addr, send_recv_type);
}
ucp_worker_destroy(ucp_worker);
ucp_cleanup(ucp_context);
err:
return ret;
}