@@ -13,6 +13,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/file.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -32,6 +33,7 @@
#include "eal_internal_cfg.h"
static int mp_fd = -1;
+static int lock_fd = -1; /* fd of this process's lock file, opened and flock()ed in open_socket_fd() */
static char mp_filter[PATH_MAX]; /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
@@ -104,6 +106,46 @@ find_sync_request(const char *dst, const char *act_name)
return r;
}
+/*
+ * Write the IPC socket path for peer 'name' into 'buf' (at most 'len'
+ * bytes, as bounded by snprintf).
+ *
+ * An empty name yields the bare prefix returned by eal_mp_socket_path();
+ * any other name yields "<prefix>_<name>".
+ */
+static void
+create_socket_path(const char *name, char *buf, int len)
+{
+	const char *base = eal_mp_socket_path();
+
+	/* empty name: the socket path is the prefix itself */
+	if (name[0] == '\0') {
+		snprintf(buf, len, "%s", base);
+		return;
+	}
+
+	snprintf(buf, len, "%s_%s", base, name);
+}
+
+/*
+ * Write the lock file path for peer 'name' into 'buf' (at most 'len'
+ * bytes, as bounded by snprintf).
+ *
+ * An empty (or NULL) name yields "<prefix>lock"; any other name yields
+ * "<prefix>lock_<name>", where <prefix> is eal_mp_socket_path().
+ *
+ * The emptiness test mirrors create_socket_path(): every non-empty name,
+ * including a one-character one, gets its own lock file, so that a peer
+ * can never end up sharing the lock file used for the empty name.
+ */
+static void
+create_lockfile_path(const char *name, char *buf, int len)
+{
+	const char *prefix = eal_mp_socket_path();
+
+	/* get_peer_name() returns NULL for the primary; treat it as empty */
+	if (name != NULL && name[0] != '\0')
+		snprintf(buf, len, "%slock_%s", prefix, name);
+	else
+		snprintf(buf, len, "%slock", prefix);
+}
+
+/*
+ * Extract the peer name from a full socket path.
+ *
+ * Returns NULL when the path is exactly eal_mp_socket_path() (the primary
+ * process has no peer name). Otherwise returns a pointer into
+ * 'socket_full_path', positioned right after the part that
+ * create_socket_path() puts in front of the name. No check is made that
+ * 'socket_full_path' really starts with that part or is long enough.
+ */
+static const char *
+get_peer_name(const char *socket_full_path)
+{
+	char sample[PATH_MAX] = {0};
+	int skip;
+
+	/* the bare prefix is the primary's socket: nothing to extract */
+	if (!strcmp(socket_full_path, eal_mp_socket_path()))
+		return NULL;
+
+	/* build the path for a one-character dummy name, so that we go
+	 * through the code path that inserts the underscore separator
+	 */
+	create_socket_path("a", sample, sizeof(sample));
+
+	/* everything but the trailing dummy character has to be skipped */
+	skip = strlen(sample) - 1;
+
+	return socket_full_path + skip;
+}
+
int
rte_eal_primary_proc_alive(const char *config_file_path)
{
@@ -330,8 +372,29 @@ mp_handle(void *arg __rte_unused)
+/*
+ * Create and lock this process's lock file, then create and bind its IPC
+ * socket.
+ *
+ * A secondary process uses the peer name "<pid>_<tsc>"; otherwise the name
+ * is left empty. The lock file is exclusively flock()ed (non-blocking) and
+ * its descriptor kept in lock_fd; the socket descriptor is kept in mp_fd.
+ *
+ * Returns mp_fd on success, -1 if the lock file cannot be opened or locked.
+ *
+ * NOTE(review): the failure paths after the lock is taken are partly
+ * outside this hunk; they appear to return without releasing lock_fd -
+ * confirm.
+ */
static int
open_socket_fd(void)
{
+ char peer_name[PATH_MAX] = {0};
+ char lockfile[PATH_MAX] = {0};
struct sockaddr_un un;
- const char *prefix = eal_mp_socket_path();
+
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY)
+ snprintf(peer_name, sizeof(peer_name), "%d_%"PRIx64,
+ getpid(), rte_rdtsc());
+
+ /* try to create lockfile */
+ create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
+
+ /* O_CREAT requires an explicit mode argument; without it the new
+ * file's permission bits are taken from an unspecified value
+ */
+ lock_fd = open(lockfile, O_CREAT | O_RDWR, 0600);
+ if (lock_fd < 0) {
+ RTE_LOG(ERR, EAL, "failed to open '%s': %s\n", lockfile,
+ strerror(errno));
+ return -1;
+ }
+ if (flock(lock_fd, LOCK_EX | LOCK_NB)) {
+ RTE_LOG(ERR, EAL, "failed to lock '%s': %s\n", lockfile,
+ strerror(errno));
+ /* don't leak the descriptor or leave a stale value behind */
+ close(lock_fd);
+ lock_fd = -1;
+ return -1;
+ }
+ /* no need to downgrade to shared lock */
mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (mp_fd < 0) {
@@ -341,13 +404,11 @@ open_socket_fd(void)
memset(&un, 0, sizeof(un));
un.sun_family = AF_UNIX;
- if (rte_eal_process_type() == RTE_PROC_PRIMARY)
- snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);
- else {
- snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64,
- prefix, getpid(), rte_rdtsc());
- }
+
+ create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
+
unlink(un.sun_path); /* May still exist since last run */
+
if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
un.sun_path, strerror(errno));
@@ -359,6 +420,44 @@ open_socket_fd(void)
return mp_fd;
}
+/*
+ * Find the lock file corresponding to 'peer_name' and try to lock it.
+ *
+ * Returns:
+ *   1  if the lock file is locked by someone else (EWOULDBLOCK),
+ *   0  if the lock could be taken; the lock file is then unlinked,
+ *  -1  if the lock file cannot be opened or locking fails otherwise.
+ *
+ * Note that the lock file is created if it does not exist yet.
+ */
+static int
+socket_is_active(const char *peer_name)
+{
+	char lockfile[PATH_MAX] = {0};
+	int fd, ret;
+
+	/* construct lockfile filename */
+	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
+
+	/* O_CREAT requires an explicit mode argument; without it the new
+	 * file's permission bits are taken from an unspecified value
+	 */
+	fd = open(lockfile, O_CREAT | O_RDWR, 0600);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Cannot open '%s': %s\n", lockfile,
+			strerror(errno));
+		return -1;
+	}
+
+	/* try to lock it without blocking */
+	if (flock(fd, LOCK_EX | LOCK_NB) == 0) {
+		/* we got the lock, so nobody else holds it: remove the
+		 * lock file before letting go of the lock
+		 */
+		ret = 0;
+		unlink(lockfile);
+		flock(fd, LOCK_UN);
+	} else if (errno == EWOULDBLOCK) {
+		/* file is locked */
+		ret = 1;
+	} else {
+		RTE_LOG(ERR, EAL, "Cannot lock '%s': %s\n", lockfile,
+			strerror(errno));
+		ret = -1;
+	}
+	close(fd);
+
+	return ret;
+}
+
+/*
+ * Remove stale socket files whose names match 'filter'.
+ *
+ * For every matching directory entry the peer's lock file is probed with
+ * socket_is_active(). If the probe fails or reports the peer as active,
+ * the directory stream is closed and -1 is returned; otherwise the socket
+ * file is unlinked. Returns 0 once all entries have been processed.
+ */
static int
unlink_sockets(const char *filter)
{
@@ -374,28 +473,33 @@ unlink_sockets(const char *filter)
dir_fd = dirfd(mp_dir);
while ((ent = readdir(mp_dir))) {
- if (fnmatch(filter, ent->d_name, 0) == 0)
+ if (fnmatch(filter, ent->d_name, 0) == 0) {
+ const char *peer_name;
+ char path[PATH_MAX];
+ int ret;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+ peer_name = get_peer_name(path);
+
+ ret = socket_is_active(peer_name);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Error getting socket active status\n");
+ /* release the directory stream on early exit */
+ closedir(mp_dir);
+ return -1;
+ } else if (ret == 1) {
+ RTE_LOG(ERR, EAL, "Socket is active (old secondary process still running?)\n");
+ /* release the directory stream on early exit */
+ closedir(mp_dir);
+ return -1;
+ }
+ RTE_LOG(DEBUG, EAL, "Removing stale socket file '%s'\n",
+ ent->d_name);
unlinkat(dir_fd, ent->d_name, 0);
+ }
}
closedir(mp_dir);
return 0;
}
-static void
-unlink_socket_by_path(const char *path)
-{
- char *filename;
- char *fullpath = strdup(path);
-
- if (!fullpath)
- return;
- filename = basename(fullpath);
- unlink_sockets(filename);
- free(fullpath);
- RTE_LOG(INFO, EAL, "Remove socket %s\n", path);
-}
-
int
rte_mp_channel_init(void)
{
@@ -485,10 +589,25 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
rte_errno = errno;
/* Check if it caused by peer process exits */
if (errno == ECONNREFUSED) {
- /* We don't unlink the primary's socket here */
- if (rte_eal_process_type() == RTE_PROC_PRIMARY)
- unlink_socket_by_path(dst_path);
- return 0;
+ const char *peer_name = get_peer_name(dst_path);
+ int active, ret = 0;
+
+ active = rte_eal_process_type() == RTE_PROC_PRIMARY ?
+ socket_is_active(peer_name) :
+ rte_eal_primary_proc_alive(NULL);
+
+ if (active > 0) {
+ RTE_LOG(ERR, EAL, "Couldn't communicate with active peer\n");
+ } else if (active < 0) {
+ RTE_LOG(ERR, EAL, "Couldn't get peer status\n");
+ ret = -1;
+ } else if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ /* peer isn't active anymore, so unlink its
+ * socket.
+ */
+ unlink(dst_path);
+ }
+ return ret;
}
if (errno == ENOBUFS) {
RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
@@ -506,7 +625,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
- int ret = 0;
+ int dir_fd, ret = 0;
DIR *mp_dir;
struct dirent *ent;
@@ -528,15 +647,28 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
rte_errno = errno;
return -1;
}
+ dir_fd = dirfd(mp_dir);
while ((ent = readdir(mp_dir))) {
char path[PATH_MAX];
+ const char *peer_name;
+ int active;
if (fnmatch(mp_filter, ent->d_name, 0) != 0)
continue;
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);
- if (send_msg(path, msg, type) < 0)
+ peer_name = get_peer_name(path);
+
+ /* only send if we can expect to receive a reply, otherwise
+ * remove the socket.
+ */
+ active = socket_is_active(peer_name);
+ if (active < 0)
+ ret = -1;
+ else if (active == 0)
+ unlinkat(dir_fd, ent->d_name, 0);
+ else if (active > 0 && send_msg(path, msg, type) < 0)
ret = -1;
}
@@ -661,7 +793,7 @@ int __rte_experimental
rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
const struct timespec *ts)
{
- int ret = 0;
+ int dir_fd, ret = 0;
DIR *mp_dir;
struct dirent *ent;
struct timeval now;
@@ -696,15 +828,29 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
rte_errno = errno;
return -1;
}
+ dir_fd = dirfd(mp_dir);
while ((ent = readdir(mp_dir))) {
+ const char *peer_name;
char path[PATH_MAX];
+ int active;
if (fnmatch(mp_filter, ent->d_name, 0) != 0)
continue;
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);
+ peer_name = get_peer_name(path);
+
+ active = socket_is_active(peer_name);
+
+ if (active < 0) {
+ ret = -1;
+ break;
+ } else if (active == 0) {
+ unlinkat(dir_fd, ent->d_name, 0);
+ continue;
+ }
if (mp_request_one(path, req, reply, &end))
ret = -1;