UCX 1.17
Unified Communication X
Loading...
Searching...
No Matches
ucp_client_server.c

UCP client / server example using different APIs (tag, stream, am) utility.

/*
* UCP client - server example utility
* -----------------------------------------------
*
* Server side:
*
* ./ucp_client_server
*
* Client side:
*
* ./ucp_client_server -a <server-ip>
*
* Notes:
*
* - The server will listen to incoming connection requests on INADDR_ANY.
* - The client needs to pass the IP address of the server side to connect to
* as an argument to the test.
* - Currently, the passed IP needs to be an IPoIB or a RoCE address.
* - The port which the server side would listen on can be modified with the
* '-p' option and should be used on both sides. The default port to use is
* 13337.
*/
#include "hello_world_util.h"
#include "ucp_util.h"
#include <ucp/api/ucp.h>
#include <string.h> /* memset */
#include <arpa/inet.h> /* inet_addr */
#include <unistd.h> /* getopt */
#include <stdlib.h> /* atoi */
#define DEFAULT_PORT 13337
#define IP_STRING_LEN 50
#define PORT_STRING_LEN 8
#define TAG 0xCAFE
#define COMM_TYPE_DEFAULT "STREAM"
#define PRINT_INTERVAL 2000
#define DEFAULT_NUM_ITERATIONS 1
#define TEST_AM_ID 0
static long test_string_length = 16;
static long iov_cnt = 1;
static uint16_t server_port = DEFAULT_PORT;
static sa_family_t ai_family = AF_INET;
static int num_iterations = DEFAULT_NUM_ITERATIONS;
static int connection_closed = 1;
typedef enum {
CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(1),
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(2),
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_STREAM
} send_recv_type_t;
typedef struct ucx_server_ctx {
volatile ucp_conn_request_h conn_request;
ucp_listener_h listener;
} ucx_server_ctx_t;
typedef struct test_req {
int complete;
} test_req_t;
static struct {
volatile int complete;
int is_rndv;
void *desc;
void *recv_buf;
} am_data_desc = {0, 0, NULL, NULL};
static void usage(void);
void buffer_free(ucp_dt_iov_t *iov)
{
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
mem_type_free(iov[idx].buffer);
}
}
int buffer_malloc(ucp_dt_iov_t *iov)
{
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
iov[idx].length = test_string_length;
iov[idx].buffer = mem_type_malloc(iov[idx].length);
if (iov[idx].buffer == NULL) {
buffer_free(iov);
return -1;
}
}
return 0;
}
int fill_buffer(ucp_dt_iov_t *iov)
{
int ret = 0;
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
ret = generate_test_string(iov[idx].buffer, iov[idx].length);
if (ret != 0) {
break;
}
}
CHKERR_ACTION(ret != 0, "generate test string", return -1;);
return 0;
}
static void common_cb(void *user_data, const char *type_str)
{
test_req_t *ctx;
if (user_data == NULL) {
fprintf(stderr, "user_data passed to %s mustn't be NULL\n", type_str);
return;
}
ctx = user_data;
ctx->complete = 1;
}
static void tag_recv_cb(void *request, ucs_status_t status,
const ucp_tag_recv_info_t *info, void *user_data)
{
common_cb(user_data, "tag_recv_cb");
}
static void stream_recv_cb(void *request, ucs_status_t status, size_t length,
void *user_data)
{
common_cb(user_data, "stream_recv_cb");
}
static void am_recv_cb(void *request, ucs_status_t status, size_t length,
void *user_data)
{
common_cb(user_data, "am_recv_cb");
}
static void send_cb(void *request, ucs_status_t status, void *user_data)
{
common_cb(user_data, "send_cb");
}
static void err_cb(void *arg, ucp_ep_h ep, ucs_status_t status)
{
printf("error handling callback was invoked with status %d (%s)\n",
status, ucs_status_string(status));
connection_closed = 1;
}
void set_sock_addr(const char *address_str, struct sockaddr_storage *saddr)
{
struct sockaddr_in *sa_in;
struct sockaddr_in6 *sa_in6;
/* The server will listen on INADDR_ANY */
memset(saddr, 0, sizeof(*saddr));
switch (ai_family) {
case AF_INET:
sa_in = (struct sockaddr_in*)saddr;
if (address_str != NULL) {
inet_pton(AF_INET, address_str, &sa_in->sin_addr);
} else {
sa_in->sin_addr.s_addr = INADDR_ANY;
}
sa_in->sin_family = AF_INET;
sa_in->sin_port = htons(server_port);
break;
case AF_INET6:
sa_in6 = (struct sockaddr_in6*)saddr;
if (address_str != NULL) {
inet_pton(AF_INET6, address_str, &sa_in6->sin6_addr);
} else {
sa_in6->sin6_addr = in6addr_any;
}
sa_in6->sin6_family = AF_INET6;
sa_in6->sin6_port = htons(server_port);
break;
default:
fprintf(stderr, "Invalid address family");
break;
}
}
static ucs_status_t start_client(ucp_worker_h ucp_worker,
const char *address_str, ucp_ep_h *client_ep)
{
ucp_ep_params_t ep_params;
struct sockaddr_storage connect_addr;
ucs_status_t status;
set_sock_addr(address_str, &connect_addr);
/*
* Endpoint field mask bits:
* UCP_EP_PARAM_FIELD_FLAGS - Use the value of the 'flags' field.
* UCP_EP_PARAM_FIELD_SOCK_ADDR - Use a remote sockaddr to connect
* to the remote peer.
* UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE - Error handling mode - this flag
* is temporarily required since the
* endpoint will be closed with
* UCP_EP_CLOSE_MODE_FORCE which
* requires this mode.
* Once UCP_EP_CLOSE_MODE_FORCE is
* removed, the error handling mode
* will be removed.
*/
ep_params.err_handler.cb = err_cb;
ep_params.err_handler.arg = NULL;
ep_params.sockaddr.addr = (struct sockaddr*)&connect_addr;
ep_params.sockaddr.addrlen = sizeof(connect_addr);
status = ucp_ep_create(ucp_worker, &ep_params, client_ep);
if (status != UCS_OK) {
fprintf(stderr, "failed to connect to %s (%s)\n", address_str,
ucs_status_string(status));
}
return status;
}
static void print_iov(const ucp_dt_iov_t *iov)
{
char *msg = alloca(test_string_length);
size_t idx;
for (idx = 0; idx < iov_cnt; idx++) {
/* In case of Non-System memory */
mem_type_memcpy(msg, iov[idx].buffer, test_string_length);
printf("%s.\n", msg);
}
}
static
void print_result(int is_server, const ucp_dt_iov_t *iov, int current_iter)
{
if (is_server) {
printf("Server: iteration #%d\n", (current_iter + 1));
printf("UCX data message was received\n");
printf("\n\n----- UCP TEST SUCCESS -------\n\n");
} else {
printf("Client: iteration #%d\n", (current_iter + 1));
printf("\n\n------------------------------\n\n");
}
print_iov(iov);
printf("\n\n------------------------------\n\n");
}
static ucs_status_t request_wait(ucp_worker_h ucp_worker, void *request,
test_req_t *ctx)
{
ucs_status_t status;
/* if operation was completed immediately */
if (request == NULL) {
return UCS_OK;
}
if (UCS_PTR_IS_ERR(request)) {
return UCS_PTR_STATUS(request);
}
while (ctx->complete == 0) {
ucp_worker_progress(ucp_worker);
}
status = ucp_request_check_status(request);
ucp_request_free(request);
return status;
}
static int request_finalize(ucp_worker_h ucp_worker, test_req_t *request,
test_req_t *ctx, int is_server, ucp_dt_iov_t *iov,
int current_iter)
{
int ret = 0;
ucs_status_t status;
status = request_wait(ucp_worker, request, ctx);
if (status != UCS_OK) {
fprintf(stderr, "unable to %s UCX message (%s)\n",
is_server ? "receive": "send", ucs_status_string(status));
ret = -1;
goto release_iov;
}
/* Print the output of the first, last and every PRINT_INTERVAL iteration */
if ((current_iter == 0) || (current_iter == (num_iterations - 1)) ||
!((current_iter + 1) % (PRINT_INTERVAL))) {
print_result(is_server, iov, current_iter);
}
release_iov:
buffer_free(iov);
return ret;
}
static int
fill_request_param(ucp_dt_iov_t *iov, int is_client,
void **msg, size_t *msg_length,
test_req_t *ctx, ucp_request_param_t *param)
{
CHKERR_ACTION(buffer_malloc(iov) != 0, "allocate memory", return -1;);
if (is_client && (fill_buffer(iov) != 0)) {
buffer_free(iov);
return -1;
}
*msg = (iov_cnt == 1) ? iov[0].buffer : iov;
*msg_length = (iov_cnt == 1) ? iov[0].length : iov_cnt;
ctx->complete = 0;
param->datatype = (iov_cnt == 1) ? ucp_dt_make_contig(1) :
param->user_data = ctx;
return 0;
}
static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
test_req_t *request;
size_t msg_length;
void *msg;
test_req_t ctx;
memset(iov, 0, iov_cnt * sizeof(*iov));
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, &param) != 0) {
return -1;
}
if (!is_server) {
/* Client sends a message to the server using the stream API */
param.cb.send = send_cb;
request = ucp_stream_send_nbx(ep, msg, msg_length, &param);
} else {
/* Server receives a message from the client using the stream API */
param.cb.recv_stream = stream_recv_cb;
request = ucp_stream_recv_nbx(ep, msg, msg_length,
&msg_length, &param);
}
return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
static int send_recv_tag(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
void *request;
size_t msg_length;
void *msg;
test_req_t ctx;
memset(iov, 0, iov_cnt * sizeof(*iov));
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, &param) != 0) {
return -1;
}
if (!is_server) {
/* Client sends a message to the server using the Tag-Matching API */
param.cb.send = send_cb;
request = ucp_tag_send_nbx(ep, msg, msg_length, TAG, &param);
} else {
/* Server receives a message from the client using the Tag-Matching API */
param.cb.recv = tag_recv_cb;
request = ucp_tag_recv_nbx(ucp_worker, msg, msg_length, TAG, 0,
&param);
}
return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
ucs_status_t ucp_am_data_cb(void *arg, const void *header, size_t header_length,
void *data, size_t length,
const ucp_am_recv_param_t *param)
{
size_t idx;
size_t offset;
if (length != iov_cnt * test_string_length) {
fprintf(stderr, "received wrong data length %ld (expected %ld)",
length, iov_cnt * test_string_length);
return UCS_OK;
}
if (header_length != 0) {
fprintf(stderr, "received unexpected header, length %ld", header_length);
}
am_data_desc.complete++;
/* Rendezvous request arrived, data contains an internal UCX descriptor,
* which has to be passed to ucp_am_recv_data_nbx function to confirm
* data transfer.
*/
am_data_desc.is_rndv = 1;
am_data_desc.desc = data;
}
/* Message delivered with eager protocol, data should be available
* immediately
*/
am_data_desc.is_rndv = 0;
iov = am_data_desc.recv_buf;
offset = 0;
for (idx = 0; idx < iov_cnt; idx++) {
mem_type_memcpy(iov[idx].buffer, UCS_PTR_BYTE_OFFSET(data, offset),
iov[idx].length);
offset += iov[idx].length;
}
return UCS_OK;
}
static int send_recv_am(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
static int last = 0;
ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
test_req_t *request;
size_t msg_length;
void *msg;
test_req_t ctx;
memset(iov, 0, iov_cnt * sizeof(*iov));
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, &params) != 0) {
return -1;
}
if (is_server) {
am_data_desc.recv_buf = iov;
/* waiting for AM callback has called */
while (last == am_data_desc.complete) {
ucp_worker_progress(ucp_worker);
}
last++;
if (am_data_desc.is_rndv) {
/* Rendezvous request has arrived, need to invoke receive operation
* to confirm data transfer from the sender to the "recv_message"
* buffer. */
params.cb.recv_am = am_recv_cb;
request = ucp_am_recv_data_nbx(ucp_worker,
am_data_desc.desc,
msg, msg_length,
&params);
} else {
/* Data has arrived eagerly and is ready for use, no need to
* initiate receive operation. */
request = NULL;
}
} else {
/* Client sends a message to the server using the AM API */
params.cb.send = (ucp_send_nbx_callback_t)send_cb;
request = ucp_am_send_nbx(ep, TEST_AM_ID, NULL, 0ul, msg,
msg_length, &params);
}
return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
static void usage()
{
fprintf(stderr, "Usage: ucp_client_server [parameters]\n");
fprintf(stderr, "UCP client-server example utility\n");
fprintf(stderr, "\nParameters are:\n");
fprintf(stderr, " -a Set IP address of the server "
"(required for client and should not be specified "
"for the server)\n");
fprintf(stderr, " -l Set IP address where server listens "
"(If not specified, server uses INADDR_ANY; "
"Irrelevant at client)\n");
fprintf(stderr, " -p Port number to listen/connect to (default = %d). "
"0 on the server side means select a random port and print it\n",
DEFAULT_PORT);
fprintf(stderr, " -c Communication type for the client and server. "
" Valid values are:\n"
" 'stream' : Stream API\n"
" 'tag' : Tag API\n"
" 'am' : AM API\n"
" If not specified, %s API will be used.\n", COMM_TYPE_DEFAULT);
fprintf(stderr, " -i Number of iterations to run. Client and server must "
"have the same value. (default = %d).\n",
num_iterations);
fprintf(stderr, " -v Number of buffers in a single data "
"transfer function call. (default = %ld).\n",
iov_cnt);
print_common_help();
fprintf(stderr, "\n");
}
static int parse_cmd(int argc, char *const argv[], char **server_addr,
char **listen_addr, send_recv_type_t *send_recv_type)
{
int c = 0;
int port;
while ((c = getopt(argc, argv, "a:l:p:c:6i:s:v:m:h")) != -1) {
switch (c) {
case 'a':
*server_addr = optarg;
break;
case 'c':
if (!strcasecmp(optarg, "stream")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_STREAM;
} else if (!strcasecmp(optarg, "tag")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_TAG;
} else if (!strcasecmp(optarg, "am")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_AM;
} else {
fprintf(stderr, "Wrong communication type %s. "
"Using %s as default\n", optarg, COMM_TYPE_DEFAULT);
*send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
}
break;
case 'l':
*listen_addr = optarg;
break;
case 'p':
port = atoi(optarg);
if ((port < 0) || (port > UINT16_MAX)) {
fprintf(stderr, "Wrong server port number %d\n", port);
return -1;
}
server_port = port;
break;
case '6':
ai_family = AF_INET6;
break;
case 'i':
num_iterations = atoi(optarg);
break;
case 's':
test_string_length = atol(optarg);
if (test_string_length < 0) {
fprintf(stderr, "Wrong string size %ld\n", test_string_length);
}
break;
case 'v':
iov_cnt = atol(optarg);
if (iov_cnt <= 0) {
fprintf(stderr, "Wrong iov count %ld\n", iov_cnt);
}
break;
case 'm':
test_mem_type = parse_mem_type(optarg);
if (test_mem_type == UCS_MEMORY_TYPE_LAST) {
}
break;
case 'h':
default:
usage();
return -1;
}
}
return 0;
}
static char* sockaddr_get_ip_str(const struct sockaddr_storage *sock_addr,
char *ip_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
inet_ntop(AF_INET, &addr_in.sin_addr, ip_str, max_size);
return ip_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
inet_ntop(AF_INET6, &addr_in6.sin6_addr, ip_str, max_size);
return ip_str;
default:
return "Invalid address family";
}
}
static char* sockaddr_get_port_str(const struct sockaddr_storage *sock_addr,
char *port_str, size_t max_size)
{
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
switch (sock_addr->ss_family) {
case AF_INET:
memcpy(&addr_in, sock_addr, sizeof(struct sockaddr_in));
snprintf(port_str, max_size, "%d", ntohs(addr_in.sin_port));
return port_str;
case AF_INET6:
memcpy(&addr_in6, sock_addr, sizeof(struct sockaddr_in6));
snprintf(port_str, max_size, "%d", ntohs(addr_in6.sin6_port));
return port_str;
default:
return "Invalid address family";
}
}
static int client_server_communication(ucp_worker_h worker, ucp_ep_h ep,
send_recv_type_t send_recv_type,
int is_server, int current_iter)
{
int ret;
switch (send_recv_type) {
case CLIENT_SERVER_SEND_RECV_STREAM:
/* Client-Server communication via Stream API */
ret = send_recv_stream(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_TAG:
/* Client-Server communication via Tag-Matching API */
ret = send_recv_tag(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_AM:
/* Client-Server communication via AM API. */
ret = send_recv_am(worker, ep, is_server, current_iter);
break;
default:
fprintf(stderr, "unknown send-recv type %d\n", send_recv_type);
return -1;
}
return ret;
}
static int init_worker(ucp_context_h ucp_context, ucp_worker_h *ucp_worker)
{
ucp_worker_params_t worker_params;
ucs_status_t status;
int ret = 0;
memset(&worker_params, 0, sizeof(worker_params));
status = ucp_worker_create(ucp_context, &worker_params, ucp_worker);
if (status != UCS_OK) {
fprintf(stderr, "failed to ucp_worker_create (%s)\n", ucs_status_string(status));
ret = -1;
}
return ret;
}
static void server_conn_handle_cb(ucp_conn_request_h conn_request, void *arg)
{
ucx_server_ctx_t *context = arg;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
ucs_status_t status;
status = ucp_conn_request_query(conn_request, &attr);
if (status == UCS_OK) {
printf("Server received a connection request from client at address %s:%s\n",
sockaddr_get_ip_str(&attr.client_address, ip_str, sizeof(ip_str)),
sockaddr_get_port_str(&attr.client_address, port_str, sizeof(port_str)));
} else if (status != UCS_ERR_UNSUPPORTED) {
fprintf(stderr, "failed to query the connection request (%s)\n",
ucs_status_string(status));
}
if (context->conn_request == NULL) {
context->conn_request = conn_request;
} else {
/* The server is already handling a connection request from a client,
* reject this new one */
printf("Rejecting a connection request. "
"Only one client at a time is supported.\n");
status = ucp_listener_reject(context->listener, conn_request);
if (status != UCS_OK) {
fprintf(stderr, "server failed to reject a connection request: (%s)\n",
ucs_status_string(status));
}
}
}
static ucs_status_t server_create_ep(ucp_worker_h data_worker,
ucp_conn_request_h conn_request,
ucp_ep_h *server_ep)
{
ucp_ep_params_t ep_params;
ucs_status_t status;
/* Server creates an ep to the client on the data worker.
* This is not the worker the listener was created on.
* The client side should have initiated the connection, leading
* to this ep's creation */
ep_params.conn_request = conn_request;
ep_params.err_handler.cb = err_cb;
ep_params.err_handler.arg = NULL;
status = ucp_ep_create(data_worker, &ep_params, server_ep);
if (status != UCS_OK) {
fprintf(stderr, "failed to create an endpoint on the server: (%s)\n",
ucs_status_string(status));
}
return status;
}
start_server(ucp_worker_h ucp_worker, ucx_server_ctx_t *context,
ucp_listener_h *listener_p, const char *address_str)
{
struct sockaddr_storage listen_addr;
ucs_status_t status;
char ip_str[IP_STRING_LEN];
char port_str[PORT_STRING_LEN];
set_sock_addr(address_str, &listen_addr);
params.field_mask = UCP_LISTENER_PARAM_FIELD_SOCK_ADDR |
params.sockaddr.addr = (const struct sockaddr*)&listen_addr;
params.sockaddr.addrlen = sizeof(listen_addr);
params.conn_handler.cb = server_conn_handle_cb;
params.conn_handler.arg = context;
/* Create a listener on the server side to listen on the given address.*/
status = ucp_listener_create(ucp_worker, &params, listener_p);
if (status != UCS_OK) {
fprintf(stderr, "failed to listen (%s)\n", ucs_status_string(status));
goto out;
}
/* Query the created listener to get the port it is listening on. */
status = ucp_listener_query(*listener_p, &attr);
if (status != UCS_OK) {
fprintf(stderr, "failed to query the listener (%s)\n",
ucs_status_string(status));
ucp_listener_destroy(*listener_p);
goto out;
}
fprintf(stderr, "server is listening on IP %s port %s\n",
sockaddr_get_ip_str(&attr.sockaddr, ip_str, IP_STRING_LEN),
sockaddr_get_port_str(&attr.sockaddr, port_str, PORT_STRING_LEN));
printf("Waiting for connection...\n");
out:
return status;
}
ucs_status_t register_am_recv_callback(ucp_worker_h worker)
{
param.id = TEST_AM_ID;
param.cb = ucp_am_data_cb;
param.arg = worker; /* not used in our callback */
return ucp_worker_set_am_recv_handler(worker, &param);
}
static int client_server_do_work(ucp_worker_h ucp_worker, ucp_ep_h ep,
send_recv_type_t send_recv_type, int is_server)
{
int i, ret = 0;
ucs_status_t status;
connection_closed = 0;
for (i = 0; i < num_iterations; i++) {
ret = client_server_communication(ucp_worker, ep, send_recv_type,
is_server, i);
if (ret != 0) {
fprintf(stderr, "%s failed on iteration #%d\n",
(is_server ? "server": "client"), i + 1);
goto out;
}
}
/* Register recv callback on the client side to receive FIN message */
if (!is_server && (send_recv_type == CLIENT_SERVER_SEND_RECV_AM)) {
status = register_am_recv_callback(ucp_worker);
if (status != UCS_OK) {
ret = -1;
goto out;
}
}
/* FIN message in reverse direction to acknowledge delivery */
ret = client_server_communication(ucp_worker, ep, send_recv_type,
!is_server, i + 1);
if (ret != 0) {
fprintf(stderr, "%s failed on FIN message\n",
(is_server ? "server": "client"));
goto out;
}
printf("%s FIN message\n", is_server ? "sent" : "received");
/* Server waits until the client closed the connection after receiving FIN */
while (is_server && !connection_closed) {
ucp_worker_progress(ucp_worker);
}
out:
return ret;
}
static int run_server(ucp_context_h ucp_context, ucp_worker_h ucp_worker,
char *listen_addr, send_recv_type_t send_recv_type)
{
ucx_server_ctx_t context;
ucp_worker_h ucp_data_worker;
ucp_ep_h server_ep;
ucs_status_t status;
int ret;
/* Create a data worker (to be used for data exchange between the server
* and the client after the connection between them was established) */
ret = init_worker(ucp_context, &ucp_data_worker);
if (ret != 0) {
goto err;
}
if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) {
status = register_am_recv_callback(ucp_data_worker);
if (status != UCS_OK) {
ret = -1;
goto err_worker;
}
}
/* Initialize the server's context. */
context.conn_request = NULL;
/* Create a listener on the worker created at first. The 'connection
* worker' - used for connection establishment between client and server.
* This listener will stay open for listening to incoming connection
* requests from the client */
status = start_server(ucp_worker, &context, &context.listener, listen_addr);
if (status != UCS_OK) {
ret = -1;
goto err_worker;
}
/* Server is always up listening */
while (1) {
/* Wait for the server to receive a connection request from the client.
* If there are multiple clients for which the server's connection request
* callback is invoked, i.e. several clients are trying to connect in
* parallel, the server will handle only the first one and reject the rest */
while (context.conn_request == NULL) {
ucp_worker_progress(ucp_worker);
}
/* Server creates an ep to the client on the data worker.
* This is not the worker the listener was created on.
* The client side should have initiated the connection, leading
* to this ep's creation */
status = server_create_ep(ucp_data_worker, context.conn_request,
&server_ep);
if (status != UCS_OK) {
ret = -1;
goto err_listener;
}
/* The server waits for all the iterations to complete before moving on
* to the next client */
ret = client_server_do_work(ucp_data_worker, server_ep, send_recv_type,
1);
if (ret != 0) {
goto err_ep;
}
/* Close the endpoint to the client */
ep_close(ucp_data_worker, server_ep, UCP_EP_CLOSE_FLAG_FORCE);
/* Reinitialize the server's context to be used for the next client */
context.conn_request = NULL;
printf("Waiting for connection...\n");
}
err_ep:
ep_close(ucp_data_worker, server_ep, UCP_EP_CLOSE_FLAG_FORCE);
err_listener:
ucp_listener_destroy(context.listener);
err_worker:
ucp_worker_destroy(ucp_data_worker);
err:
return ret;
}
static int run_client(ucp_worker_h ucp_worker, char *server_addr,
send_recv_type_t send_recv_type)
{
ucp_ep_h client_ep;
ucs_status_t status;
int ret;
status = start_client(ucp_worker, server_addr, &client_ep);
if (status != UCS_OK) {
fprintf(stderr, "failed to start client (%s)\n", ucs_status_string(status));
ret = -1;
goto out;
}
ret = client_server_do_work(ucp_worker, client_ep, send_recv_type, 0);
/* Close the endpoint to the server */
ep_close(ucp_worker, client_ep, UCP_EP_CLOSE_FLAG_FORCE);
out:
return ret;
}
static int init_context(ucp_context_h *ucp_context, ucp_worker_h *ucp_worker,
send_recv_type_t send_recv_type)
{
/* UCP objects */
ucs_status_t status;
int ret = 0;
memset(&ucp_params, 0, sizeof(ucp_params));
/* UCP initialization */
ucp_params.name = "client_server";
if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) {
} else if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) {
} else {
}
status = ucp_init(&ucp_params, NULL, ucp_context);
if (status != UCS_OK) {
fprintf(stderr, "failed to ucp_init (%s)\n", ucs_status_string(status));
ret = -1;
goto err;
}
ret = init_worker(*ucp_context, ucp_worker);
if (ret != 0) {
goto err_cleanup;
}
return ret;
err_cleanup:
ucp_cleanup(*ucp_context);
err:
return ret;
}
int main(int argc, char **argv)
{
send_recv_type_t send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
char *server_addr = NULL;
char *listen_addr = NULL;
int ret;
/* UCP objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
ret = parse_cmd(argc, argv, &server_addr, &listen_addr, &send_recv_type);
if (ret != 0) {
goto err;
}
/* Initialize the UCX required objects */
ret = init_context(&ucp_context, &ucp_worker, send_recv_type);
if (ret != 0) {
goto err;
}
/* Client-Server initialization */
if (server_addr == NULL) {
/* Server side */
ret = run_server(ucp_context, ucp_worker, listen_addr, send_recv_type);
} else {
/* Client side */
ret = run_client(ucp_worker, server_addr, send_recv_type);
}
ucp_worker_destroy(ucp_worker);
ucp_cleanup(ucp_context);
err:
return ret;
}
ucp_err_handler_cb_t cb
Definition ucp_def.h:367
void * arg
Definition ucp_def.h:369
void ucp_request_free(void *request)
Release a communications request.
ucs_status_ptr_t ucp_stream_recv_nbx(ucp_ep_h ep, void *buffer, size_t count, size_t *length, const ucp_request_param_t *param)
Non-blocking stream receive operation of structured data into a user-supplied buffer.
ucs_status_ptr_t ucp_am_send_nbx(ucp_ep_h ep, unsigned id, const void *header, size_t header_length, const void *buffer, size_t count, const ucp_request_param_t *param)
Send Active Message.
ucs_status_ptr_t ucp_tag_send_nbx(ucp_ep_h ep, const void *buffer, size_t count, ucp_tag_t tag, const ucp_request_param_t *param)
Non-blocking tagged-send operation.
ucs_status_ptr_t ucp_tag_recv_nbx(ucp_worker_h worker, void *buffer, size_t count, ucp_tag_t tag, ucp_tag_t tag_mask, const ucp_request_param_t *param)
Non-blocking tagged-receive operation.
ucs_status_t ucp_request_check_status(void *request)
Check the status of non-blocking request.
ucs_status_ptr_t ucp_stream_send_nbx(ucp_ep_h ep, const void *buffer, size_t count, const ucp_request_param_t *param)
Non-blocking stream send operation.
void(* ucp_send_nbx_callback_t)(void *request, ucs_status_t status, void *user_data)
Completion callback for non-blocking sends.
Definition ucp_def.h:340
ucs_status_ptr_t ucp_am_recv_data_nbx(ucp_worker_h worker, void *data_desc, void *buffer, size_t count, const ucp_request_param_t *param)
Receive Active Message as defined by provided data descriptor.
@ UCP_AM_RECV_ATTR_FLAG_RNDV
Definition ucp.h:788
@ UCP_OP_ATTR_FIELD_FLAGS
Definition ucp.h:711
@ UCP_OP_ATTR_FIELD_DATATYPE
Definition ucp.h:710
@ UCP_OP_ATTR_FIELD_CALLBACK
Definition ucp.h:708
@ UCP_OP_ATTR_FLAG_NO_IMM_CMPL
Definition ucp.h:717
@ UCP_OP_ATTR_FIELD_USER_DATA
Definition ucp.h:709
@ UCP_STREAM_RECV_FLAG_WAITALL
Definition ucp.h:689
@ UCP_AM_HANDLER_PARAM_FIELD_ARG
Definition ucp.h:815
@ UCP_AM_HANDLER_PARAM_FIELD_ID
Definition ucp.h:803
@ UCP_AM_HANDLER_PARAM_FIELD_CB
Definition ucp.h:811
uint64_t field_mask
Definition ucp.h:1041
const char * name
Definition ucp.h:1136
uint64_t features
Definition ucp.h:1049
Tuning parameters for UCP library.
Definition ucp.h:1035
union ucp_request_param_t::@2 cb
ucp_datatype_t datatype
Definition ucp.h:1776
uint32_t flags
Definition ucp.h:1750
uint32_t op_attr_mask
Definition ucp.h:1747
void * user_data
Definition ucp.h:1781
struct ucp_context * ucp_context_h
UCP Application Context.
Definition ucp_def.h:60
static ucs_status_t ucp_init(const ucp_params_t *params, const ucp_config_t *config, ucp_context_h *context_p)
UCP context initialization.
Definition ucp.h:2084
void ucp_cleanup(ucp_context_h context_p)
Release UCP application context.
@ UCP_PARAM_FIELD_FEATURES
Definition ucp.h:120
@ UCP_PARAM_FIELD_NAME
Definition ucp.h:128
@ UCP_FEATURE_STREAM
Definition ucp.h:157
@ UCP_FEATURE_TAG
Definition ucp.h:142
@ UCP_FEATURE_AM
Definition ucp.h:160
Operation parameters passed to ucp_tag_send_nbx, ucp_tag_send_sync_nbx, ucp_tag_recv_nbx,...
Definition ucp.h:1741
UCP receive information descriptor.
Definition ucp.h:1693
void * buffer
Definition ucp.h:865
size_t length
Definition ucp.h:866
#define ucp_dt_make_contig(_elem_size)
Generate an identifier for contiguous data type.
Definition ucp.h:833
@ UCP_DATATYPE_IOV
Definition ucp.h:506
Structure for scatter-gather I/O.
Definition ucp.h:864
ucp_err_handling_mode_t err_mode
Definition ucp_def.h:704
unsigned flags
Definition ucp_def.h:724
ucp_conn_request_h conn_request
Definition ucp_def.h:743
ucs_sock_addr_t sockaddr
Definition ucp_def.h:733
ucp_err_handler_t err_handler
Definition ucp_def.h:709
uint64_t field_mask
Definition ucp_def.h:690
struct ucp_conn_request * ucp_conn_request_h
UCP connection request.
Definition ucp_def.h:102
ucs_status_t ucp_ep_create(ucp_worker_h worker, const ucp_ep_params_t *params, ucp_ep_h *ep_p)
Create and connect an endpoint.
struct ucp_ep * ucp_ep_h
UCP Endpoint.
Definition ucp_def.h:92
@ UCP_EP_PARAMS_FLAGS_CLIENT_SERVER
Definition ucp.h:279
@ UCP_ERR_HANDLING_MODE_PEER
Definition ucp_def.h:130
@ UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE
Definition ucp.h:257
@ UCP_EP_PARAM_FIELD_FLAGS
Definition ucp.h:263
@ UCP_EP_PARAM_FIELD_SOCK_ADDR
Definition ucp.h:262
@ UCP_EP_PARAM_FIELD_CONN_REQUEST
Definition ucp.h:265
@ UCP_EP_PARAM_FIELD_ERR_HANDLER
Definition ucp.h:259
@ UCP_EP_CLOSE_FLAG_FORCE
Definition ucp.h:314
Tuning parameters for the UCP endpoint.
Definition ucp_def.h:683
uint64_t field_mask
Definition ucp.h:1299
uint64_t recv_attr
Definition ucp.h:1910
ucs_thread_mode_t thread_mode
Definition ucp.h:1313
struct sockaddr_storage client_address
Definition ucp.h:1507
uint64_t field_mask
Definition ucp.h:1874
uint64_t field_mask
Definition ucp.h:1477
ucp_am_recv_callback_t cb
Definition ucp.h:1890
unsigned id
Definition ucp.h:1879
uint64_t field_mask
Definition ucp.h:1501
void * arg
Definition ucp.h:1896
struct sockaddr_storage sockaddr
Definition ucp.h:1483
ucs_status_t ucp_worker_create(ucp_context_h context, const ucp_worker_params_t *params, ucp_worker_h *worker_p)
Create a worker object.
ucs_status_t ucp_listener_reject(ucp_listener_h listener, ucp_conn_request_h conn_request)
Reject an incoming connection request.
unsigned ucp_worker_progress(ucp_worker_h worker)
Progress all communications on a specific worker.
ucs_status_t ucp_worker_set_am_recv_handler(ucp_worker_h worker, const ucp_am_handler_param_t *param)
Add user defined callback for Active Message.
ucs_status_t ucp_conn_request_query(ucp_conn_request_h conn_request, ucp_conn_request_attr_t *attr)
Get attributes specific to a particular connection request received on the server side.
struct ucp_worker * ucp_worker_h
UCP Worker.
Definition ucp_def.h:246
void ucp_worker_destroy(ucp_worker_h worker)
Destroy a worker object.
ucs_status_t ucp_listener_query(ucp_listener_h listener, ucp_listener_attr_t *attr)
Get attributes specific to a particular listener.
ucs_status_t ucp_listener_create(ucp_worker_h worker, const ucp_listener_params_t *params, ucp_listener_h *listener_p)
Create a listener to accept connections on. Connection requests on the listener will arrive at a loca...
struct ucp_listener * ucp_listener_h
UCP listen handle.
Definition ucp_def.h:177
void ucp_listener_destroy(ucp_listener_h listener)
Stop accepting connections on a local address of the worker object.
@ UCP_CONN_REQUEST_ATTR_FIELD_CLIENT_ADDR
Definition ucp.h:492
@ UCP_LISTENER_PARAM_FIELD_SOCK_ADDR
Definition ucp.h:217
@ UCP_LISTENER_PARAM_FIELD_CONN_HANDLER
Definition ucp.h:226
@ UCP_LISTENER_ATTR_FIELD_SOCKADDR
Definition ucp.h:480
@ UCP_WORKER_PARAM_FIELD_THREAD_MODE
Definition ucp.h:179
Active Message handler parameters passed to ucp_worker_set_am_recv_handler routine.
Definition ucp.h:1868
Operation parameters provided in ucp_am_recv_callback_t callback.
Definition ucp.h:1904
UCP listener's connection request attributes.
Definition ucp.h:1494
UCP listener attributes.
Definition ucp.h:1470
Parameters for a UCP listener object.
Definition ucp.h:1524
Tuning parameters for the UCP worker.
Definition ucp.h:1293
const struct sockaddr * addr
Definition types.h:138
socklen_t addrlen
Definition types.h:139
ucs_status_t
Status codes.
Definition status.h:45
@ UCS_THREAD_MODE_SINGLE
Definition thread_mode.h:20
@ UCS_MEMORY_TYPE_LAST
Definition memory_type.h:48
@ UCS_OK
Definition status.h:47
@ UCS_ERR_UNSUPPORTED
Definition status.h:74
@ UCS_INPROGRESS
Definition status.h:50