博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
openresty lua集成kafka
阅读量:663 次
发布时间:2019-03-15

本文共 4119 字,大约阅读时间需要 13 分钟。

前提

1、 安装openresty,记得安装nginx的监控模块

2、 安装kafka
3、 下载lua+kafka插件:
4、 解压插件,将lua-resty-kafka-master\lib\resty\kafka文件夹放到openresty/lualib/resty下

首先修改openresty的配置文件中localtion位置,引入外部lua文件,这样修改lua文件会比较方便

location / {
default_type text/html; content_by_lua_file /usr/local/openresty/tmp.lua; }

案例

1 先获取kafka的实例

2 通过实例获取连接
3设置分区发送策略
4调用send方法发送数据
5启动一个kafka消费测试,验证是否发送成功

lua代码

------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by NH55.--- DateTime: 2020/12/11 11:48------ 数据采集运行线程阈值监控,如果超过了我们设置的最大阈值,那么就等待不send数据,下个批次再次执行local DEFAULT_THRESH = 100-- 编写kafka相关配置-- 配置broker地址local BROKER_LIST = {
{
host = "192.168.xx.101", port = 9092 }, {
host = "192.168.xx.102", port = 9092 }, {
host = "192.168.xx.103", port = 9092 }}-- kafka分区数local PARTITION_NUM = 3-- kafka的topiclocal TOPIC = "csdn"-- producerConfiglocal CONNECT_PARAMS = {
producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000}-- 默认分区local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id is continuous and start from 0 return id % numend--- 我们为了让数据均匀分布到每一个分区内,这里我们使用轮询方式发送消息至Kafka分区中--- 相当于自定义分区的模式,当然你也可以不用这种方式,使用默认的分区也行-- 获取共享内存数据local shared_data = ngx.shared.shared_data-- 设置共享内存的变量(Key)local sharedKey = "shared_Key"local key_val = shared_data:get(sharedKey)if not key_val then key_val = 1 shared_data:set(sharedKey,key_val)end-- 计算消息发送分区local partition_id = ""..tonumber(key_val%PARTITION_NUM)--每个key的value要自增shared_data:incr(sharedKey,1)-- 变量监控local isGone = true-- 进行阈值判断if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESH) then isGone =falseend-- 满足条件true执行,反之不执行if isGone then -- 获取页面数据信息 local time_local = ngx.var.time_local if time_local == nil then time_local = "" end local request = ngx.var.request if request == nil then request = "" end local request_method = ngx.var.request_method if request_method == nil then request_method = "" end local content_type = ngx.var.content_type if content_type == nil then content_type = "" end ngx.req.read_body() local request_body = ngx.var.request_body if request_body == nil then request_body = "" end local http_referer = ngx.var.http_referer if http_referer == nil then http_referer = "" end local remote_addr = ngx.var.remote_addr if remote_addr == nil then remote_addr = "" end local http_user_agent = ngx.var.http_user_agent if http_user_agent == nil then http_user_agent = "" end local time_iso8601 = ngx.var.time_iso8601 if time_iso8601 == nil then time_iso8601 = "" end local server_addr = ngx.var.server_addr if server_addr == nil then server_addr = "" end local http_cookie = ngx.var.http_cookie if http_cookie == nil then http_cookie = "" end --封装数据 local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie; -- 引入生产者模块创建实例 local producerDic = require "resty.kafka.producer" -- 创建实例 local producer = producerDic:new(BROKER_LIST,CONNECT_PARAMS) -- 调用发送方法send local ok,err = producer:send(TOPIC,partition_id,message) -- 判断发送消息是否成功打印日志 if not ok then ngx.log("kafka send message err:",err) endend

之后打开消费者

kafka-console-consumer.sh \--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \--topic csdn

刷新nginx监听的网页,在消费者端就可以收到内容

tt

11/Dec/2020:19:53:58 +0800#CS#GET / HTTP/1.1#CS#GET#CS##CS##CS##CS#192.168.xx.1#CS#Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Edg/87.0.664.57#CS#2020-12-11T19:53:58+08:00#CS#192.168.xx.101#CS#11/Dec/2020:19:54:01 +0800#CS#GET /

注意事项

kafka server.properties 需开启如下选项

集群的每台机器都需要打开

advertised.listeners=PLAINTEXT://192.168.xx.103:9092

转载地址:http://kasmz.baihongyu.com/

你可能感兴趣的文章