登录
原创

Go开发-使用Goroutine如何控制HTTP请求的并发量

发布于 2020-10-15 阅读 2992
  • 后端
  • HTTP
  • Go
原创

一、明确需求

我们使用 go 并发调用接口发起 HTTP 请求时,只需要在 func() 前面加上 go 关键字就很容易完成了,就是因为让并发变得如此简单,所以有的时候我们就需要控制一下并发请求的数量。

现在有个需求:本地有一千万条手机号,需要调用聚合数据 手机号码归属地 接口,并记录省份、城市、区号、邮编、运营商等查询结果数据。

Tips: 测试的接口为聚合数据免费的: 手机号码归属地 接口,如果需要测试下面的用例,建议先去注册申请接口即可。而且聚合数据还提供很多免费的接口供开发者调试使用。

c02d6e1ad7f36ecd9381e2967c500c27.jpg
我们先模拟不加控制协程数量导致的宕机问题,新建一个 warning.go 内容如下:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "net/url"
)

func main() {
    // 接口请求URL
    apiUrl := "http://localhost" // 不要使用接口地址测试
    max := 1<<63 - 1             // 最大整数,模拟大量数据

    // 初始化参数
    param := url.Values{}
    // 配置请求参数,方法内部已处理urlencode问题,中文参数可以直接传参
    param.Set("a", "test") // 需要查询的手机号码或手机号码前7位

    for i := 0; i < max; i++ {
        go func(i int) {
            // 一些逻辑代码...
            fmt.Printf("start func: %d\n", i)
            // 发送请求
            data, err := Get(apiUrl, param)
            if err != nil {
                fmt.Println(err)
                return
            }
            fmt.Println(string(data))
        }(i)
    }
}

// Get 方式发起网络请求
func Get(apiURL string, params url.Values) (rs []byte, err error) {
    var Url *url.URL
    Url, err = url.Parse(apiURL)
    if err != nil {
        fmt.Printf("解析url错误:\r\n%v", err)
        return nil, err
    }
    //如果参数中有中文参数,这个方法会进行URLEncode
    Url.RawQuery = params.Encode()
    resp, err := http.Get(Url.String())
    if err != nil {
        fmt.Println("err:", err)
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

高能命令:!!!不要执行!!!

go run waning.go

打开瞬间 top 命令的截图如下,可以看到 CPU 瞬间起来:

WX202010142054032x.png

由于没有限制 goroutine 数量,如果我们把全部任务都放到并发 Goroutine 中去执行,虽然效率比较高。但当不加控制的 goroutine 疯狂创建时候,服务器系统资源使用率飙升宕机,直到进程被自动 kill 不然无法提供任何其它服务。

二、用带缓冲区的 channel 控制并发量

上面的案例是我们不加控制 goroutine 数量限制从而导致宕机的,因此只要我们控制了 goroutine 数量就能避免这种问题!

下面是使用 sync.WaitGroup{} 控制并发量的代码,新建一个 main.go 内容如下:

package main

import (
    "fmt"
    "io/ioutil"
    "math/rand"
    "net/http"
    "net/url"
    "sync"
    "time"
)

type Limit struct {
    number  int
    channel chan struct{}
}

// Limit struct 初始化
func New(number int) *Limit {
    return &Limit{
        number:  number,
        channel: make(chan struct{}, number),
    }
}

// Run 方法:创建有限的 go f 函数的 goroutine
func (limit *Limit) Run(f func()) {
    limit.channel <- struct{}{}
    go func() {
        f()
        <-limit.channel
    }()
}

// WaitGroup 对象内部有一个计数器,从0开始
// 有三个方法:Add(), Done(), Wait() 用来控制计数器的数量
var wg = sync.WaitGroup{}

const (
    concurrency = 5 // 控制并发量
)

func main() {
    start := time.Now()
    limit := New(concurrency) // New Limit 控制并发量
    // 接口请求URL
    apiUrl := "http://apis.juhe.cn/mobile/get" // 不要使用接口地址测试
    //max := int(math.Pow10(8))                  // 模拟一千万数据
    max := 5                                    // 先测试5次吧

    // 初始化参数
    param := url.Values{}
    param.Set("key", "您申请的KEY") // 接口请求Key

    for i := 0; i < max; i++ {
        wg.Add(1)
        value := i
        goFunc := func() {
            fmt.Printf("start func: %d\n", value)
            // 配置请求参数,方法内部已处理urlencode问题,中文参数可以直接传参
            phone := RandMobile()
            param.Set("phone", phone) // 需要查询的手机号码或手机号码前7位
            // 发送请求
            data, err := Get(apiUrl, param)
            if err != nil {
                fmt.Println(err)
                return
            }
            // 其它逻辑代码...
            fmt.Println("phone: ", phone, string(data))
            wg.Done()
        }
        limit.Run(goFunc)
    }

    // 阻塞代码防止退出
    wg.Wait()

    fmt.Printf("耗时: %fs", time.Now().Sub(start).Seconds())
}

// Get 方式发起网络请求
func Get(apiURL string, params url.Values) (rs []byte, err error) {
    var Url *url.URL
    Url, err = url.Parse(apiURL)
    if err != nil {
        fmt.Printf("解析url错误:\r\n%v", err)
        return nil, err
    }
    //如果参数中有中文参数,这个方法会进行URLEncode
    Url.RawQuery = params.Encode()
    resp, err := http.Get(Url.String())
    if err != nil {
        fmt.Println("err:", err)
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

var MobilePrefix = [...]string{"130", "131", "132", "133", "134", "135", "136", "137", "138", "139", "145", "147", "150", "151", "152", "153", "155", "156", "157", "158", "159", "170", "176", "177", "178", "180", "181", "182", "183", "184", "185", "186", "187", "188", "189"}

// GeneratorPhone 生成手机号码
func RandMobile() string {
    return MobilePrefix[RandInt(0, len(MobilePrefix))] + fmt.Sprintf("%0*d", 8, RandInt(0, 100000000))
}

// 指定范围随机 int
func RandInt(min, max int) int {
    rand.Seed(time.Now().UnixNano())
    return min + rand.Intn(max-min)
}

这时候我们执行 go run main.go ,然后执行结果如下:

gosafe.png

使用 go 带缓冲区的通道来控制 goroutine 的并发数量更加简单实用, 而且我们还可以把 New()Run() 方法封装起来外部项目直接引入使用。

三、扩展思考

下面有两个思考问题,大家可以尝试着去思考一下。

思考1:为什么我们要使用 sync.WaitGroup

如果我们不使用 sync.WaitGroup 阻塞主进程的话,当主程序执行结束后,子协程未执行也会被终止掉的。因此剩余的 goroutine 没来及执行,程序就已经结束了。

思考2:代码中 channel 数据结构为什么定义 struct,而不定义成 bool 类型呢?

因为空结构体变量的内存占用大小为 0,而 bool 类型内存占用大小为 1,这样可以最大化利用服务器的内存空间。

func main(){
  a :=struct{}{}
  b := true
  fmt.Println(unsafe.Sizeof(a))  // 0
  fmt.Println(unsafe.Sizeof(b))  // 1
}

评论区

励志做一条安静的咸鱼,从此走上人生巅峰。

5

1

3

举报