注册 登录

清河洛

Go语言中的并发和通道

qingheluo2021-09-04清河洛399
Go语言中的并发Go语言中通过go关键字来开启一个不同的、新创建的运行期goroutine实现并发goroutine是轻量级线程(可以理解为协程),调度是由Golang运行时进行管理的同一个程序中的所有 goroutine 共享同一个地址空间goroutine 语法格式:go func_name( args )一个Go程序启动后,main函数最先运行,称之为main goroutine,相当于主线程,当main函数执行完成后,主线程也就终结了,其下运行着的所有协程无论是否正在运行都会强制退出func demo(){fmt.Println("goroutine")} func main()...

Go语言中的并发

Go语言中通过go关键字来开启一个不同的、新创建的运行期goroutine实现并发

goroutine是轻量级线程(可以理解为协程),调度是由Golang运行时进行管理的

同一个程序中的所有 goroutine 共享同一个地址空间

goroutine 语法格式:go func_name( args )

一个Go程序启动后,main函数最先运行,称之为main goroutine,相当于主线程,当main函数执行完成后,主线程也就终结了,其下运行着的所有协程无论是否正在运行都会强制退出

func demo(){fmt.Println("goroutine")}
func main(){
    fmt.Println("main")
    go demo()
}
以上只会打印main,而不会打印goroutine

使用go关键字创建的goroutine都是并发运行的,并非并行运行,如果当前goroutine不发生阻塞,是不会让出CPU给其他goroutine的

通道(channel)

通道(channel)是运行期线程(goroutine)之间用来传递数据(通讯)的特殊类型

对于通道中的同一个值,发送操作和接收操作是互斥的。如正在被复制进通道但还未复制完成的元素值,这时接收方不会看到和取走

发送操作和接收操作中对元素值的处理都是不可分割的,发送操作要么还没复制元素,要么已经复制完毕,不会出现值只复制了一部分的情况

发送操作在完全完成之前会被阻塞。接收操作也是如此

元素值从外界进入通道会被复制。也就是说进入通道的并不是在接收操作符右边的那个元素值,而是他的副本

发送操作包括,“复制元素值”,“放置副本到通道内” 二个步骤。在步骤完成之前,发送操作会一直阻塞

接收操作包括“复制通道内元素值”,“放置副本到接收方”,“删除通道内的元素值”三个操作。操作在完成之前也是会一直阻塞

使用make函数创建通道

var chan_name = make (chan type [, cache_size])
cache_size表示缓冲区大小
默认情况下通道是不带缓冲区的

通道的缓冲区

不带缓冲区的通道:发送端发送数据,同时必须有接收端接收数据
带缓冲区的通道:
    允许发送端的数据发送和接收端的数据获取处于异步状态
    就是说发送端发送的数据可以放在缓冲区里面,而不是立刻需要接收端去获取数据
对于无缓冲区的通道,发送者和接收者写在了同一协程中必定导致死锁

通道的阻塞

不带缓冲区的通道:发送方会阻塞直到接收方从通道中接收了值

带缓冲区的通道:
    缓冲区已满时,发送方会阻塞直到发送的值被拷贝到缓冲区内(直到某个接收方获取缓冲区内的值)
    缓冲区为空时,接收方在有值可以接收之前会一直阻塞

单项通道

单向信道,可分为只读信道和只写信道

type du_name = <- chan type
type xie_name = chan <- type
以上表示创建一个只读或只写的通道别名类型
然后使用make函数创建
var variable = make(du_name)

发送或接收数据

操作符(<-)用于指定通道的方向用来发送或接收数据

ch <- v
    把 v 发送到通道 ch
v := <-ch
    从 ch 接收数据, 并把值赋给 v
v,ok := <-ch
    从 ch 接收数据, 并把值赋给 v, ok参数表示通道是否已经关闭(bool值),可以用来判断channel是否已关闭

向一个已关闭的channel发送消息会产生panic

从已关闭的channel读取消息不会产生panic,且永远不会阻塞,能读出channel中还未被读取的消息,若消息均已被读取,则会读取到该类型的默认值
可以利用这个特性指定一个关闭所有goroutine的机制

func demo (n int,ch chan int){
    for {
        select{
        case v := <-ch:
            fmt.Printf("监控器%v接收到通道值%v,监控结束",n,v)
            return
        default:
            fmt.Printf("监控器%v正在监控中....",n)
            time.Sleep(2 * time.Second)
        }
    }
//一个无限循环,里面的select会一直尝试读取通道中的值,当读取到通道中的值后return退出
}

func main(){
    ch := make(chan int)
    for i:=1 ; i<=5 ; i++{
        go demo(i,ch)
    }
    time.Sleep(15 * time.Second)
    close(ch)
    //当15秒后关闭通道后,所有线程中都可以读取到0值
    time.Sleep(5 * time.Second)
    fmt.Printf("主程序退出")
}

遍历通道

通道一次只能接收一个数据元素

通过range关键字来实现遍历读取到的数据

通道遵循先入先出的规则,保证收发数据的顺序

for val := <-chan_name{}

在使用for循环读取通道数据时,for循环会一直持续直到通道关闭

关闭通道

close(chan_name):关闭指定的通道

如何优雅的控制协程

当main函数执行完成后,主线程也就终结了,其下运行着的所有协程无论是否正在运行都会强制退出

那么怎么保证主线程在所有协程运行完毕后才退出

一、使用time.Sleep函数让main函数延时退出

但在实际开发中,无法预知所有的goroutine需要多长的时间才能执行完毕,时间太长主程序就阻塞了,时间太短所有协程的任务无法保证完成,因此,使用time.Sleep是一种极不推荐的方式

二、使用通道阻塞主线程

func main(){
    done := make(chan bool)
    go func(){
        for i := 0 ; i < 999 ; i++{
            fmt.Println(i)
        }
        done <- true
    }()
    <-done
}

以上代码通过创建一个无缓冲区的通道,在主线程最后去获取通道的值,但是由于通道中没有值,会阻塞直到协程中代码执行完成后向通道写入值为止
该方法在协程数少的时候,并不会有什么问题,但在协程数多的时候,代码就会显得非常复杂

三、使用WaitGroup

这里说的WaitGroup是sync包中提供的WaitGroup类型

使用var variable sync.WaitGroup就可以实例化该类型

实例化以后就可以使用几个方法


Add(n):为计数器增加n,初始值为0,一般传入子协程的数量
Done():调用此方法,计数器会减1,一般当某个子协程完成后调用此方法
Wait():阻塞当前协程,直到实例中的计数器归零,一般用在主线程中的最后

func demo(n int , wg *sync.WaitGroup){
    defer wg.Done()
    for i := 1 ; i<=999 ; i++{
        fmt.Printf("协程%d打印:%d",n,i)
    }
}

func main(){
    var wg sync.WaitGroup

    wg.Add(3)
    go demo(1,&wg)
    go demo(2,&wg)
    go demo(3,&wg)

    wg.Wait()
}

四、使用Context

上面的例子中用在控制主线程生成的子协程,但是有些情况子协程还可能在生成子子协程,子子协程还可能再生成子协程,那么这些协程使用上面的方法控制就很复杂了

Context是context包中提供的Context接口

func monitor(ctx context.Context, number int) {
    for {
        select {
        case <- ctx.Done():
            fmt.Printf("监控器%v,监控结束。", number)
            return
        default:
            fmt.Printf("监控器%v,正在监控中...", number)
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5* time.Second)
    for i :=1 ; i <= 5; i++ {
        go monitor(ctx, i)
    }

    time.Sleep(3  * time.Second)
    cancel()
    //这里手动执行了cancel函数来关闭Context
    if ctx.Err() != nil {
        fmt.Println("监控器取消的原因: ", ctx.Err())
    }
    fmt.Println("主程序退出!!")
}


网址导航