注册 登录

清河洛

Go的sync协程同步

qingheluo2024-08-05清河洛272
由于go天然对并发的支持使得go在一些并发任务处理时表现优异,但是并发会存在临界资源问题,当多个协程来访问共享资源时资源是不安全的在一般场景中为了解决这个临界资源问题,可以使用channel通道来解决,但是当协程结构比较复杂时处理比较复杂易出错Go的官方sync包实现了传统编程语言中的“锁”,通过一个“锁”将共享资源锁住,防止其它协程的访问,以此来解决临界资源问题Once表示只执行一次动作的对象 type Once struct{} // 没有导出字段,也没有函数用于创建Once var once sync.Once Once只有一个方法 Do(f func()) ...

由于go天然对并发的支持使得go在一些并发任务处理时表现优异,但是并发会存在临界资源问题,当多个协程来访问共享资源时资源是不安全的

在一般场景中为了解决这个临界资源问题,可以使用channel通道来解决,但是当协程结构比较复杂时处理比较复杂易出错

Go的官方sync包实现了传统编程语言中的“锁”,通过一个“锁”将共享资源锁住,防止其它协程的访问,以此来解决临界资源问题

Once

表示只执行一次动作的对象

type Once struct{}
    // 没有导出字段,也没有函数用于创建Once
    var once sync.Once

Once只有一个方法  Do(f func())

Do方法当且仅当第一次被调用时才执行指定函数,即使每次调用提供的f值不同
只有f返回后Do方法才会返回,如果f中引起了Do的调用,会导致死锁
主要用于系统的一次性初始化相关工作
需要注意:即使 f 运行出现异常,Do 也会认为它已经返回,后续对 Do 的调用会直接返回而不调用 f

使用示例
var once sync.Once
func MyFunc(){ ... }

// 启用2个协程运行
go once.Do(MyFunc)
go once.Do(MyFunc)

// 在主线程2次运行
once.Do(MyFunc)
once.Do(MyFunc)

以上代码片段中,无论是在多个协程中多次运行,还是在主线程中多次运行,可以保证MyFunc函数仅运行一次

OnceFunc和OnceValue[s]

创建一个可安全用于并发调用的一次性函数,无论多少个协程调用,都仅会在首次调用时执行,后续调用将无任何作用

func OnceFunc(f func()) func()      创建没有返回值的一次性函数
func OnceValue[T any](f func() T) func() T  :创建一个返回值的一次性函数
func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)  :创建返回两个值的一次性函数

sync.OnceValue[string](func() string {
    return time.Now().Format("20060102150405")
})

Mutex

表示一个互斥锁,一个加锁状态的互斥锁可以被任意协程解锁,但在解锁之前任意协程对该互斥锁进行加锁操作都会阻塞直到该互斥锁被解锁

type Mutex struct{}
    // 没有导出字段,也没有函数用于创建Mutex
    var mutex sync.Mutex

Mutex的方法

Lock()         :加锁,如果已是加锁状态,会阻塞直到解锁
TryLock() bool :尝试加锁并返回是否成功
Unlock()       :解锁,如果未加锁会导致运行时错误

RWMutex

表示一个读写互斥锁,可以被同时多个读取者持有或唯一个写入者持有

type RWMutex struct{}
    // 没有导出字段,也没有函数用于创建RWMutex
    var rwMutex sync.RWMutex

RWMutex的方法

Lock()      获取写入锁,禁止其他协程获取读取锁和写入锁
        获取写入锁时如果存在任何未解锁的写入锁或读取锁,会阻塞直到所有锁均解锁
TryLock() bool  尝试获取写入锁并返回是否成功
Unlock()     解锁写入锁,未加写入锁会导致运行时错误

RLock()      读取锁,禁止其他线程获取写入锁但允许继续获取读取锁
        任何一个读取锁的获取锁和解除锁一定要两两成对出现
        同一协程如果在循环体或递归中会重复获取读取锁会导致死锁
        允许同一协程以字面量(每行显示编写)方式连续获取读取锁,但是解锁需要同样数量的RUnlock()
        当一个协程运行了Lock()获取写入锁但是由于存在未解锁的读取锁处于阻塞状态时,新的获取读取锁操作会阻塞
TryRLock() bool
RUnlock()     解锁读取锁,如果未加写入锁会导致运行时错误

RLocker() Locker   获取一个实现了Locker接口的实例

WaitGroup

用于等待一组线程的结束

父线程调用Add方法来设定应等待的线程的数量,每个被等待的线程在结束时调用Done方法,同时主线程调用Wait方法阻塞至所有线程结束

type WaitGroup struct{}
    // 没有导出字段,也没有函数用于创建WaitGroup
    var wg sync.WaitGroup

Add(delta int)  :向内部计数加上delta(可以是负数)
    如果内部计数器变为0,Wait方法会解除阻塞并释放所有等待的线程
    如果计数器小于0,会panic
Done() :计数器的值减1,应在协程的最后执行
    源码中Done函数体仅有一行 wg.Add(-1)
Wait()  :阻塞直到计数器为0

Cond(条件变量)

Cond是一种用于协调多个协程之间同步和通信的机制

Cond让多个协程在某个条件不满足时挂起(进入等待状态),直到有协程通知该条件已满足,从而唤醒等待的协程继续执行

Cond必须与一个sync.Mutex或sync.RWMutex结合使用,以保护共享条件状态的访问,在检查条件和调用等待方法时,必须持有锁

推荐优先使用 Channel 实现协程通信,在共享内存模型中(如多个协程竞争同一资源)Cond更高效

type Cond struct {
    L Locker
}

创建 Cond

NewCond(l Locker) *Cond

type Locker interface {
    Lock()
    Unlock()
}

Cond的方法

Wait()      释放锁并阻塞,直到收到通知,调用前必须持有锁,唤醒后会自动重新获取锁
            释放锁让其他协程可以修改共享资源
Signal()    唤醒等待队列中的一个协程,唤醒哪个协程是不确定的
Broadcast() 唤醒等待队列中的所有协程

关键注意事项

1、 在调用Wait()、Signal()、Broadcast()时必须持有锁
2、 Wait()调用后会自动释放锁,并在唤醒后重新获取锁
3、 协程被唤醒可能被虚假唤醒,即使未收到通知,协程也可能被系统调度器唤醒
4、 正常唤醒的协程由于要获取锁,在获取到锁时资源可能已经被其他协程修改了
5、 由于以上3和4的可能性,所以Wait()必须要放在循环中,在Wait()返回后重新检查条件

cond.L.Lock()
for condition {
    cond.Wait()
}
...
cond.L.Unlock()
    // 不需要操作资源后释放锁

condition表示要检查的资源状态
当被唤醒后会获取锁,当获取到锁时由于在循环中,会进行一次条件的检查判断
    如果符合,那么退出循环执行后续逻辑
    如果不符合,如虚假唤醒、资源已被其他协程修改等,会再一次执行Wait()等待下次被唤醒

Pool

是一组单独保存和检索的临时对象的集合,类似于“池”

type Pool struct {
    New func() any
}

Pool适合存储一些可能被重复使用,无状态或状态可重置的对象,多个goroutine同时使用一个Pool是安全的

Pool的作用是构建高效、线程安全的空闲列表,缓存已分配但未使用的项,以供以后重复使用,从而减轻垃圾回收器的压力

Pool不保证其中对象的可用性,如果Pool是一个对象的唯一引用,那么这个对象可能会被释放或回收,而没有任何通知

Pool中唯一的导出元素New是一个函数,该函数用于替换在获取一个存储对象时返回的nil,用于动态生成新的值

有一个非常重要的点,就是虽然Pool是并发安全的,但是New()函数的具体实现是我们自己编写的,如果在New()方法中存在对一些公共数据的修改,那么在编写的时候一定要注意并发安全

Pool会根据对这个“池”的引用数量或负载情况进行自动扩展和收缩

Pool的方法

Put(x any)  :将值x放入Pool中
    需要注意:由于x的值为any,并且内部没有进行类型的判断和断言
    如果Put时不对x的数据类型做限制,那么Pool中就可能存在各种类型的数据,导致在Get后需要进行类型判断
Get() any   :从池中随机获取一个项,并将其从Poll中移除
    Get方法可能会在Pool中有成员时调用New方法返回一个新值

Map

类似于map[any]any,但对于多个协程并发使用是安全的,无需额外的锁定或协调

type Map struct {}
    // 没有导出字段,也没有函数用于创建Mutex
    var map sync.Map

增
LoadOrStore(key, value any) (actual any, loaded bool)
        如果 key 不存在则添加该 key 及其对应的值value
        否则返回 key 对应的值

删
Clear()           删除所有条目
Delete(key any)   删除指定 key 及其值
CompareAndDelete(key, old any) (deleted bool)
        如果 key 对应的值等于 old,删除 key 的条目并返回true
        old 必须是可比较的类型
LoadAndDelete(key any) (value any, loaded bool)
        删除指定 key 及其值,如果存在 key 返回该值
        loaded表示 key 是否存在

改
Store(key, value any)    设置 key 的值
Swap(key, value any) (previous any, loaded bool)
        替换 key 的值为 value,并返回先前的值
        loaded表示 key 是否存在
CompareAndSwap(key, old, new any) (swapped bool)
        如果 key 对应的值等于 old,将交换 key 的新旧值并返回true
        old 必须是可比较的类型

查
Load(key any) (value any, ok bool)
        返回 key 对应的值,key 不存在返回 nil

其他
Range(f func(key, value any) bool)
        依次将每个键值对传入 f 中,直到 f 返回false时停止


网址导航