s3go语言 s3gogoing

组件分享之后端组件——基于Golang实现的高性能和弹性的流处理器benthos

近期正在探索前端、后端、系统端各类常用组件与工具,对其一些常见的组件进行再次整理一下,形成标准化组件专题,后续该专题将包含各类语言中的一些常用组件。欢迎大家进行持续关注。

创新互联公司专注服务器托管服务十多年,提供服务器租用、网页空间、主机域名、云服务器、云主机租用、雅安服务器托管、建站等服务

本节我们分享的是基于Golang实现的高性能和弹性的流处理器 benthos ,它能够以各种代理模式连接各种 源 和 接收器,并对有效负载执行 水合、浓缩、转换和过滤 。

它带有 强大的映射语言 ,易于部署和监控,并且可以作为静态二进制文件、docker 映像或 无服务器函数 放入您的管道,使其成为云原生。

Benthos 是完全声明性的,流管道在单个配置文件中定义,允许您指定连接器和处理阶段列表:

Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP UDP, sockets and ZMQ4.

1、docker安装

具体使用方式可以参见该 文档

有关如何配置更高级的流处理概念(例如流连接、扩充工作流等)的指导,请查看 说明书部分。

有关在 Go 中构建您自己的自定义插件的指导,请查看 公共 API。

【Minio】基于AWS S3协议搭建个人云存储服务

在2007年,GlusterFS演变为大型分布式存储方案后,任何配备合适硬件的公司,单位都可以利用个做分布式的流媒体,数据分析。在2011年,Red Hat收购了GlusterFS.

Minio是GlusterFS创始人之一Anand Babu Periasamy发布新的开源项目。Minio兼容Amason的S3分布式对象存储项目,采用Golang实现,客户端支持Java,Python,Javacript, Golang语言。

Minio 提供对象存储服务,兼容了 AWS S3 存储协议,用于非结构化的数据存。非结构化对象,比如图像,音、视频,日志文件,备份镜像…等等管理不方便,不定长,大小变化大、类型多,云端的访问复杂,minio就是来解决这种场景的。非结构化的文件从数KB到5TB都能很好的支持。开源并且用 Go 语言开发,有web操作界面,我们可以用它来搭建兼容S3协议的存储云服务。

Minio可以做为云存储的解决方案用来保存海量的图片,视频,文档。由于采用Golang实现,服务端可以工作在Windows,Linux, OS X和FreeBSD上。配置简单,基本是复制可执行程序,单行命令可以运行起来。

官网:

那么,如何自己搭建一个私有的S3存储云服务呢?

官方的话是推荐用Docker来搞,我们先用普通的二进制文件来直接解决了!

######################################################################################

# mkdir /data/aws_s3

# wget  

# mv  minio /usr/local/bin/

#  chmod  755  /usr/local/bin/minio 

# minio server  /data/aws_s3

#############################################################

Created minio configuration file successfully at /root/.minio

Endpoint:    

AccessKey: U3XLU4IMXY3IDKHU268F 

SecretKey: /6NCL6HGacviaCgRqr2qLbVOjhkkJdRpV7wz0JJD 

Region:    us-east-1

SQS ARNs:  

Browser Access:

Command-line Access: 

################################################################

$ mc config host add myminio   U3XLU4IMXY3IDKHU268F /6NCL6HGacviaCgRqr2qLbVOjhkkJdRpV7wz0JJD

Object API (Amazon S3 compatible):

Go: 

Java: 

Python: 

JavaScript: 

Drive Capacity: 8.3 GiB Free, 9.1 GiB Total

##############################################################

我们就成功启动了minio的s3服务,默认端口9000,可以通过网页访问:

注意 :第一次打开时候需要填写AccessKey和SecretKey才能进入,我们上面启动服务的时候,已经看到屏幕有输出:

AccessKey: U3XLU4IMXY3IDKHU268F 

SecretKey:6NCL6HGacviaCgRqr2qLbVOjhkkJdRpV7wz0JJD

把这两个Key填入,就能顺利进入,进入后展开页面如下:

这就是我们的S3云存储的管理页面了,看着是不是和七牛什么的提供云存储的产品页面挺像的,大家都是基于S3协议开发的!

上传个文件试试:

点击右下角的红色小加号按钮,弹出的菜单选择”create bucket”则会创建一个桶,输入名字”test”

点击刚才那个红色小加号按钮,这次选择”Upload file”上传文件,给这个桶上传了一个叫login.txt的文本文档

此时页面如下:

至此我们可以看到文件已经上传,要访问这个文件,可以点击文件右侧的三个点的按钮,选择分享就可以得到一个外链,在浏览器中访问这个外链就可以直接访问文件。

那么文件到底被存到哪里去了呢,我们启动命令中其实指定了工作路径/data/aws_s3/,所以到服务器这个目录下看看:

# ls /data/aws_s3/ 

test

# ls /data/aws_s3/test/

login.txt 

桶名称test是一个目录,其下就有上传的login.txt文件。

如果想指定ip和端口,可以这样写:

# minio server /data/aws_s3 --address=0.0.0.0:9000

如果想让服务在后台运行:

# nohup minio server /data/aws_s3   --address=0.0.0.0:443 

[1] 19882

// nohup: 忽略输入并把输出追加到启动命令的当前目录下的 "nohup.out"文件

minio可以用来搭建分布式存储系统 GlusterFS,这样就成了真正的云存储了,有时间再研究下把它从现在的单机测试,变成一朵存储云!

minio官网:

minio官方文档:

minio github主页:

golang使用Nsq

1. 介绍

最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。

官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库。

1.1 Features

1). Distributed

NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。

2). Scalable易于扩展

NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。

3). Ops Friendly

NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。

4.Integrated高度集成

官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。

1.2 组件

1.3 拓扑结构

NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。

NSQ

首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中。

事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel,它们其中之一作为档案channel。消费者会获取这些消息并且上传到S3。

nsqd

每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点。

nsqlookupd

2. Internals

2.1 消息传递担保

1)客户表示已经准备好接收消息

2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout)

3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息

这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程。在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失。

如何防止消息丢失是最重要的,即使是这个意外情况可以得到缓解。一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本。因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息。

2.2 简化配置和管理

单个 nsqd 实例被设计成可以同时处理多个数据流。流被称为“话题”和话题有 1 个或多个“通道”。每个通道都接收到一个话题中所有消息的拷贝。在实践中,一个通道映射到下行服务消费一个话题。

在更底的层面,每个 nsqd 有一个与 nsqlookupd 的长期 TCP 连接,定期推动其状态。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询。为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端。无需为添加任何新的消费者或生产者更改配置,大大降低了开销和复杂性。

2.3 消除单点故障

NSQ被设计以分布的方式被使用。nsqd 客户端(通过 TCP )连接到指定话题的所有生产者实例。没有中间人,没有消息代理,也没有单点故障。

这种拓扑结构消除单链,聚合,反馈。相反,你的消费者直接访问所有生产者。从技术上讲,哪个客户端连接到哪个 NSQ 不重要,只要有足够的消费者连接到所有生产者,以满足大量的消息,保证所有东西最终将被处理。对于 nsqlookupd,高可用性是通过运行多个实例来实现。他们不直接相互通信和数据被认为是最终一致。消费者轮询所有的配置的 nsqlookupd 实例和合并 response。失败的,无法访问的,或以其他方式故障的节点不会让系统陷于停顿。

2.4 效率

对于数据的协议,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据。这个概念,称之为 RDY 状态,基本上是客户端流量控制的一种形式。

efficiency

2.5 心跳和超时

组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,如果消费者是在后面的处理消息流的接收缓冲区中,操作系统将被填满,堵心跳)为了保证进度,所有的网络 IO 时间上限势必与配置的心跳间隔相关联。这意味着,你可以从字面上拔掉之间的网络连接 nsqd 和消费者,它会检测并正确处理错误。当检测到一个致命错误,客户端连接被强制关闭。在传输中的消息会超时而重新排队等待传递到另一个消费者。最后,错误会被记录并累计到各种内部指标。

2.6 分布式

因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生。个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布,即使面对网络分区。

这种“分布式优先”的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集。清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义。虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显。

2.7 no replication

不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证。我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列。但是这样仍然存在一个消息被发布后马上死亡,丢失了有效的写入的情况。

2.8 没有严格的顺序

虽然Kafka由一个有序的日志构成,但NSQ不是。消息可以在任何时间以任何顺序进入队列。在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况。

2.9 无数据重复删除功能

NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活还是死亡。很多原因会导致我们的consumer无法完成心跳检测,所以在consumer中必须有一个单独的步骤确保幂等性。

3. 实践安装过程

本文将nsq集群具体的安装过程略去,大家可以自行参考官网,比较简单。这部分介绍下笔者实验的拓扑,以及nsqadmin的相关信息。

3.1 拓扑结构

topology

实验采用3台NSQD服务,2台LOOKUPD服务。

采用官方推荐的拓扑,消息发布的服务和NSQD在一台主机。一共5台机器。

NSQ基本没有配置文件,配置通过命令行指定参数。

主要命令如下:

LOOKUPD命令

NSQD命令

工具类,消费后存储到本地文件。

发布一条消息

3.2 nsqadmin

对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,连接数等信息。

nsqadmin

channel

列出所有的NSQD节点:

nodes

消息的统计:

msgs

lookup主机的列表:

hosts

4. 总结

NSQ基本核心就是简单性,是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug。消费者可以自行处理故障事件而不会影响系统剩下的其余部分。

事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。

结合我们的业务系统来看,对于我们所需要传输的发票消息,相对比较敏感,无法容忍某个nsqd宕机,或者磁盘无法使用的情况,该节点堆积的消息无法找回。这是我们没有选择该消息中间件的主要原因。简单性和可靠性似乎并不能完全满足。相比Kafka,ops肩负起更多负责的运营。另一方面,它拥有一个可复制的、有序的日志可以提供给我们更好的服务。但对于其他适合NSQ的consumer,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础。

go语言版本的Gossip协议包(memberlist)的使用

由于工作的契机,最近学习了下Gossip,以及go语言的实现版本HashiCorp/memberlist。网上有个最基本的memberlist使用的example,在下边的链接中,感兴趣可以按照文档运行下感受感受。本文主要讲解memberlist v0.1.5 的使用细节。

Gossip是最终一致性协议,是目前性能最好,容错性最好的分布式协议。目前Prometheus的告警组件alertmanager、redis、s3、区块链等项目都有使用Gossip。本文不介绍Gossip原理,大家自行谷歌。

简单的几步即可搭建gossip集群

感谢已经有网友为我们实现了一个example(

)。

哪里有问题,还请大家多多指正


新闻名称:s3go语言 s3gogoing
文章出自:http://pwwzsj.com/article/dooiccc.html