UCP client / server example using different APIs (tag, stream, am) utility.
#include <ucp/api/ucp.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#define TEST_STRING_LEN sizeof(test_message)
#define DEFAULT_PORT 13337
#define IP_STRING_LEN 50
#define PORT_STRING_LEN 8
#define TAG 0xCAFE
#define COMM_TYPE_DEFAULT "STREAM"
#define PRINT_INTERVAL 2000
#define DEFAULT_NUM_ITERATIONS 1
#define TEST_AM_ID 0
const char test_message[] = "UCX Client-Server Hello World";
static uint16_t server_port = DEFAULT_PORT;
static int num_iterations = DEFAULT_NUM_ITERATIONS;
typedef enum {
CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(1),
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(2),
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_STREAM
} send_recv_type_t;
typedef struct ucx_server_ctx {
} ucx_server_ctx_t;
typedef struct test_req {
int complete;
} test_req_t;
static struct {
volatile int complete;
int is_rndv;
void *desc;
void *recv_buf;
} am_data_desc = {0, 0, NULL, NULL};
static void usage(void);
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void
stream_recv_cb(
void *request,
ucs_status_t status,
size_t length,
void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void am_recv_cb(
void *request,
ucs_status_t status,
size_t length,
void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
static void send_cb(
void *request,
ucs_status_t status,
void *user_data)
{
test_req_t *ctx = user_data;
ctx->complete = 1;
}
{
printf("error handling callback was invoked with status %d (%s)\n",
status, ucs_status_string(status));
}
void set_listen_addr(const char *address_str, struct sockaddr_in *listen_addr)
{
memset(listen_addr, 0, sizeof(struct sockaddr_in));
listen_addr->sin_family = AF_INET;
listen_addr->sin_addr.s_addr = (address_str) ? inet_addr(address_str) : INADDR_ANY;
listen_addr->sin_port = htons(server_port);
}
void set_connect_addr(const char *address_str, struct sockaddr_in *connect_addr)
{
memset(connect_addr, 0, sizeof(struct sockaddr_in));
connect_addr->sin_family = AF_INET;
connect_addr->sin_addr.s_addr = inet_addr(address_str);
connect_addr->sin_port = htons(server_port);
}
{
struct sockaddr_in connect_addr;
set_connect_addr(ip, &connect_addr);
fprintf(stderr, "failed to connect to %s (%s)\n", ip, ucs_status_string(status));
}
return status;
}
static void print_result(int is_server, char *recv_message, int current_iter)
{
if (is_server) {
printf("Server: iteration #%d\n", (current_iter + 1));
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
printf("%s", recv_message);
printf("\n\n------------------------------\n\n");
} else {
printf("Client: iteration #%d\n", (current_iter + 1));
printf("\n\n-----------------------------------------\n\n");
printf("Client sent message: \n%s.\nlength: %ld\n",
test_message, TEST_STRING_LEN);
printf("\n-----------------------------------------\n\n");
}
}
test_req_t *ctx)
{
if (request == NULL) {
}
if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}
while (ctx->complete == 0) {
}
return status;
}
static int request_finalize(
ucp_worker_h ucp_worker, test_req_t *request,
test_req_t *ctx, int is_server,
char *recv_message, int current_iter)
{
int ret = 0;
status = request_wait(ucp_worker, request, ctx);
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send", ucs_status_string(status));
return -1;
}
if ((current_iter == 0) || (current_iter == (num_iterations - 1)) ||
!((current_iter + 1) % (PRINT_INTERVAL))) {
print_result(is_server, recv_message, current_iter);
}
return ret;
}
int current_iter)
{
char recv_message[TEST_STRING_LEN]= "";
test_req_t *request;
size_t length;
test_req_t ctx;
ctx.complete = 0;
if (!is_server) {
¶m);
} else {
param.
cb.recv_stream = stream_recv_cb;
TEST_STRING_LEN,
&length, ¶m);
}
return request_finalize(ucp_worker, request, &ctx, is_server,
recv_message, current_iter);
}
int current_iter)
{
char recv_message[TEST_STRING_LEN]= "";
void *request;
test_req_t ctx;
ctx.complete = 0;
if (!is_server) {
TAG, ¶m);
} else {
param.
cb.recv = tag_recv_cb;
TEST_STRING_LEN, TAG, 0, ¶m);
}
return request_finalize(ucp_worker, request, &ctx, is_server, recv_message,
current_iter);
}
ucs_status_t ucp_am_data_cb(
void *arg,
const void *header,
size_t header_length,
void *data, size_t length,
{
if (length != TEST_STRING_LEN) {
fprintf(stderr, "received wrong data length %ld (expected %ld)",
length, TEST_STRING_LEN);
goto out;
}
if ((header != NULL) || (header_length != 0)) {
fprintf(stderr, "received unexpected header, length %ld", header_length);
}
am_data_desc.is_rndv = 1;
am_data_desc.desc = data;
}
am_data_desc.is_rndv = 0;
memcpy(am_data_desc.recv_buf, data, length);
out:
am_data_desc.complete = 1;
}
int current_iter)
{
char recv_message[TEST_STRING_LEN] = "";
test_req_t *request;
test_req_t ctx;
am_data_desc.recv_buf = recv_message;
ctx.complete = 0;
if (is_server) {
while (!am_data_desc.complete) {
}
am_data_desc.complete = 0;
if (am_data_desc.is_rndv) {
params.
cb.recv_am = am_recv_cb,
am_data_desc.desc,
&recv_message,
TEST_STRING_LEN,
¶ms);
} else {
request = NULL;
}
} else {
test_message, TEST_STRING_LEN,
¶ms);
}
return request_finalize(ucp_worker, request, &ctx, is_server, recv_message,
current_iter);
}
{
void *close_req;
if (UCS_PTR_IS_PTR(close_req)) {
do {
}
else if (UCS_PTR_STATUS(close_req) !=
UCS_OK) {
fprintf(stderr, "failed to close ep %p\n", (void*)ep);
}
}
static void usage()
{
fprintf(stderr, "Usage: ucp_client_server [parameters]\n");
fprintf(stderr, "UCP client-server example utility\n");
fprintf(stderr, "\nParameters are:\n");
fprintf(stderr, " -a Set IP address of the server "
"(required for client and should not be specified "
"for the server)\n");
fprintf(stderr, " -l Set IP address where server listens "
"(If not specified, server uses INADDR_ANY; "
"Irrelevant at client)\n");
fprintf(stderr, " -p Port number to listen/connect to (default = %d). "
"0 on the server side means select a random port and print it\n",
DEFAULT_PORT);
fprintf(stderr, " -c Communication type for the client and server. "
" Valid values are:\n"
" 'stream' : Stream API\n"
" 'tag' : Tag API\n"
" 'am' : AM API\n"
" If not specified, %s API will be used.\n", COMM_TYPE_DEFAULT);
fprintf(stderr, " -i Number of iterations to run. Client and server must "
"have the same value. (default = %d).\n",
num_iterations);
fprintf(stderr, "\n");
}
static int parse_cmd(int argc, char *const argv[], char **server_addr,
char **listen_addr, send_recv_type_t *send_recv_type)
{
int c = 0;
int port;
opterr = 0;
while ((c = getopt(argc, argv, "a:l:p:c:i:")) != -1) {
switch (c) {
case 'a':
*server_addr = optarg;
break;
case 'c':
if (!strcasecmp(optarg, "stream")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_STREAM;
} else if (!strcasecmp(optarg, "tag")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_TAG;
} else if (!strcasecmp(optarg, "am")) {
fprintf(stderr, "AM API is not fully supported yet\n");
return -1;
} else {
fprintf(stderr, "Wrong communication type %s. "
"Using %s as default\n", optarg, COMM_TYPE_DEFAULT);
*send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
}
break;
case 'l':
*listen_addr = optarg;
break;
case 'p':
port = atoi(optarg);
if ((port < 0) || (port > UINT16_MAX)) {
fprintf(stderr, "Wrong server port number %d\n", port);
return -1;
}
server_port = port;
break;
case 'i':
num_iterations = atoi(optarg);
break;
default:
usage();
return -1;
}
}
return 0;
}
static char* sockaddr_get_ip_str(const struct sockaddr_storage *sock_addr,
char *ip_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
inet_ntop(AF_INET, &addr_in.sin_addr, ip_str, max_size);
return ip_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
inet_ntop(AF_INET6, &addr_in6.sin6_addr, ip_str, max_size);
return ip_str;
default:
return "Invalid address family";
}
}
static char* sockaddr_get_port_str(const struct sockaddr_storage *sock_addr,
char *port_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
snprintf(port_str, max_size, "%d", ntohs(addr_in.sin_port));
return port_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
snprintf(port_str, max_size, "%d", ntohs(addr_in6.sin6_port));
return port_str;
default:
return "Invalid address family";
}
}
send_recv_type_t send_recv_type,
int is_server, int current_iter)
{
int ret;
switch (send_recv_type) {
case CLIENT_SERVER_SEND_RECV_STREAM:
ret = send_recv_stream(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_TAG:
ret = send_recv_tag(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_AM:
ret = send_recv_am(worker, ep, is_server, current_iter);
break;
default:
fprintf(stderr, "unknown send-recv type %d\n", send_recv_type);
return -1;
}
return ret;
}
{
int ret = 0;
memset(&worker_params, 0, sizeof(worker_params));
fprintf(stderr, "failed to ucp_worker_create (%s)\n", ucs_status_string(status));
ret = -1;
}
return ret;
}
{
ucx_server_ctx_t *context = arg;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
printf("Server received a connection request from client at address %s:%s\n",
sockaddr_get_port_str(&attr.
client_address, port_str,
sizeof(port_str)));
fprintf(stderr, "failed to query the connection request (%s)\n",
ucs_status_string(status));
}
if (context->conn_request == NULL) {
context->conn_request = conn_request;
} else {
printf("Rejecting a connection request. "
"Only one client at a time is supported.\n");
fprintf(stderr, "server failed to reject a connection request: (%s)\n",
ucs_status_string(status));
}
}
}
{
fprintf(stderr, "failed to create an endpoint on the server: (%s)\n",
ucs_status_string(status));
}
return status;
}
ucx_server_ctx_t *context,
{
struct sockaddr_in listen_addr;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
set_listen_addr(ip, &listen_addr);
params.sockaddr.addr = (const struct sockaddr*)&listen_addr;
params.sockaddr.addrlen = sizeof(listen_addr);
params.conn_handler.cb = server_conn_handle_cb;
params.conn_handler.arg = context;
fprintf(stderr, "failed to listen (%s)\n", ucs_status_string(status));
goto out;
}
fprintf(stderr, "failed to query the listener (%s)\n",
ucs_status_string(status));
goto out;
}
fprintf(stderr, "server is listening on IP %s port %s\n",
sockaddr_get_ip_str(&attr.
sockaddr, ip_str, IP_STRING_LEN),
sockaddr_get_port_str(&attr.
sockaddr, port_str, PORT_STRING_LEN));
printf("Waiting for connection...\n");
out:
return status;
}
send_recv_type_t send_recv_type, int is_server)
{
int i, ret = 0;
for (i = 0; i < num_iterations; i++) {
ret = client_server_communication(ucp_worker, ep, send_recv_type,
is_server, i);
if (ret != 0) {
fprintf(stderr, "%s failed on iteration #%d\n",
(is_server ? "server": "client"), i + 1);
goto out;
}
}
out:
return ret;
}
char *listen_addr, send_recv_type_t send_recv_type)
{
ucx_server_ctx_t context;
int ret;
ret = init_worker(ucp_context, &ucp_data_worker);
if (ret != 0) {
goto err;
}
if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) {
param.
cb = ucp_am_data_cb;
param.
arg = ucp_data_worker;
¶m);
ret = -1;
goto err_worker;
}
}
context.conn_request = NULL;
status = start_server(ucp_worker, &context, &context.listener, listen_addr);
ret = -1;
goto err_worker;
}
while (1) {
while (context.conn_request == NULL) {
}
status = server_create_ep(ucp_data_worker, context.conn_request,
&server_ep);
ret = -1;
goto err_listener;
}
ret = client_server_do_work(ucp_data_worker, server_ep, send_recv_type,
1);
if (ret != 0) {
goto err_ep;
}
ep_close(ucp_data_worker, server_ep);
context.conn_request = NULL;
printf("Waiting for connection...\n");
}
err_ep:
ep_close(ucp_data_worker, server_ep);
err_listener:
err_worker:
err:
return ret;
}
static int run_client(
ucp_worker_h ucp_worker,
char *server_addr,
send_recv_type_t send_recv_type)
{
int ret;
status = start_client(ucp_worker, server_addr, &client_ep);
fprintf(stderr, "failed to start client (%s)\n", ucs_status_string(status));
ret = -1;
goto out;
}
ret = client_server_do_work(ucp_worker, client_ep, send_recv_type, 0);
ep_close(ucp_worker, client_ep);
out:
return ret;
}
send_recv_type_t send_recv_type)
{
int ret = 0;
if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) {
} else if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) {
} else {
}
fprintf(stderr, "failed to ucp_init (%s)\n", ucs_status_string(status));
ret = -1;
goto err;
}
ret = init_worker(*ucp_context, ucp_worker);
if (ret != 0) {
goto err_cleanup;
}
return ret;
err_cleanup:
err:
return ret;
}
int main(int argc, char **argv)
{
send_recv_type_t send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
char *server_addr = NULL;
char *listen_addr = NULL;
int ret;
ret = parse_cmd(argc, argv, &server_addr, &listen_addr, &send_recv_type);
if (ret != 0) {
goto err;
}
ret = init_context(&ucp_context, &ucp_worker, send_recv_type);
if (ret != 0) {
goto err;
}
if (server_addr == NULL) {
ret = run_server(ucp_context, ucp_worker, listen_addr, send_recv_type);
} else {
ret = run_client(ucp_worker, server_addr, send_recv_type);
}
err:
return ret;
}