OpenEdv-开源电子网

 找回密码
 立即注册
正点原子全套STM32/Linux/FPGA开发资料,上千讲STM32视频教程免费下载...
查看: 203|回复: 2

lwip 接收阻塞模式下,如何发送数据

[复制链接]

32

主题

55

帖子

0

精华

初级会员

Rank: 2

积分
148
金钱
148
注册时间
2020-5-21
在线时间
62 小时
发表于 2025-4-23 20:47:34 | 显示全部楼层 |阅读模式
1金钱
freertos 线程中:
   while(1) {
                       
                               

        sockfd = socket(AF_INET, SOCK_STREAM, 0);
        bind(sockfd, (struct sockaddr *)&clt_addr, sizeof(clt_addr));
        connect(sockfd, (struct sockaddr *)&svr_addr, sizeof(svr_addr));

        while(-1 != sockfd) {
            recvnum = recv(sockfd, buf, MAX_BUF_SIZE, 0);
                       
        }

        lwip_close(sockfd);
        sockfd = -1;
    }

接收正常,发现recv函数是阻塞模式。现在需要在另一个线程中 不断 发送数据,
   while(1) {
        send(...);
         vTaskDelay(250);


      }

结果发现收发都不运行。是否改成非阻塞接收,如何改,查到有用lwip_setsockopt(sockfd, SOL_SOCKET, SO_NONBLOCK, &nonblock, sizeof(nonblock));
但是没有定义SO_NONBLOCK。请指教,谢谢!
                       


正点原子逻辑分析仪DL16劲爆上市
回复

使用道具 举报

4

主题

912

帖子

0

精华

论坛元老

Rank: 8Rank: 8

积分
4524
金钱
4524
注册时间
2019-9-4
在线时间
916 小时
发表于 2025-4-27 15:28:29 | 显示全部楼层
本帖最后由 A571157242 于 2025-4-27 15:39 编辑

您好!您遇到的问题很典型,主要是由 recv 函数的阻塞特性以及多线程共享资源(socket)的处理方式引起的。我们来详细分析一下:
问题核心:
  • recv 阻塞:在默认(阻塞)模式下,recv 函数会一直等待,直到接收到数据、连接被对方关闭(返回0)、或者发生错误(返回-1)。如果网络上长时间没有数据到来,调用 recv 的线程(我们称之为“接收线程”)就会一直卡在 recv 这一行,无法执行循环中的后续代码(比如检查是否需要退出、处理错误后的socket关闭等),也无法主动让出 CPU 给其他任务(除非 FreeRTOS 的抢占式调度切换到其他更高优先级的任务,或者当前任务的时间片用完)。
  • 发送线程的行为:当接收线程阻塞在 recv 时,发送线程理论上仍然可以运行(如果其优先级允许并且 CPU 可用)。但是,如果发送线程尝试使用的 sockfd 变得无效(例如,接收线程在 recv 返回错误后关闭了 socket,但在发送线程使用之前),或者 send 操作本身也因为某些原因阻塞(例如发送缓冲区满),那么发送线程也可能表现为“不运行”或卡住。如果两个线程因为共享 sockfd 的状态管理不当而互相影响,就可能导致收发都不工作。

解决方案:
您需要让接收线程在没有数据时也能继续执行或让出 CPU,而不是无限期阻塞。有几种常用方法:
方法一:使用 select 函数(推荐)
这是最常用、最标准的方法,用于检查一个或多个 socket 是否准备好进行读、写或是否有错误,可以设置一个超时时间。
#include "lwip/sockets.h"
#include "lwip/sys.h" // For sys_msleep or vTaskDelay if used directly
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"

// ... (你的 svr_addr, clt_addr, MAX_BUF_SIZE 定义) ...

// 假设 sockfd 在两个线程间共享,需要确保线程安全访问
// 简单示例:使用全局变量 (注意:实际应用中建议使用更安全的传递方式,如队列或事件组)
int shared_sockfd = -1;
// 可以使用互斥锁保护 shared_sockfd 的访问
// SemaphoreHandle_t sockfd_mutex; // 在初始化时创建

void receiving_thread(void *pvParameters) {
    struct sockaddr_in svr_addr, clt_addr;
    // ... 设置 svr_addr 和 clt_addr ...
    char buf[MAX_BUF_SIZE];
    int recvnum;
    int current_sockfd = -1; // Use a local copy within the loop

    // sockfd_mutex = xSemaphoreCreateMutex(); // 创建互斥锁

    while (1) {
        current_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); // Use IPPROTO_TCP explicitly
        if (current_sockfd < 0) {
            printf("Failed to create socket\n");
            vTaskDelay(pdMS_TO_TICKS(1000)); // 等待后重试
            continue;
        }

        // 可选:设置 SO_REUSEADDR 允许快速重绑定
        int opt = 1;
        lwip_setsockopt(current_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

        // 绑定是可选的,通常客户端不需要绑定特定本地地址/端口
        // if (bind(current_sockfd, (struct sockaddr *)&clt_addr, sizeof(clt_addr)) < 0) {
        //     printf("Bind failed\n");
        //     lwip_close(current_sockfd);
        //     vTaskDelay(pdMS_TO_TICKS(1000));
        //     continue;
        // }

        if (connect(current_sockfd, (struct sockaddr *)&svr_addr, sizeof(svr_addr)) < 0) {
            printf("Connect failed\n");
            lwip_close(current_sockfd);
            vTaskDelay(pdMS_TO_TICKS(1000)); // 等待后重试
            continue;
        }

        printf("Connected. sockfd: %d\n", current_sockfd);

        // xSemaphoreTake(sockfd_mutex, portMAX_DELAY); // 获取锁
        shared_sockfd = current_sockfd; // 安全地共享 sockfd
        // xSemaphoreGive(sockfd_mutex); // 释放锁

        fd_set readfds;
        struct timeval tv;
        int ret;

        while (shared_sockfd != -1) { // 当连接有效时循环
            FD_ZERO(&readfds);
            FD_SET(shared_sockfd, &readfds); // 监听当前 socket 的可读事件

            // 设置超时时间 (例如:1秒)
            tv.tv_sec = 1;
            tv.tv_usec = 0;

            // 等待 socket 可读或超时
            // 注意:select 的第一个参数是 nfds,应为最高描述符值 + 1
            ret = lwip_select(shared_sockfd + 1, &readfds, NULL, NULL, &tv);

            if (ret < 0) {
                // select 出错
                printf("select error\n");
                // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
                lwip_close(shared_sockfd);
                shared_sockfd = -1;
                // xSemaphoreGive(sockfd_mutex);
                break; // 跳出内层循环,尝试重连
            } else if (ret == 0) {
                // 超时,没有数据可读
                // printf("select timeout, no data\n");
                // 这里可以做其他事情,比如检查退出标志
                // 或者简单地继续循环等待
                taskYIELD(); // 或者 vTaskDelay(1); 让出 CPU
            } else {
                // Socket 可读 (FD_ISSET 理论上可以省略,因为我们只监听了一个)
                if (FD_ISSET(shared_sockfd, &readfds)) {
                    recvnum = recv(shared_sockfd, buf, MAX_BUF_SIZE - 1, 0);
                    if (recvnum < 0) {
                        // recv 出错 (连接可能已断开)
                        printf("recv error\n");
                        // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
                        lwip_close(shared_sockfd);
                        shared_sockfd = -1;
                        // xSemaphoreGive(sockfd_mutex);
                        break; // 跳出内层循环,尝试重连
                    } else if (recvnum == 0) {
                        // 连接被对方关闭
                        printf("Connection closed by peer\n");
                        // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
                        lwip_close(shared_sockfd);
                        shared_sockfd = -1;
                        // xSemaphoreGive(sockfd_mutex);
                        break; // 跳出内层循环,尝试重连
                    } else {
                        // 成功接收到数据
                        buf[recvnum] = '\0'; // 添加字符串结束符
                        printf("Received: %s\n", buf);
                        // 处理接收到的数据...
                    }
                }
            }
            // 如果需要,可以添加一个小的延时 vTaskDelay(pdMS_TO_TICKS(10));
        }

        // 如果是因为 break 跳出循环来到这里,表示连接已断开或出错
        printf("Connection lost or closed. Attempting reconnect...\n");
        // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
        if (shared_sockfd != -1) { // 可能在 break 前已被置为 -1
           lwip_close(shared_sockfd);
           shared_sockfd = -1;
        }
        // xSemaphoreGive(sockfd_mutex);
        vTaskDelay(pdMS_TO_TICKS(2000)); // 等待一段时间后重连
    }
}
方法二:设置 Socket 接收超时 (SO_RCVTIMEO)
这种方法让 recv 函数在阻塞一段时间后(如果还没有收到数据)返回一个特定的错误码 (EWOULDBLOCK 或 EAGAIN),而不是无限期阻塞。
#include "lwip/sockets.h"
#include "errno.h" // For EWOULDBLOCK/EAGAIN
// ... other includes

void receiving_thread(void *pvParameters) {
    // ... (socket, bind, connect 代码同上) ...
    int current_sockfd = -1;

    while (1) {
        current_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        // ... (错误处理) ...
        // ... (connect) ...
        if (connect(...) < 0) {
            // ... (错误处理,关闭,延迟) ...
            continue;
        }

        printf("Connected. sockfd: %d\n", current_sockfd);

        // 设置接收超时 (例如:1000毫秒 = 1秒)
        struct timeval timeout;
        timeout.tv_sec = 1;
        timeout.tv_usec = 0;
        if (lwip_setsockopt(current_sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
            printf("Failed to set socket receive timeout\n");
            // 可以选择继续,或者关闭重连
        }

        // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
        shared_sockfd = current_sockfd;
        // xSemaphoreGive(sockfd_mutex);

        while (shared_sockfd != -1) {
            recvnum = recv(shared_sockfd, buf, MAX_BUF_SIZE - 1, 0);

            if (recvnum < 0) {
                // 检查是否是超时错误
                if (errno == EWOULDBLOCK || errno == EAGAIN) {
                    // 超时,没有数据到达
                    // printf("recv timeout\n");
                    // 这里可以做其他事情或简单地继续循环
                    taskYIELD(); // 让出CPU
                } else {
                    // 其他错误 (例如连接断开)
                    printf("recv error: %d\n", errno);
                    // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
                    lwip_close(shared_sockfd);
                    shared_sockfd = -1;
                    // xSemaphoreGive(sockfd_mutex);
                    break; // 跳出内层循环,尝试重连
                }
            } else if (recvnum == 0) {
                // 连接被对方关闭
                printf("Connection closed by peer\n");
                // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
                lwip_close(shared_sockfd);
                shared_sockfd = -1;
                // xSemaphoreGive(sockfd_mutex);
                break; // 跳出内层循环,尝试重连
            } else {
                // 成功接收到数据
                buf[recvnum] = '\0';
                printf("Received: %s\n", buf);
                // 处理数据...
            }
        }
        // ... (重连逻辑同上) ...
    }
}

方法三:设置 Socket 为非阻塞模式 (fcntl 或 ioctl)
SO_NONBLOCK 不是 setsockopt 的标准选项。在 lwIP (和 POSIX) 中,通常使用 fcntl 或 ioctl 来设置非阻塞模式。
  • 使用 fcntl (需要 LWIP_SOCKET_FCNTL 启用):
    #include <fcntl.h> // 可能需要这个头文件,或在 lwip/sockets.h 中已有定义
    // ... 在 connect 成功之后 ...
    int flags = lwip_fcntl(current_sockfd, F_GETFL, 0);
    if (flags == -1) {
        printf("fcntl F_GETFL failed\n");
        // 处理错误
    } else {
        if (lwip_fcntl(current_sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
            printf("fcntl F_SETFL O_NONBLOCK failed\n");
            // 处理错误
        } else {
            printf("Socket set to non-blocking mode\n");
            // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
            shared_sockfd = current_sockfd;
            // xSemaphoreGive(sockfd_mutex);
        }
    }
  • 使用 ioctl (更常见于 lwIP):
    #include "lwip/sockets.h" // FIONBIO 通常在这里定义
    // ... 在 connect 成功之后 ...
    int nonblock = 1;
    if (lwip_ioctl(current_sockfd, FIONBIO, &nonblock) < 0) {
        printf("ioctl FIONBIO failed\n");
        // 处理错误
    } else {
         printf("Socket set to non-blocking mode using ioctl\n");
         // xSemaphoreTake(sockfd_mutex, portMAX_DELAY);
         shared_sockfd = current_sockfd;
         // xSemaphoreGive(sockfd_mutex);
    }
如果使用非阻塞模式,recv 的行为会改变:
  • 如果有数据,recv 立即返回,返回值是接收到的字节数。
  • 如果没有数据,recv 立即返回 -1,并且 errno 被设置为 EWOULDBLOCK 或 EAGAIN。
  • 如果连接关闭,recv 返回 0。
  • 如果发生其他错误,recv 返回 -1,errno 设置为相应的错误码。

非阻塞模式下的接收循环:
        // ... socket 设置为非阻塞模式后 ...
        while (shared_sockfd != -1) {
            recvnum = recv(shared_sockfd, buf, MAX_BUF_SIZE - 1, 0);

            if (recvnum < 0) {
                if (errno == EWOULDBLOCK || errno == EAGAIN) {
                    // 没有数据可读,让出 CPU 等待一会
                    vTaskDelay(pdMS_TO_TICKS(10)); // 或者更短/使用 taskYIELD()
                } else {
                    // 真错误
                    printf("recv error: %d\n", errno);
                    // ... 关闭 socket, break ...
                    break;
                }
            } else if (recvnum == 0) {
                // 连接关闭
                printf("Connection closed by peer\n");
                // ... 关闭 socket, break ...
                break;
            } else {
                // 收到数据
                buf[recvnum] = '\0';
                printf("Received: %s\n", buf);
                // 处理数据...
            }
        }
        // ... 重连逻辑 ...

关于发送线程:
void sending_thread(void *pvParameters) {
    char send_buf[] = "Hello from ESP32!";
    int bytes_sent;
    int current_sockfd_to_use; // 本地副本

    while (1) {
        // xSemaphoreTake(sockfd_mutex, portMAX_DELAY); // 获取锁
        current_sockfd_to_use = shared_sockfd; // 获取当前有效的 sockfd
        // xSemaphoreGive(sockfd_mutex); // 释放锁

        if (current_sockfd_to_use != -1) { // 检查 socket 是否有效
            bytes_sent = send(current_sockfd_to_use, send_buf, strlen(send_buf), 0);
            if (bytes_sent < 0) {
                // 发送失败,可能是连接已断开或其他错误
                printf("send error: %d\n", errno);
                // 这里可以考虑通知接收线程(如果它还不知道),或者等待接收线程重建连接
                // 注意:不要在这里关闭 socket,让接收线程负责管理连接生命周期
            } else if (bytes_sent < strlen(send_buf)) {
                 printf("Partial send, sent %d bytes\n", bytes_sent);
                 // 可能需要处理未发送完的数据(对于简单示例,可以忽略)
            } else {
                // printf("Sent: %s\n", send_buf);
            }
        } else {
             // printf("Sending thread: No active connection.\n");
             // Socket 无效,等待连接建立
        }

        vTaskDelay(pdMS_TO_TICKS(250)); // 按需发送
    }
}
关键点总结:
  • 避免无限阻塞:使用 select, SO_RCVTIMEO 或非阻塞模式之一来改造接收线程,使其在没有数据时也能继续运行或适时等待。select 通常是功能最全且推荐的方式。
  • 共享 Socket (sockfd)

    • 两个线程都需要访问同一个有效的 sockfd。
    • 确保 sockfd 在线程间的传递是安全的。使用全局变量时要特别小心,最好用 FreeRTOS 的队列、事件组或互斥锁来同步访问和状态更新。
    • 明确责任:通常让一个线程(比如接收线程)负责建立、维护和关闭连接。发送线程只在连接有效时尝试发送,并处理发送失败的情况(但不主动关闭连接)。

  • 错误处理:connect, recv, send 都可能失败。必须检查它们的返回值,并根据 errno 判断错误类型,进行恰当的处理(重试、关闭连接、记录日志等)。尤其是在发送线程中,send 失败往往意味着连接可能已经断了。
  • 关闭 Socket:确保 lwip_close(sockfd) 只被调用一次,并且在关闭后将共享的 sockfd 变量置为无效状态(如 -1),防止其他线程使用已关闭的描述符。使用互斥锁保护关闭操作和 sockfd 变量的更新。
  • lwIP 配置:确保你的 lwipopts.h 文件中启用了必要的选项(如 LWIP_SOCKET, LWIP_POSIX_SOCKETS_IO_NAMES, 可能需要 LWIP_SOCKET_SELECT, LWIP_SOCKET_FCNTL, LWIP_SO_RCVTIMEO 等,具体取决于你选择的方法)。

关于 SO_NONBLOCK 未定义:
这是正常的,因为设置非阻塞的标准接口不是通过 setsockopt 的 SO_NONBLOCK 选项。请使用上面提到的 fcntl 或 ioctl 方法。
建议:
从稳定性和效率角度看,推荐使用 select (方法一)。如果只是简单地想避免 recv 无限阻塞,SO_RCVTIMEO (方法二) 是个更简单的修改。非阻塞模式+轮询 (方法三) 也是可行的,但可能在 CPU 占用上不如 select 或 SO_RCVTIMEO 高效(取决于轮询间隔)。
修改您的代码时,请特别注意 sockfd 的共享和生命周期管理,以及对所有网络操作返回值的检查。希望能帮助到您!
回复

使用道具 举报

530

主题

11万

帖子

34

精华

管理员

Rank: 12Rank: 12Rank: 12

积分
165524
金钱
165524
注册时间
2010-12-1
在线时间
2116 小时
发表于 2025-4-27 20:53:55 | 显示全部楼层
A571157242 发表于 2025-4-27 15:28
您好!您遇到的问题很典型,主要是由 recv 函数的阻塞特性以及多线程共享资源(socket)的处理方式引起的。 ...

优秀啊
我是开源电子网www.openedv.com站长,有关站务问题请与我联系。
正点原子STM32开发板购买店铺http://openedv.taobao.com
正点原子官方微信公众平台,点击这里关注“正点原子”
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则



关闭

原子哥极力推荐上一条 /2 下一条

正点原子公众号

QQ|手机版|OpenEdv-开源电子网 ( 粤ICP备12000418号-1 )

GMT+8, 2025-6-8 04:22

Powered by OpenEdv-开源电子网

© 2001-2030 OpenEdv-开源电子网

快速回复 返回顶部 返回列表