搜索
bottom↓
回复: 31

由环形队列消锁引发的思考

[复制链接]

出0入0汤圆

发表于 2014-11-7 13:40:27 | 显示全部楼层 |阅读模式
看到一篇好文章  转过来大家看看
我想,很多的程序员都知道如何利用环形队列来避免使用锁。那就是,生产者只更改写指针,消费者只更改读指针。当然这里的指针一般可能是整数下标。
例如: read_pos为消费者更改的读指针,每读走一个元素 则 read_pos = (read_pos + 1) % SIZE,
            write_pos为生产者更改的写指针,每写完一个元素, write_pos = (write_pos + 1) % SIZE,
在消费者看来,只要队列不空,就可以读取当前读指针处的元素,在生产者看来,只要队列不满,就可以在当前的写位置上写一个元素。
队列为空的条件为: read_pos = write_pos(对吗?)
队列为满的条件为: (write_pos + 1) % SIZE = read_pos(对吗?)
       现在所有的东西都已经考虑完了?
       其实这种实现方案的最大的隐患在于如何能够正确的比较read_pos和write_pos之间的关系。例如,很多人都觉得在read_pos = read_pos + 1执行完之前,read_pos会始终保持原来的值不变。这种错误很多人都会犯。可是又有人会反驳道,我就是这么实现的,而且程序运行良好,从来没出过问题。下面我们就讲讲,在什么情况下,这种实现方法是正确的。这个我们必须得讨论一下计算机的数据总线宽度。数据总线宽度标识这CPU一次能够读取的数据的大小。常见的有16位,32位,64位。现在我们讨论一下read_pos与write_pos的类型,如果两个都是int型,即都是32位,那么如果你的计算机的数据总线宽度是32或64,那么很幸运你将得到一个"不易出错"的代码,因为这种情况下read_pos或write_pos在完成加1操作之前一定是原来的值。而如果你的计算机数据总线宽度为16的话,我想你不出错都很难,这时read_pos或write_pos在加1过程中很可能出现首先更改read_pos或write_pos的低16位,之后再更改它们的高16位。而恰恰如果我们在这两次更改之间进行了读,那么读到数肯定就会是错误的。那么后果不言自明,这会导致判断出现错误,因此,读写就会发生错误。同样的问题会出现在read_pos和write_pos为long long类型,而却应用在数据总线32位,16位机器上。
       我们再回头看看"不易出错",说他不易出错原因就是因为即使考虑了数据总线的宽度也是不全面的,因为我上面的代码就是错误的。大家如果能仔细分析其实就不难猜到,队列为空或满的判断条件是不对的,应该改为
队列为空:read_pos % SIZE = write_pos % SIZE
队列为满:(write_pos + 1) % SIZE = read_pos % SIZE
理由是,read_pos = (read_pos + 1) % SIZE,被编译器分解为 read_pos = read_pos + 1, read_pos = read_pos % SIZE两个操作,write_pos同理。那么如果我们在第二个操作完成之前第一个操作完成之后读取read_pos,那么就读到了错误的数,因为read_pos很可能等于SIZE。而我们却规定read_pos与write_pos在0-SIZE-1之间。
       当我写到这的时候,您认为就已经考虑的足够了吗?答案是没有。
       我们还需要把read_pos和write_pos声明为volatile,这个关键字的最主要用途就是防止编译器优化,并且每次强制从内存中读取值,而不是从寄存器。因为内存中的值才是最安全的。例如,如果计算机的加1操作被编译器弄成了先+2后-1这种形式去实现,而+2之后的值存在寄存器中,而程序偏偏就在这时读取了寄存器中的数据,那么程序就会出错。
       写到这,就接近尾声了。对于环形队列消锁的问题可能并不存在十全十美的解决方案,即使考虑了上面的那些因素,也不能说明程序是正确的,因为我们不了解编译器会怎么做,也不了解计算机会怎么做,我们这么做只是让我们的程序出错的概率变小而已。而可能在现在的计算机体系结构和操作系统以及编译器恰好是对的,但是如果哪一天计算机的体系结构变了,我们的系统可能会因上面的代码随时崩溃。

阿莫论坛20周年了!感谢大家的支持与爱护!!

一只鸟敢站在脆弱的枝条上歇脚,它依仗的不是枝条不会断,而是自己有翅膀,会飞。

出0入0汤圆

 楼主| 发表于 2014-11-7 15:21:20 | 显示全部楼层
没有代码  发帖好像不完整

出0入0汤圆

发表于 2014-11-7 15:24:35 | 显示全部楼层
看不太懂。

一般对于读写都要考虑是否需要进行原子性操作。

还有“read_pos = (read_pos + 1) % SIZE,被编译器分解为 read_pos = read_pos + 1, read_pos = read_pos % SIZE两个操作”,表示怀疑,编译器真会这么干?

出0入0汤圆

 楼主| 发表于 2014-11-7 15:26:05 | 显示全部楼层
不知道从什么时候开始,大家每做一件事情,都要从它的源头说起。
为了表示我也不例外,那么我也说说我今天发表这个帖子的来龙去脉和前因后果。
也许大家不怎么关心一个陌生人的故事,就好像这样的一句话。别把自己太当回事,你在别人眼里就是一坨屎!
这段时间,想写一个自己用的库函数。就想总结下写点模块化的代码。比如按键,LCD,IIC,GUI,菜单,软件架构方面。就有了下面的代码。
这个模块是环形缓冲区 主要是参考UCOS 陈明计的代码改写的。老是看大家的帖子不回多不好

//Message.h

#ifndef __MESSAGE_H
#define __MESSAGE_H


#define MESSAGEBUFSIZE          (64U)
#define MAXMESSAGEID            (63U)
#define CALCULATE_MESSAGE_SIZE  ((MessageQueue.write-MessageQueue.read)&(MAXMESSAGEID))

struct sMessageCircularBuf  
{
        volatile u32  write;
        volatile u32  read;
        volatile u32  messagebuf[MESSAGEBUFSIZE];
        void (*init)(void);
        u32  (*post)(u32 message);
        u32  (*pend)(u32 *mesage);
};

extern struct sMessageCircularBuf MessageQueue;


#endif




//Message.c


/************************************TX-STUDIO*********************************
* 文件名             : Message.c
* 作者               : shinemotou
* 版本               : V0.01
* 日期               : 2014/5/28
* 描述               : Message program body
* 功能               : Message        这里相当于对一个数组操作 隐藏了直接对数组操作 这样可以更好的操作数据                                                                       
*******************************************************************************/
#include "stm32f10x.h"
#include "Dri_driver.h"
#include "Message.h"

static u32 PostMessageQueue(u32 message);
static u32 PendMessageQueue(u32 *mesage);
static void InitializeMessageQueue(void);

struct sMessageCircularBuf MessageQueue={0,0,{0},InitializeMessageQueue,PostMessageQueue,PendMessageQueue};


/*******************************************************************************
* 函数名                   : PostMessageQueue
* 描述                 : 无
* 输入           : 无
* 输出           : 无
* 返回           : 无
* 举例           :无
* 注意           :无
*******************************************************************************/
static u32 PostMessageQueue(u32 message)
{
        ENTER_CRITICAL();
        if(CALCULATE_MESSAGE_SIZE==(MAXMESSAGEID))
        {
                EXIT_CRITICAL();
                #ifdef DEBUG
                Debug("\r\n Mes_full \r\n");
                while(1);
                #endif
                return 0;
        }
        MessageQueue.messagebuf[MessageQueue.write]=message;
        MessageQueue.write=(MessageQueue.write+1)&(MAXMESSAGEID);
        EXIT_CRITICAL();
        return 1;          
}
/*******************************************************************************
* 函数名                   : PendMessageQueue
* 描述                 : 无
* 输入           : 无
* 输出           : 无
* 返回           : 无
* 举例           :无
* 注意           :无
*******************************************************************************/
static u32 PendMessageQueue(u32 *mesage)
{   
        ENTER_CRITICAL();
        if(CALCULATE_MESSAGE_SIZE==0)
    {
                EXIT_CRITICAL();
                #ifdef DEBUG
                Debug("\r\n Mes_empty \r\n");
                #endif
                return 0;
        }
        *mesage=MessageQueue.messagebuf[MessageQueue.read];
        MessageQueue.read=(MessageQueue.read+1)&(MAXMESSAGEID);
        EXIT_CRITICAL();
        return 1;
}
/*******************************************************************************
* 函数名                   : InitializeMessageQueue
* 描述                 : 无
* 输入           : 无
* 输出           : 无
* 返回           : 无
* 举例           :无
* 注意           :以前都是数据和函数分离  这里相当于数据里面包含了操作数据的函数  对外只见数据
*******************************************************************************/
static void InitializeMessageQueue(void)
{
        MessageQueue.read=0;
        MessageQueue.write=0;
        MessageQueue.init=InitializeMessageQueue;
        MessageQueue.post=PostMessageQueue;
        MessageQueue.pend=PendMessageQueue;
       
}

出0入0汤圆

发表于 2014-11-8 00:22:47 来自手机 | 显示全部楼层
我还没见过可以定义成一个连续数组的范围大于总线宽度。

出0入0汤圆

发表于 2014-11-8 00:34:23 | 显示全部楼层
他说的是8bit的芯片,而数组是16bit或者32bit的数,是这样吗?

出0入0汤圆

发表于 2014-11-8 08:06:37 来自手机 | 显示全部楼层
本帖最后由 wxty 于 2014-11-8 08:16 编辑

操作好复杂,楼主举个易懂的例子。

出0入0汤圆

发表于 2014-11-8 08:13:54 | 显示全部楼层
楼主的意思是不是说,如果用8位/16位MCU,而程序中要实现的一个环型队列读写索引却为32的数据类型是吗。

出0入0汤圆

发表于 2014-11-8 08:20:03 来自手机 | 显示全部楼层
LZ说的是总线宽度若是8位,但fifo计数器若是16位,那么就要分两次读写,读写了一半时任务切换,别的任务更改了这个值,那么回到原来的任务后就会出错。

出0入0汤圆

发表于 2014-11-8 09:41:27 | 显示全部楼层
DevLabs 发表于 2014-11-8 08:20
LZ说的是总线宽度若是8位,但fifo计数器若是16位,那么就要分两次读写,读写了一半时任务切换,别的任务更 ...

你说的对,关键就是读写指针两个计数器,一定要定义成和总线宽度一样的无符号类型,就没有问题了

lz说的就是原子读写的问题在环形缓冲区里的问题

出0入0汤圆

发表于 2014-11-8 10:06:56 | 显示全部楼层
理由是,read_pos = (read_pos + 1) % SIZE,被编译器分解为 read_pos = read_pos + 1, read_pos = read_pos % SIZE两个操作,write_pos同理。那么如果我们在第二个操作完成之前第一个操作完成之后读取read_pos,那么就读到了错误的数,因为read_pos很可能等于SIZE。而我们却规定read_pos与write_pos在0-SIZE-1之间。


这段话有异议,read_pos = read_pos + 1, read_pos = read_pos % SIZE这种拆法不对,应该是tmp= read_pos + 1, read_pos = tmp% SIZE,所以问题不在这里

出0入0汤圆

 楼主| 发表于 2014-11-8 11:52:32 | 显示全部楼层
这篇文章是别处转来的   有什么问题大家共同讨论
文章主要说了  原子操作  volatile   队列满和空的判断
这篇文章一看  问题确实很多  是不是真的问题  那要认真看  然后理解了
下面的示范代码  已经是我能写到的最简单的了   效率不敢说

出0入0汤圆

 楼主| 发表于 2014-11-8 11:55:51 | 显示全部楼层
read_pos = read_pos + 1    是不是原子操作的问题
这段话有异议,read_pos = read_pos + 1, read_pos = read_pos % SIZE这种拆法不对,应该是tmp= read_pos + 1, read_pos = tmp% SIZE,所以问题不在这里
这个我也不清楚  望高手解答

出10入46汤圆

发表于 2014-11-10 09:24:50 | 显示全部楼层
实际上,就是线程的原子操作的问题。

出0入0汤圆

发表于 2014-11-10 10:39:18 | 显示全部楼层
这个有一定单片机开发经验的都该知道的,数据类型的原子访问问题。用的是几位的数据总线等条件,来决定原子访问所需的数据类型。

出0入0汤圆

 楼主| 发表于 2014-11-10 10:43:48 | 显示全部楼层
twitter 发表于 2014-11-10 10:39
这个有一定单片机开发经验的都该知道的,数据类型的原子访问问题。用的是几位的数据总线等条件,来决定原子 ...

你是说到重点上了   希望你能给大家讲详细点好吗   

出0入0汤圆

发表于 2014-11-10 11:17:40 | 显示全部楼层
本帖最后由 twitter 于 2014-11-10 11:18 编辑
shinemotou 发表于 2014-11-10 10:43
你是说到重点上了   希望你能给大家讲详细点好吗


基本上的原理就是你楼主位的内容,主要就是针对一个变量的读写,对应的汇编有多条(常见的就是8位机访问16位、32位变量类型的,结构体直接复制代码),就要注意了。应该说是针对整个数据块的一致性有要求的地方。

出0入296汤圆

发表于 2014-11-13 00:30:09 | 显示全部楼层
本帖最后由 Gorgon_Meducer 于 2014-11-13 00:32 编辑

严格来说这叫做数据访问的完整性问题。发生在多任务环境下。
解决方法有两种:1.做原子性保护 2.构建临界区
前者专指关中断,关任务调度之类的,好处是省事,缺点是用多了影响
系统实时性。后者是有针对性的用信号量构建临界区。好处是对实时性影响小
缺点是要借助信号量(会者不难)

感觉楼主思考的很多,不错。推荐加强下基础理论学习。看看操作系统的
基本知识,不必会写操作系统的,但要知道多任务同步协调的基本概念和
方法。避免走弯路

出0入0汤圆

发表于 2014-11-13 08:28:40 | 显示全部楼层
shinemotou 发表于 2014-11-10 10:43
你是说到重点上了   希望你能给大家讲详细点好吗


你自己都写了原理了,为啥还要别人讲?

不都说了吗? 因为数据线宽的不一样,有的片子可能处理一个数据类型要好几条汇编,但OS会在你处理这几条汇编的中间突进来进行任务切换,导致对应的地址内的值有变化,如果恰好此时是高任务在使用该值,不就产生误会了;

最简单的方法, 直接在函数内开头结尾加上原子操作,其实就是开关中断,不就妥妥的了;  更高效一些的话,只在你需要保护的地方加,其它地方不要加了;

出0入0汤圆

发表于 2014-11-13 11:51:12 | 显示全部楼层
/**********************************************
存入一个GPS数据到串口接收队列(FIFO)中
**********************************************/
void QueuePush(unsigned char _data)
{
    unsigned int pos;
        pos = (que._head+1) %QUEUE_MAX_SIZE;
       
    if(pos != que._tail)                                //非满状态
    {
        que._data[que._head] = _data;        //数据压栈
        que._head = pos;                                //移动栈头
                GPS_DATAFULL_N;
    }else GPS_DATAFULL_Y;//UART0_Sending_string("Full\r\n");/////直接发出;//串口接收队列已满
}
/******************************************************************
获取串口接收队列(FIFO)中一个完整的数据帧
24 xx xx xx .... 2A xx xx 0D 0A
********************************************************************/
unsigned char QueueFindData(unsigned char *GPSQueue)
{       
        unsigned int Data_tail;        //影子栈尾(数据帧头)
        unsigned char Data_state;

        const unsigned char Data[]={0x0D,0x0A};//帧尾格式为回车换行
       
        if(que._head == que._tail)        //是空栈
        {
                return 2;                                // 空栈       
        }
               
        if(que._data[que._tail] != 0x24)        //栈尾不是数据帧的头部        
        {
                FindFrameHead();        //搜寻下一个数据帧头部
                return 3;                        //数据帧头部错误       
        }
       
        Data_tail = que._tail;        //映射栈尾
        Data_state = 0;                       
           while(que._head != Data_tail && Data_state != 2)        //查找帧尾 从栈尾查向栈头
           {       
                Data_tail = (Data_tail+1) %QUEUE_MAX_SIZE;
               
                if(que._data[Data_tail] == Data[Data_state])        //查找帧尾
                        Data_state++;
                        else Data_state = 0;
        }

        if(Data_state == 2)                //找到数据帧 数据在 que._data[que._tail]与que_data[Data_tail]之间
        {
                unsigned char i = 0;
                while(que._tail != (Data_tail+1)%QUEUE_MAX_SIZE)
                {
                        GPSQueue[i] = que._data[que._tail];
                        i++;
                        que._tail = (que._tail+1)%QUEUE_MAX_SIZE;       
                }
                GPSQueue[i] = 0;

                return 0;                       
        }
        return 1;        //没有接收完成
}

出0入0汤圆

 楼主| 发表于 2014-11-13 17:42:25 | 显示全部楼层
Gorgon_Meducer 发表于 2014-11-13 00:30
严格来说这叫做数据访问的完整性问题。发生在多任务环境下。
解决方法有两种:1.做原子性保护 2.构建临界区 ...

一语点醒梦中人 知道该怎么更好的理解了

出0入0汤圆

发表于 2014-11-13 23:14:30 来自手机 | 显示全部楼层
在本版块,一些关键的贴子,领悟 傻孩子的话 找一些书来看,会懂的更深。  另外在判断到达队列最大时,不建议用%号,还是比较法好。  这方面建议ucos2,北航的书,虽然比较老,但理解多任务切换产生的原子性保护, 还有信号量,队列,锁都有讲

出0入0汤圆

 楼主| 发表于 2014-11-14 10:56:10 | 显示全部楼层
slzm40 发表于 2014-11-13 23:14
在本版块,一些关键的贴子,领悟 傻孩子的话 找一些书来看,会懂的更深。  另外在判断到达队列最大时,不建 ...

谢谢指点   能推荐几本具体的书吗

出0入0汤圆

发表于 2014-11-14 11:04:05 | 显示全部楼层
不错,慢慢看

出0入0汤圆

发表于 2014-11-14 14:34:35 | 显示全部楼层
shinemotou 发表于 2014-11-14 10:56
谢谢指点   能推荐几本具体的书吗


我也是在此学习,刚从硬件转软件,很一般,但准备转软件也自学了一年多,加上大学到现在一直不断的学习C,我就说说最近一个月在看的几本书,类别繁多,但主要为C
时间触发嵌入式系统设计模式(使用8051系统微控制器开发可靠应用)
UML+OOPC嵌入式C语言开发精讲
UCOS-ii,具体书名忘了,北航的
数据结构与算法分析,C语言描述
C++ Primer Plus(这个主要学面向对象的)
本版块的几个贴子: 通用按键模块程序, 精简通用FIFO,状态机实践入门-只要三把斧,C语言面向对象编程,等等都有很深的知识
还在学习中,
另外源码我有在参考用zstack,ti的CC1101的,UCOS- ii的,还有一些傻孩子的宏库。

个人学习,仅供参考

出0入0汤圆

发表于 2015-1-24 21:48:13 | 显示全部楼层
linux 内核里面有个无锁FIFO ,大家可以看一下。非常好。

出0入0汤圆

 楼主| 发表于 2015-1-28 19:58:26 | 显示全部楼层
chendong316 发表于 2015-1-24 21:48
linux 内核里面有个无锁FIFO ,大家可以看一下。非常好。

希望你能粘过来   linux不熟悉 谢谢

出0入0汤圆

发表于 2015-1-29 20:09:58 | 显示全部楼层
shinemotou 发表于 2015-1-28 19:58
希望你能粘过来   linux不熟悉 谢谢

清单 9. 2.6.10 环形缓冲区实现代码
/*
* __kfifo_put - puts some data into the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
       unsigned char *buffer, unsigned int len)
{
  unsigned int l;
  len = min(len, fifo->size - fifo->in + fifo->out);
  /* first put the data starting from fifo->in to buffer end */
  l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
  memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
  /* then put the rest (if any) at the beginning of the buffer */
  memcpy(fifo->buffer, buffer + l, len - l);
  fifo->in += len;
  return len;
}

/*
* __kfifo_get - gets some data from the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
     unsigned char *buffer, unsigned int len)
{
  unsigned int l;
  len = min(len, fifo->in - fifo->out);
  /* first get the data from fifo->out until the end of the buffer */
  l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
  memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
  /* then get the rest (if any) from the beginning of the buffer */
  memcpy(buffer + l, fifo->buffer, len - l);
  fifo->out += len;
  return len;
}
以上代码摘自 2.6.10 内核,通过代码的注释(斜体部分)可以看出,当只有一个消费者和一个生产者时,可以不用添加任何额外的锁,就能达到对共享数据的访问。

出0入0汤圆

发表于 2015-1-29 22:39:50 | 显示全部楼层
谢谢分享。。。。

出0入0汤圆

 楼主| 发表于 2015-2-3 11:32:38 | 显示全部楼层
chendong316 发表于 2015-1-29 20:09
清单 9. 2.6.10 环形缓冲区实现代码
/*
* __kfifo_put - puts some data into the FIFO, no locking v ...

谢谢  !

出0入0汤圆

发表于 2015-8-9 22:56:23 | 显示全部楼层
学习了,mark一下  环形队列消锁

出0入0汤圆

发表于 2015-10-29 19:26:17 | 显示全部楼层
学习,话说一个%运算在8位mcu里花几百个指令周期太难忍受了。
回帖提示: 反政府言论将被立即封锁ID 在按“提交”前,请自问一下:我这样表达会给举报吗,会给自己惹麻烦吗? 另外:尽量不要使用Mark、顶等没有意义的回复。不得大量使用大字体和彩色字。【本论坛不允许直接上传手机拍摄图片,浪费大家下载带宽和论坛服务器空间,请压缩后(图片小于1兆)才上传。压缩方法可以在微信里面发给自己(不要勾选“原图),然后下载,就能得到压缩后的图片】。另外,手机版只能上传图片,要上传附件需要切换到电脑版(不需要使用电脑,手机上切换到电脑版就行,页面底部)。
您需要登录后才可以回帖 登录 | 注册

本版积分规则

手机版|Archiver|amobbs.com 阿莫电子技术论坛 ( 粤ICP备2022115958号, 版权所有:东莞阿莫电子贸易商行 创办于2004年 (公安交互式论坛备案:44190002001997 ) )

GMT+8, 2024-6-18 11:18

© Since 2004 www.amobbs.com, 原www.ourdev.cn, 原www.ouravr.com

快速回复 返回顶部 返回列表