作者利用了最布满的

目录

  • 一、前言
  • 二、ConcurrentBag类
  • 三、
    ConcurrentBag线程安全实现原理

    • 1.
      ConcurrentBag的个人字段
    • 2.
      用来数据存款和储蓄的TrehadLocalList类
    • 3.
      ConcurrentBag贯彻新扩充成分
    • 4. ConcurrentBag
      如何实现迭代器方式
  • 四、总结
  • 笔者水平有限,假若不当款待各位谈论指正!


一、前言

小编日前在做三个类别,项目中为了升高吞吐量,使用了新闻队列,中间实现了生产花费形式,在生种草费者格局中要求有几个成团,来累积生产者所生育的货品,小编利用了最广大的List<T>集聚类型。

出于生产者线程有很七个,花费者线程也可能有很多个,所以不可防止的就生出了线程同步的难点。发轫作者是行使lock关键字,进行线程同步,然则品质实际不是特地理想,然后有网民说能够动用SynchronizedList<T>来代表使用List<T>高达到规定的分数线程安全的指标。于是小编就替换到了SynchronizedList<T>,然则发掘性能如故不佳,于是查看了SynchronizedList<T>的源代码,开采它正是简约的在List<T>提供的API的根底上加了lock,所以质量基本与作者达成方式八九不离十。

末尾笔者找到了缓慢解决的方案,使用ConcurrentBag<T>类来实现,质量有十分大的退换,于是作者查阅了ConcurrentBag<T>的源代码,达成丰盛Mini,特此在此记录一下。

二、ConcurrentBag类

ConcurrentBag<T>实现了IProducerConsumerCollection<T>接口,该接口首要用来生产者成本者情势下,可知该类基本正是为生育花费者形式定制的。然后还贯彻了常常的IReadOnlyCollection<T>类,达成了此类就须要达成IEnumerable<T>、IEnumerable、 ICollection类。

ConcurrentBag<T>对外提供的办法未有List<T>那么多,可是同样有Enumerable达成的扩充方法。类本人提供的艺术如下所示。

名称 说明
Add 将对象添加到 ConcurrentBag 中。
CopyTo 从指定数组索引开始,将 ConcurrentBag 元素复制到现有的一维 Array 中。
Equals(Object) 确定指定的 Object 是否等于当前的 Object。 (继承自 Object。)
Finalize 允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 (继承自 Object。)
GetEnumerator 返回循环访问 ConcurrentBag 的枚举器。
GetHashCode 用作特定类型的哈希函数。 (继承自 Object。)
GetType 获取当前实例的 Type。 (继承自 Object。)
MemberwiseClone 创建当前 Object 的浅表副本。 (继承自 Object。)
ToArray 将 ConcurrentBag 元素复制到新数组。
ToString 返回表示当前对象的字符串。 (继承自 Object。)
TryPeek 尝试从 ConcurrentBag 返回一个对象但不移除该对象。
TryTake 尝试从 ConcurrentBag 中移除并返回对象。

三、 ConcurrentBag线程安全实现原理

1. ConcurrentBag的私有字段

ConcurrentBag线程安全实现重大是通过它的多寡存款和储蓄的构造和细颗粒度的锁。

   public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        // ThreadLocalList对象包含每个线程的数据
        ThreadLocal<ThreadLocalList> m_locals;

        // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不同线程中
        // 允许在线程局部对象上枚举
        volatile ThreadLocalList m_headList, m_tailList;

        // 这个标志是告知操作线程必须同步操作
        // 在GlobalListsLock 锁中 设置
        bool m_needSync;

}

首荐我们来看它表明的个人字段,当中必要在意的是会晤的数额是存放在ThreadLocal线程本地存款和储蓄中的。也正是说访谈它的各类线程会珍视二个融洽的汇集数据列表,一个集合中的数据或许会存放在分化线程的本地存储空间中,所以玉树临风旦线程访谈自身本地存款和储蓄的靶子,那么是从未难题的,那就是促成线程安全的首先层,选取线程本地存款和储蓄数据

然后能够见到ThreadLocalList m_headList, m_tailList;那几个是存放着本地列表对象的头指针和尾指针,通过这四个指针,大家就能够透过遍历的办法来访谈具有地点列表。它使用volatile修饰,不容许线程举办本地缓存,每一个线程的读写都以一向操作在分享内部存款和储蓄器上,那就确定保障了变量始终具备生气勃勃致性。任何线程在其他时刻开展读写操作均是风靡值。对于volatile修饰符,感谢自己是工程师提议描述失实。

最终又定义了二个标注,这几个标记告知操作线程必需开展同步操作,那是贯彻了叁个细颗粒度的锁,因为唯有在多少个规范化满意的图景下才供给开展线程同步。

2. 用来数据存款和储蓄的TrehadLocalList类

接下去大家来看一下ThreadLocalList类的结构,该类就是实在存款和储蓄了数据的地点。实际上它是利用双向链表这种结构举行数量存款和储蓄。

[Serializable]
// 构造了双向链表的节点
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public readonly T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// 集合操作类型
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

/// <summary>
/// 线程锁定的类
/// </summary>
internal class ThreadLocalList
{
    // 双向链表的头结点 如果为null那么表示链表为空
    internal volatile Node m_head;

    // 双向链表的尾节点
    private volatile Node m_tail;

    // 定义当前对List进行操作的种类 
    // 与前面的 ListOperation 相对应
    internal volatile int m_currentOp;

    // 这个列表元素的计数
    private int m_count;

    // The stealing count
    // 这个不是特别理解 好像是在本地列表中 删除某个Node 以后的计数
    internal int m_stealCount;

    // 下一个列表 可能会在其它线程中
    internal volatile ThreadLocalList m_nextList;

    // 设定锁定是否已进行
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // 列表的版本,只有当列表从空变为非空统计是底层
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList 构造器
    /// </summary>
    /// <param name="ownerThread">拥有这个集合的线程</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }
    /// <summary>
    /// 添加一个新的item到链表首部
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">是否更新计数.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // 因为进行初始化了,所以将空状态改为非空状态
        }
        else
        {
            // 使用头插法 将新的元素插入链表
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // 更新计数以避免此添加同步时溢出
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// 从列表的头部删除一个item
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        // 双向链表删除头结点数据的流程
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            m_tail = null;
        }
        m_count--;
        result = head.m_value;

    }

    /// <summary>
    /// 返回列表头部的元素
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// 从列表的尾部获取一个item
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }


    /// <summary>
    /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

从地点的代码中大家可以尤其证实以前的观念,正是ConcurentBag<T>在四个线程中蕴藏数据时,使用的是双向链表ThreadLocalList实现了意气风发组对链表增加和删除改查的主意。

3. ConcurrentBag得以完毕新增英镑素

接下去大家看蒸蒸日上看ConcurentBag<T>是何等新扩大成分的。

/// <summary>
/// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里
/// 这是避免内存泄漏的方法
/// </summary>
/// <returns></returns>
private ThreadLocalList GetUnownedList()
{
    //此时必须持有全局锁
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    // 从头线程列表开始枚举 找到那些已经被关闭的线程
    // 将它所在的列表对象 返回
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}
/// <summary>
/// 本地帮助方法,通过线程对象检索线程线程本地列表
/// </summary>
/// <param name="forceCreate">如果列表不存在,那么创建新列表</param>
/// <returns>The local list object</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;

    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // 获取用于更新操作的 m_tailList 锁
        lock (GlobalListsLock)
        {
            // 如果头列表等于空,那么说明集合中还没有元素
            // 直接创建一个新的
            if (m_headList == null)
            {
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
               // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中
                // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程
                // 然后将已停止线程的数据 移交到当前线程管理
                list = GetUnownedList();
                // 如果没有 那么就新建一个列表 然后更新尾指针的位置
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}
/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add)
    ThreadLocalList list = GetThreadList(true);
    // 实际的数据添加操作 在AddInternal中执行
    AddInternal(list, item);
}

/// <summary>
/// </summary>
/// <param name="list"></param>
/// <param name="item"></param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        #pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
        #pragma warning restore 0420
        // 同步案例:
        // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁
        // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁
        if (list.Count < 2 || m_needSync)
        {
            // 将其重置为None 以避免与窃取线程的死锁
            list.m_currentOp = (int)ListOperation.None;
            // 锁定当前对象
            Monitor.Enter(list, ref lockTaken);
        }
        // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中
        // 如果已经锁定 那么说明线程安全  可以更新Count 计数
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

从位置代码中,大家能够很精晓的明亮Add()主意是怎样运行的,当中的基本点正是GetThreadList()方法,通过该办法可以获取当前线程的数量存款和储蓄列表对象,假使一纸空文数量存款和储蓄列表,它会自动创设大概通过GetUnownedList()办法来查找那个被甘休不过还蕴藏有数据列表的线程,然后将数据列表再次来到给当下线程中,防止了内部存款和储蓄器泄漏。

在数据增进的长河中,实现了细颗粒度的lock联合锁,所以品质会极高。删除和此外操作与新添类似,本文不再赘言。

4. ConcurrentBag 如何兑现迭代器形式

看完上边包车型客车代码后,作者很好奇ConcurrentBag<T>是怎么兑现IEnumerator来贯彻迭代访谈的,因为ConcurrentBag<T>是经过分散在不一样线程中的ThreadLocalList来存款和储蓄数据的,那么在落到实处迭代器情势时,进程会比较复杂。

前边再查看了源码之后,发掘ConcurrentBag<T>为了实现迭代器方式,将分在差异线程中的数据全都存到多个List<T>聚拢中,然后回来了该别本的迭代器。所以每回访谈迭代器,它都会新建二个List<T>的别本,那样即使浪费了迟早的仓库储存空间,不过逻辑上更是简明了。

/// <summary>
/// 本地帮助器方法释放所有本地列表锁
/// </summary>
private void ReleaseAllLocks()
{
    // 该方法用于在执行线程同步以后 释放掉所有本地锁
    // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {

        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 从冻结状态解冻包的本地帮助器方法
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    // 首先释放掉 每个线程中 本地变量的锁
    // 然后释放全局锁
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(GlobalListsLock);
    }
}

/// <summary>
/// 本地帮助器函数等待所有未同步的操作
/// </summary>
private void WaitAllOperations()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    ThreadLocalList currentList = m_headList;
    // 自旋等待 等待其它操作完成
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            // 有其它线程进行操作时,会将cuurentOp 设置成 正在操作的枚举
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 本地帮助器方法获取所有本地列表锁
/// </summary>
private void AcquireAllLocks()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;

    // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁
    while (currentList != null)
    {
        // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added
/// to the dictionary
/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations
/// 3- Wait for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

    // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync
    Monitor.Enter(GlobalListsLock, ref lockTaken);

    // 这将强制同步任何将来的添加/执行操作
    m_needSync = true;

    // 获取所有列表的锁
    AcquireAllLocks();

    // 等待所有操作完成
    WaitAllOperations();
}

/// <summary>
/// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。
/// 这不是线程安全, 应该被称为冻结/解冻袋块
/// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 
/// </summary>
/// <returns>List the contains the bag items</returns>
private List<T> ToList()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    // 创建一个新的List
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中
    while (currentList != null)
    {
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag.  It does not reflect any updates to the collection after 
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new List<T>().GetEnumerator(); // empty list

    bool lockTaken = false;
    try
    {
        // 首先冻结整个 ConcurrentBag集合
        FreezeBag(ref lockTaken);
        // 然后ToList 再拿到 List的 IEnumerator
        return ToList().GetEnumerator();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

由地方的代码可领悟,为了获得迭代器对象,总共进行了三步关键的操作。

  1. 使用FreezeBag()办法,冻结风流罗曼蒂克切ConcurrentBag<T>聚拢。因为需求扭转集结的List<T>别本,生成别本时期不能够有此外线程改变损坏数据。
  2. ConcurrrentBag<T>生成List<T>副本。因为ConcurrentBag<T>储存数据的艺术相比较极度,直接实现迭代器情势困难,考虑到线程安全和逻辑,最棒的方式是生成四个别本。
  3. 做到上述操作之后,就足以选择UnfreezeBag()办法解冻整个会集。

那么FreezeBag()艺术是哪些来冻结人山人海切群集的呢?也是分为三步走。

  1. 先是获得全局锁,通过Monitor.Enter(GlobalListsLock, ref lockTaken);与上述同类一条语句,那样任何线程就无法冻结会集。
  2. 接下来拿走具有线程中ThreadLocalList的锁,通过`AcquireAllLocks()方法来遍历获取。这样任何线程就不可能对它进行操作损坏数据。
  3. 等候已经步入了操作流程线程甘休,通过WaitAllOperations()情势来兑现,该方法会遍历每二个ThreadLocalList对象的m_currentOp属性,确认保障整个地处None操作。

完结以上流程后,那么正是真的的冻结了风姿罗曼蒂克切ConcurrentBag<T>聚焦,要解冻的话也周边。在这里不再赘述。

四、总结

下边给出一张图,描述了ConcurrentBag<T>是什么存款和储蓄数据的。通过种种线程中的ThreadLocal来促成线程本地存款和储蓄,各个线程中都有那样的社团,互不忧愁。然后每种线程中的m_headList延续指向ConcurrentBag<T>的第一个列表,m_tailList本着最终二个列表。列表与列表之间通过m_locals
下的 m_nextList不仅仅,构成五个单链表。

多少存款和储蓄在各类线程的m_locals中,通过Node类构成三个双向链表。
PS:
要注意m_tailListm_headList而不是积累在ThreadLocal中,而是兼具的线程分享风流罗曼蒂克份。

图片 1

以上正是有关ConcurrentBag<T>类的落实,小编的风姿罗曼蒂克对记录和剖析。

笔者水平有限,若是不当接待各位商议指正!

附上ConcurrentBag<T>源码地址:戳一戳

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*
*
Website