UCP client / server example using different APIs (tag, stream, am) utility.
#include "hello_world_util.h"
#include "ucp_util.h"
#include <ucp/api/ucp.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#define DEFAULT_PORT 13337
#define IP_STRING_LEN 50
#define PORT_STRING_LEN 8
#define TAG 0xCAFE
#define COMM_TYPE_DEFAULT "STREAM"
#define PRINT_INTERVAL 2000
#define DEFAULT_NUM_ITERATIONS 1
#define TEST_AM_ID 0
static long test_string_length = 16;
static long iov_cnt = 1;
static uint16_t server_port = DEFAULT_PORT;
static sa_family_t ai_family = AF_INET;
static int num_iterations = DEFAULT_NUM_ITERATIONS;
static int connection_closed = 1;
typedef enum {
CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(1),
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(2),
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_STREAM
} send_recv_type_t;
typedef struct ucx_server_ctx {
} ucx_server_ctx_t;
typedef struct test_req {
int complete;
} test_req_t;
static struct {
volatile int complete;
int is_rndv;
void *desc;
void *recv_buf;
} am_data_desc = {0, 0, NULL, NULL};
static void usage(void);
{
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
mem_type_free(iov[idx].buffer);
}
}
{
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
iov[idx].
length = test_string_length;
iov[idx].
buffer = mem_type_malloc(iov[idx].length);
if (iov[idx].buffer == NULL) {
buffer_free(iov);
return -1;
}
}
return 0;
}
{
int ret = 0;
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
ret = generate_test_string(iov[idx].buffer, iov[idx].length);
if (ret != 0) {
break;
}
}
CHKERR_ACTION(ret != 0, "generate test string", return -1;);
return 0;
}
static void common_cb(void *user_data, const char *type_str)
{
test_req_t *ctx;
if (user_data == NULL) {
fprintf(stderr, "user_data passed to %s mustn't be NULL\n", type_str);
return;
}
ctx = user_data;
ctx->complete = 1;
}
{
common_cb(user_data, "tag_recv_cb");
}
static void stream_recv_cb(
void *request,
ucs_status_t status,
size_t length,
void *user_data)
{
common_cb(user_data, "stream_recv_cb");
}
static void am_recv_cb(
void *request,
ucs_status_t status,
size_t length,
void *user_data)
{
common_cb(user_data, "am_recv_cb");
}
static void send_cb(
void *request,
ucs_status_t status,
void *user_data)
{
common_cb(user_data, "send_cb");
}
{
printf("error handling callback was invoked with status %d (%s)\n",
status, ucs_status_string(status));
connection_closed = 1;
}
void set_sock_addr(const char *address_str, struct sockaddr_storage *saddr)
{
struct sockaddr_in *sa_in;
struct sockaddr_in6 *sa_in6;
memset(saddr, 0, sizeof(*saddr));
switch (ai_family) {
case AF_INET:
sa_in = (struct sockaddr_in*)saddr;
if (address_str != NULL) {
inet_pton(AF_INET, address_str, &sa_in->sin_addr);
} else {
sa_in->sin_addr.s_addr = INADDR_ANY;
}
sa_in->sin_family = AF_INET;
sa_in->sin_port = htons(server_port);
break;
case AF_INET6:
sa_in6 = (struct sockaddr_in6*)saddr;
if (address_str != NULL) {
inet_pton(AF_INET6, address_str, &sa_in6->sin6_addr);
} else {
sa_in6->sin6_addr = in6addr_any;
}
sa_in6->sin6_family = AF_INET6;
sa_in6->sin6_port = htons(server_port);
break;
default:
fprintf(stderr, "Invalid address family");
break;
}
}
const char *address_str,
ucp_ep_h *client_ep)
{
struct sockaddr_storage connect_addr;
set_sock_addr(address_str, &connect_addr);
fprintf(stderr, "failed to connect to %s (%s)\n", address_str,
ucs_status_string(status));
}
return status;
}
{
char *msg = alloca(test_string_length);
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
mem_type_memcpy(msg, iov[idx].buffer, test_string_length);
printf("%s.\n", msg);
}
}
static
void print_result(
int is_server,
const ucp_dt_iov_t *iov,
int current_iter)
{
if (is_server) {
printf("Server: iteration #%d\n", (current_iter + 1));
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
} else {
printf("Client: iteration #%d\n", (current_iter + 1));
printf("\n\n------------------------------\n\n");
}
print_iov(iov);
printf("\n\n------------------------------\n\n");
}
test_req_t *ctx)
{
if (request == NULL) {
}
if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}
while (ctx->complete == 0) {
}
return status;
}
static int request_finalize(
ucp_worker_h ucp_worker, test_req_t *request,
int current_iter)
{
int ret = 0;
status = request_wait(ucp_worker, request, ctx);
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send", ucs_status_string(status));
ret = -1;
goto release_iov;
}
if ((current_iter == 0) || (current_iter == (num_iterations - 1)) ||
!((current_iter + 1) % (PRINT_INTERVAL))) {
print_result(is_server, iov, current_iter);
}
release_iov:
buffer_free(iov);
return ret;
}
static int
void **msg, size_t *msg_length,
{
CHKERR_ACTION(buffer_malloc(iov) != 0, "allocate memory", return -1;);
if (is_client && (fill_buffer(iov) != 0)) {
buffer_free(iov);
return -1;
}
*msg = (iov_cnt == 1) ? iov[0].buffer : iov;
*msg_length = (iov_cnt == 1) ? iov[0].length : iov_cnt;
ctx->complete = 0;
return 0;
}
int current_iter)
{
test_req_t *request;
size_t msg_length;
void *msg;
test_req_t ctx;
memset(iov, 0, iov_cnt * sizeof(*iov));
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, ¶m) != 0) {
return -1;
}
if (!is_server) {
} else {
param.
cb.recv_stream = stream_recv_cb;
&msg_length, ¶m);
}
return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
int current_iter)
{
void *request;
size_t msg_length;
void *msg;
test_req_t ctx;
memset(iov, 0, iov_cnt * sizeof(*iov));
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, ¶m) != 0) {
return -1;
}
if (!is_server) {
} else {
param.
cb.recv = tag_recv_cb;
¶m);
}
return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
ucs_status_t ucp_am_data_cb(
void *arg,
const void *header,
size_t header_length,
void *data, size_t length,
{
size_t idx;
size_t offset;
if (length != iov_cnt * test_string_length) {
fprintf(stderr, "received wrong data length %ld (expected %ld)",
length, iov_cnt * test_string_length);
}
if (header_length != 0) {
fprintf(stderr, "received unexpected header, length %ld", header_length);
}
am_data_desc.complete++;
am_data_desc.is_rndv = 1;
am_data_desc.desc = data;
}
am_data_desc.is_rndv = 0;
iov = am_data_desc.recv_buf;
offset = 0;
for (idx = 0; idx < iov_cnt; idx++) {
mem_type_memcpy(iov[idx].buffer, UCS_PTR_BYTE_OFFSET(data, offset),
iov[idx].length);
}
}
int current_iter)
{
static int last = 0;
test_req_t *request;
size_t msg_length;
void *msg;
test_req_t ctx;
memset(iov, 0, iov_cnt * sizeof(*iov));
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, ¶ms) != 0) {
return -1;
}
if (is_server) {
am_data_desc.recv_buf = iov;
while (last == am_data_desc.complete) {
}
last++;
if (am_data_desc.is_rndv) {
params.
cb.recv_am = am_recv_cb;
am_data_desc.desc,
msg, msg_length,
¶ms);
} else {
request = NULL;
}
} else {
msg_length, ¶ms);
}
return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
static void usage()
{
fprintf(stderr, "Usage: ucp_client_server [parameters]\n");
fprintf(stderr, "UCP client-server example utility\n");
fprintf(stderr, "\nParameters are:\n");
fprintf(stderr, " -a Set IP address of the server "
"(required for client and should not be specified "
"for the server)\n");
fprintf(stderr, " -l Set IP address where server listens "
"(If not specified, server uses INADDR_ANY; "
"Irrelevant at client)\n");
fprintf(stderr, " -p Port number to listen/connect to (default = %d). "
"0 on the server side means select a random port and print it\n",
DEFAULT_PORT);
fprintf(stderr, " -c Communication type for the client and server. "
" Valid values are:\n"
" 'stream' : Stream API\n"
" 'tag' : Tag API\n"
" 'am' : AM API\n"
" If not specified, %s API will be used.\n", COMM_TYPE_DEFAULT);
fprintf(stderr, " -i Number of iterations to run. Client and server must "
"have the same value. (default = %d).\n",
num_iterations);
fprintf(stderr, " -v Number of buffers in a single data "
"transfer function call. (default = %ld).\n",
iov_cnt);
print_common_help();
fprintf(stderr, "\n");
}
static int parse_cmd(int argc, char *const argv[], char **server_addr,
char **listen_addr, send_recv_type_t *send_recv_type)
{
int c = 0;
int port;
while ((c = getopt(argc, argv, "a:l:p:c:6i:s:v:m:h")) != -1) {
switch (c) {
case 'a':
*server_addr = optarg;
break;
case 'c':
if (!strcasecmp(optarg, "stream")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_STREAM;
} else if (!strcasecmp(optarg, "tag")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_TAG;
} else if (!strcasecmp(optarg, "am")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_AM;
} else {
fprintf(stderr, "Wrong communication type %s. "
"Using %s as default\n", optarg, COMM_TYPE_DEFAULT);
*send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
}
break;
case 'l':
*listen_addr = optarg;
break;
case 'p':
port = atoi(optarg);
if ((port < 0) || (port > UINT16_MAX)) {
fprintf(stderr, "Wrong server port number %d\n", port);
return -1;
}
server_port = port;
break;
case '6':
ai_family = AF_INET6;
break;
case 'i':
num_iterations = atoi(optarg);
break;
case 's':
test_string_length = atol(optarg);
if (test_string_length < 0) {
fprintf(stderr, "Wrong string size %ld\n", test_string_length);
}
break;
case 'v':
iov_cnt = atol(optarg);
if (iov_cnt <= 0) {
fprintf(stderr, "Wrong iov count %ld\n", iov_cnt);
}
break;
case 'm':
test_mem_type = parse_mem_type(optarg);
}
break;
case 'h':
default:
usage();
return -1;
}
}
return 0;
}
static char* sockaddr_get_ip_str(const struct sockaddr_storage *sock_addr,
char *ip_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
inet_ntop(AF_INET, &addr_in.sin_addr, ip_str, max_size);
return ip_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
inet_ntop(AF_INET6, &addr_in6.sin6_addr, ip_str, max_size);
return ip_str;
default:
return "Invalid address family";
}
}
static char* sockaddr_get_port_str(const struct sockaddr_storage *sock_addr,
char *port_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
snprintf(port_str, max_size, "%d", ntohs(addr_in.sin_port));
return port_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
snprintf(port_str, max_size, "%d", ntohs(addr_in6.sin6_port));
return port_str;
default:
return "Invalid address family";
}
}
send_recv_type_t send_recv_type,
int is_server, int current_iter)
{
int ret;
switch (send_recv_type) {
case CLIENT_SERVER_SEND_RECV_STREAM:
ret = send_recv_stream(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_TAG:
ret = send_recv_tag(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_AM:
ret = send_recv_am(worker, ep, is_server, current_iter);
break;
default:
fprintf(stderr, "unknown send-recv type %d\n", send_recv_type);
return -1;
}
return ret;
}
{
int ret = 0;
memset(&worker_params, 0, sizeof(worker_params));
fprintf(stderr, "failed to ucp_worker_create (%s)\n", ucs_status_string(status));
ret = -1;
}
return ret;
}
{
ucx_server_ctx_t *context = arg;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
printf("Server received a connection request from client at address %s:%s\n",
sockaddr_get_port_str(&attr.
client_address, port_str,
sizeof(port_str)));
fprintf(stderr, "failed to query the connection request (%s)\n",
ucs_status_string(status));
}
if (context->conn_request == NULL) {
context->conn_request = conn_request;
} else {
printf("Rejecting a connection request. "
"Only one client at a time is supported.\n");
fprintf(stderr, "server failed to reject a connection request: (%s)\n",
ucs_status_string(status));
}
}
}
{
fprintf(stderr, "failed to create an endpoint on the server: (%s)\n",
ucs_status_string(status));
}
return status;
}
start_server(
ucp_worker_h ucp_worker, ucx_server_ctx_t *context,
{
struct sockaddr_storage listen_addr;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
set_sock_addr(address_str, &listen_addr);
params.sockaddr.addr = (const struct sockaddr*)&listen_addr;
params.sockaddr.addrlen = sizeof(listen_addr);
params.conn_handler.cb = server_conn_handle_cb;
params.conn_handler.arg = context;
fprintf(stderr, "failed to listen (%s)\n", ucs_status_string(status));
goto out;
}
fprintf(stderr, "failed to query the listener (%s)\n",
ucs_status_string(status));
goto out;
}
fprintf(stderr, "server is listening on IP %s port %s\n",
sockaddr_get_ip_str(&attr.
sockaddr, ip_str, IP_STRING_LEN),
sockaddr_get_port_str(&attr.
sockaddr, port_str, PORT_STRING_LEN));
printf("Waiting for connection...\n");
out:
return status;
}
{
param.
cb = ucp_am_data_cb;
}
send_recv_type_t send_recv_type, int is_server)
{
int i, ret = 0;
connection_closed = 0;
for (i = 0; i < num_iterations; i++) {
ret = client_server_communication(ucp_worker, ep, send_recv_type,
is_server, i);
if (ret != 0) {
fprintf(stderr, "%s failed on iteration #%d\n",
(is_server ? "server": "client"), i + 1);
goto out;
}
}
if (!is_server && (send_recv_type == CLIENT_SERVER_SEND_RECV_AM)) {
status = register_am_recv_callback(ucp_worker);
ret = -1;
goto out;
}
}
ret = client_server_communication(ucp_worker, ep, send_recv_type,
!is_server, i + 1);
if (ret != 0) {
fprintf(stderr, "%s failed on FIN message\n",
(is_server ? "server": "client"));
goto out;
}
printf("%s FIN message\n", is_server ? "sent" : "received");
while (is_server && !connection_closed) {
}
out:
return ret;
}
char *listen_addr, send_recv_type_t send_recv_type)
{
ucx_server_ctx_t context;
int ret;
ret = init_worker(ucp_context, &ucp_data_worker);
if (ret != 0) {
goto err;
}
if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) {
status = register_am_recv_callback(ucp_data_worker);
ret = -1;
goto err_worker;
}
}
context.conn_request = NULL;
status = start_server(ucp_worker, &context, &context.listener, listen_addr);
ret = -1;
goto err_worker;
}
while (1) {
while (context.conn_request == NULL) {
}
status = server_create_ep(ucp_data_worker, context.conn_request,
&server_ep);
ret = -1;
goto err_listener;
}
ret = client_server_do_work(ucp_data_worker, server_ep, send_recv_type,
1);
if (ret != 0) {
goto err_ep;
}
context.conn_request = NULL;
printf("Waiting for connection...\n");
}
err_ep:
err_listener:
err_worker:
err:
return ret;
}
static int run_client(
ucp_worker_h ucp_worker,
char *server_addr,
send_recv_type_t send_recv_type)
{
int ret;
status = start_client(ucp_worker, server_addr, &client_ep);
fprintf(stderr, "failed to start client (%s)\n", ucs_status_string(status));
ret = -1;
goto out;
}
ret = client_server_do_work(ucp_worker, client_ep, send_recv_type, 0);
out:
return ret;
}
send_recv_type_t send_recv_type)
{
int ret = 0;
if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) {
} else if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) {
} else {
}
fprintf(stderr, "failed to ucp_init (%s)\n", ucs_status_string(status));
ret = -1;
goto err;
}
ret = init_worker(*ucp_context, ucp_worker);
if (ret != 0) {
goto err_cleanup;
}
return ret;
err_cleanup:
err:
return ret;
}
int main(int argc, char **argv)
{
send_recv_type_t send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
char *server_addr = NULL;
char *listen_addr = NULL;
int ret;
ret = parse_cmd(argc, argv, &server_addr, &listen_addr, &send_recv_type);
if (ret != 0) {
goto err;
}
ret = init_context(&ucp_context, &ucp_worker, send_recv_type);
if (ret != 0) {
goto err;
}
if (server_addr == NULL) {
ret = run_server(ucp_context, ucp_worker, listen_addr, send_recv_type);
} else {
ret = run_client(ucp_worker, server_addr, send_recv_type);
}
err:
return ret;
}