0

0

go语言实现日志收集系统图文详解

尚

发布时间:2019-11-27 15:47:35

|

4406人浏览过

|

来源于博客园

转载

go语言实现日志收集系统图文详解

整理了一下这个日志收集系统的框,如下图

1.jpg这次要实现的代码的整体逻辑为:

2.jpg完整代码地址为: https://github.com/pythonsite/logagent

etcd介绍

高可用的分布式key-value存储,可以用于配置共享和服务发现

立即学习go语言免费学习笔记(深入)”;

类似的项目:zookeeper和consul

开发语言:go

接口:提供restful的接口,使用简单

实现算法:基于raft算法的强一致性,高可用的服务存储目录

etcd的应用场景:

1、服务发现和服务注册

2、配置中心(我们实现的日志收集客户端需要用到)

3、分布式锁

4、master选举

官网对etcd的有一个非常简明的介绍:

3.jpg

etcd搭建:
下载地址:https://github.com/coreos/etcd/releases/
根据自己的环境下载对应的版本然后启动起来就可以了

SV-Cart网店系统
SV-Cart网店系统

SV-Cart是开源的电子商务平台。多语言,国际化SV-CART网店系统是一套可以支持各个国家的语言显示的国际电子商务系统,现已支持中文简体、英文、日文、德文和法文,土耳其文,可实现这五种语言在同一平台上的相互转换。免费、开源SV-CART网店系统是一项新的专业开放源代码的WEB2.0网上商城系统,是一套集网上购物和网站内容管理于一体的电子商务解决方案。易操作、多功能SV-CART系统注重操作上的

下载

启动之后可以通过如下命令验证一下:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#

context 介绍和使用

其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

1、控制goroutine的超时

2、保存上下文数据

通过下面一个简单的例子进行理解:

package main

import (
    "fmt"
    "time"
    "net/http"
    "context"
    "io/ioutil"
)


type Result struct{
    r *http.Response
    err error
}

func process(){
    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
    defer cancel()
    tr := &http.Transport{}
    client := &http.Client{Transport:tr}
    c := make(chan Result,1)
    req,err := http.NewRequest("GET","http://www.google.com",nil)
    if err != nil{
        fmt.Println("http request failed,err:",err)
        return
    }
    // 如果请求成功了会将数据存入到管道中
    go func(){
        resp,err := client.Do(req)
        pack := Result{resp,err}
        c <- pack
    }()

    select{
    case <- ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
    case res := <-c:
        defer res.r.Body.Close()
        out,_:= ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s",out)
    }
    return

}

func main() {
    process()
}

写一个通过context保存上下文,代码例子如:

package main

import (
    "github.com/Go-zh/net/context"
    "fmt"
)

func add(ctx context.Context,a,b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    return a+b
}

func calc(ctx context.Context,a, b int) int{
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    //再将ctx传入到add中
    return add(ctx,a,b)
}

func main() {
    //将ctx传递到calc中
    ctx := context.WithValue(context.Background(),"trace_id","123456")
    calc(ctx,20,30)

}

结合etcd和context使用

关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)

package main

import (
    etcd_client "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }

    fmt.Println("connect success")
    defer cli.Close()
}

下面一个例子是通过连接etcd,存值并取值

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    _,err = cli.Put(ctx,"logagent/conf/","sample_value")
    cancel()
    if err != nil{
        fmt.Println("put failed,err",err)
        return
    }
    ctx, cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"logagent/conf/")
    cancel()
    if err != nil{
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev := range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}

关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:

package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}

关于官网文档中的WithDeadline演示的代码例子:

package main


import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }

}

通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    defer cli.Close()
    // 这里会阻塞
    rch := cli.Watch(context.Background(),"logagent/conf/")
    for wresp := range rch{
        for _,ev := range wresp.Events{
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

实现一个kafka的消费者代码的简单例子:

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "time"
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            for msg := range pc.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()

}

但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "sync"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            wg.Add(1)
            for msg := range partitionConsumer.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
            wg.Done()
        }(pc)
    }

    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()

}

将客户端需要收集的日志信息放到etcd中

关于etcd处理的代码为:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/astaxie/beego/logs"
    "context"
    "fmt"
)

var Client *clientv3.Client
var logConfChan chan string


// 初始化etcd
func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

    var keys []string
    for _,ip := range ipArrays{
        //keyfmt = /logagent/%s/log_config
        keys = append(keys,fmt.Sprintf(keyfmt,ip))
    }

    logConfChan = make(chan string,10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client,err = clientv3.New(clientv3.Config{
        Endpoints:addr,
        DialTimeout: timeout,
    })
    if err != nil{
        logs.Error("connect failed,err:%v",err)
        return
    }
    logs.Debug("init etcd success")
    waitGroup.Add(1)
    for _, key := range keys{
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        // 从etcd中获取要收集日志的信息
        resp,err := Client.Get(ctx,key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v",key,err)
            continue
        }

        for _, ev := range resp.Kvs{
            logs.Debug("%q : %q\n",  ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

func WatchEtcd(keys []string){
    // 这里用于检测当需要收集的日志信息更改时及时更新
    var watchChans []clientv3.WatchChan
    for _,key := range keys{
        rch := Client.Watch(context.Background(),key)
        watchChans = append(watchChans,rch)
    }

    for {
        for _,watchC := range watchChans{
            select{
            case wresp := <-watchC:
                for _,ev:= range wresp.Events{
                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:

            }
        }
        time.Sleep(time.Second)
    }
    waitGroup.Done()
}

func GetLogConf()chan string{
    return logConfChan
}

同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

package main

import (
    "time"
    "sync/atomic"
    "github.com/astaxie/beego/logs"
)

type SecondLimit struct {
    unixSecond int64
    curCount int32
    limit int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond:time.Now().Unix(),
        curCount:0,
        limit:limit,
    }
    return secLimit
}

func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount,int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond,sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

func (s *SecondLimit) Wait()bool {
    for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
            continue
        }

        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond,sec)
            atomic.StoreInt32(&s.curCount,0)
        }
        logs.Debug("limit is exited")
        return false
    }
}

推荐:go语言教程

相关专题

更多
PHP API接口开发与RESTful实践
PHP API接口开发与RESTful实践

本专题聚焦 PHP在API接口开发中的应用,系统讲解 RESTful 架构设计原则、路由处理、请求参数解析、JSON数据返回、身份验证(Token/JWT)、跨域处理以及接口调试与异常处理。通过实战案例(如用户管理系统、商品信息接口服务),帮助开发者掌握 PHP构建高效、可维护的RESTful API服务能力。

145

2025.11.26

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

319

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

227

2023.10.07

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

166

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

199

2024.02.23

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

989

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

50

2025.10.17

桌面文件位置介绍
桌面文件位置介绍

本专题整合了桌面文件相关教程,阅读专题下面的文章了解更多内容。

0

2025.12.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Rust 教程
Rust 教程

共28课时 | 3.9万人学习

Kotlin 教程
Kotlin 教程

共23课时 | 2.1万人学习

Go 教程
Go 教程

共32课时 | 3.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号