博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
asp.net core microservices 架构之分布式自动计算(三)-kafka日志同步至elasticsearch和kibana展示...
阅读量:5275 次
发布时间:2019-06-14

本文共 18105 字,大约阅读时间需要 60 分钟。

 

一 kafka consumer准备                       

  前面的章节进行了分布式job的自动计算的概念讲解以及实践。上次分布式日志说过日志写进kafka,是需要进行处理,以便合理的进行展示,分布式日志的量和我们对日志的重视程度,决定了我们必须要有一个大数据检索,和友好展示的需求。那么自然就是elasticsearch和kibana,elasticsearch是可以检索TB级别数据的一个分布式NOSQL数据库,而kibana,不仅仅可以展示详情,而且有针对不同展示需求的功能,并且定制了很多很多日志格式的模板和采集数据的插件,这里不多介绍了,我自己感觉是比percona的pmm强大很多。

  书归正传,我们这一节是要做同步前的准备工作。第一,对kafka的consumer进行封装。第二,读取kafka数据是需要一个后台程序去处理,但是不需要job,我们上次做的框架是基于zookeeper的分布式job,而kafka的分布式是在服务器端的,当然将job分布式设计方案用在轮询或者阻塞方式的后台程序,也是可以的,但是这次就不讲解了。下面我们就将kafka分布式的原理分析下,kafka的客户端有一个组的概念,borker端有一个topic的概念,product在发送消息的时候,会有一个key值。因为kafka存数据就是以key-value的方式存储数据的,所以broker就是用product传递过来的这个key进行运算,合理的将数据存储到某个topic的某个分区。而consumer端订阅topic,可以订阅多个topic,它的分派是这样的,每一个topic下的分区会有多个consuer,但是这些consumer必须属于不同的组,而每一个consumer可以订阅多个topic下的分区,但是不能重复。下面看图吧,以我们这次实际的日志为例,在kafka中mylog topic有5个分区。

  那么如果我们有三个程序需要用这个mylog topic怎么办?而且我们需要很快的处理完这个数据,所以有可能这三个程序每一个程序都要两台服务器。想着都很头大,对吧?当然如果有我们前面讲解的分布式job也可以处理,但是要把分布式的功能迁移到这个后台程序,避免不了又大动干戈,开发,调试,测试,修改bug,直到程序稳定,那又是一场苦功。但是在kafka这里,不用担心,三个程序,比如订单,库存,顾客,我们为这三个程序的kafka客户端对应的设置为三个组,每一个组中consumer数量只要不超过5个,假如订单需要用到名为mylog的topic的消息,只要订单处理这个topic的实例数量,必须不能超过5个,当然可以少于5个,也可以等于0个。而同时一个consumer又可以去订阅多个topic,这也是kafka可以媲美rabbit的重要的一个原因,先天支持并发和扩展。我们看图:

 

如果一个组的consumer数量没有topic的分区多,kafka会自动分派给这个组的consumer,如果某一个consumer失败,kafka也会自动的将这个consumer的offset记录并且分派给另外一个consumer。

但是要注意一点,kafka的topic中的每个分区是线性的,但是所有的分区看起来就不会是线性的,如果需要topic是线性的,就必须将分区设置为1个。

下面看看我们封装的kafka客户端方法:

using System;using System.Collections.Generic;using System.Threading.Tasks;using Confluent.Kafka;using Microsoft.Extensions.Options;namespace  Walt.Framework.Service.Kafka{    public class KafkaService : IKafkaService    {        private KafkaOptions _kafkaOptions;        private Producer _producer;        private Consumer _consumer;        public Action
GetMessageDele{ get; set; } public Action
ErrorDele{ get; set; } public Action
LogDele{ get; set; } public KafkaService(IOptionsMonitor
kafkaOptions) { _kafkaOptions=kafkaOptions.CurrentValue; kafkaOptions.OnChange((kafkaOpt,s)=>{ _kafkaOptions=kafkaOpt; System.Diagnostics.Debug .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s); }); _producer=new Producer(_kafkaOptions.Properties); _consumer=new Consumer(_kafkaOptions.Properties); } private byte[] ConvertToByte(string str) { return System.Text.Encoding.Default.GetBytes(str); } public async Task
Producer
(string topic,string key,T t) { if(string.IsNullOrEmpty(topic) || t==null) { throw new ArgumentNullException("topic或者value不能为null."); } string data = Newtonsoft.Json.JsonConvert.SerializeObject(t); var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(data)); return task; } public void AddProductEvent() { _producer.OnError+=new EventHandler
(Error); _producer.OnLog+=new EventHandler
(Log); }      ///以事件的方式获取message public void AddConsumerEvent(IEnumerable
topics) { _consumer.Subscribe(topics); _consumer.OnMessage += new EventHandler
(GetMessage); _consumer.OnError += new EventHandler
(Error); _consumer.OnLog += new EventHandler
(Log); } private void GetMessage(object sender, Message mess) { if(GetMessageDele!=null) { GetMessageDele(mess); } } private void Error(object sender, Error mess) { if(ErrorDele!=null) { ErrorDele(mess); } } private void Log(object sender, LogMessage mess) { if(LogDele!=null) { LogDele(mess); } }     //以轮询的方式获取message public Message Poll(int timeoutMilliseconds) { Message message =default(Message); _consumer.Consume(out message, timeoutMilliseconds); return message; } }}

以事件激发的方式,因为是线程安全的方式调用,而本实例是后台方式执行,少不了多线程,所以还是以轮询的方式。以轮询的方式,这样的程序需要放那块尼?就是我们的后台程序框架。

 

二 后台程序管理框架开发                                                    

 

他的原理和job几乎差不多,比job要简单多了。看入口程序:

using System;using System.Collections.Generic;using System.Collections.ObjectModel;using System.IO;using System.Linq;using System.Reflection;using System.Threading.Tasks;using Microsoft.AspNetCore;using Microsoft.AspNetCore.Hosting;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using EnvironmentName = Microsoft.Extensions.Hosting.EnvironmentName;using Walt.Framework.Log;using Walt.Framework.Service;using Walt.Framework.Service.Kafka;using Walt.Framework.Configuration;using MySql.Data.EntityFrameworkCore;using Microsoft.EntityFrameworkCore;using System.Threading;using IApplicationLife =Microsoft.Extensions.Hosting;using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime;namespace Walt.Framework.Console{    public class Program    {        public static void Main(string[] args)        {            //这里获取程序及和工作线程配置信息            Dictionary
assmblyColl = new Dictionary
(); var host = new HostBuilder() .UseEnvironment(EnvironmentName.Development) .ConfigureAppConfiguration((hostContext, configApp) => { //这里netcore支持多数据源,所以可以扩展到数据库或者redis,集中进行配置。 // configApp.SetBasePath(Directory.GetCurrentDirectory()); configApp.AddJsonFile( $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true); configApp.AddEnvironmentVariables("PREFIX_"); configApp.AddCommandLine(args); }).ConfigureLogging((hostContext, configBuild) => { configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging")); configBuild.AddConsole(); configBuild.AddCustomizationLogger(); }) .ConfigureServices((hostContext, service) => { service.Configure
(option => { option.ShutdownTimeout = System.TimeSpan.FromSeconds(10); }); service.AddKafka(KafkaBuilder => { KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService")); }); service.AddElasticsearchClient(config=>{ config.AddConfiguration(hostContext.Configuration.GetSection("ElasticsearchService")); }); service.AddDbContext
(option => option.UseMySQL(hostContext.Configuration.GetConnectionString("ConsoleDatabase")), ServiceLifetime.Transient, ServiceLifetime.Transient); ///TODO 待实现从数据库中pull数据,再将任务添加进DI service.AddSingleton
(); }) .Build(); CancellationTokenSource source = new CancellationTokenSource(); CancellationToken token = source.Token; var task=Task.Run(async () =>{ IConsole console = host.Services.GetService
(); await console.AsyncExcute(source.Token); },source.Token); Dictionary
dictTask = new Dictionary
(); dictTask.Add("kafkatoelasticsearch", task); int recordRunCount = 0; var fact = host.Services.GetService
(); var log = fact.CreateLogger
(); var disp = Task.Run(() => { while (true) { if (!token.IsCancellationRequested) { ++recordRunCount; foreach (KeyValuePair
item in dictTask) { if (item.Value.IsCanceled || item.Value.IsCompleted || item.Value.IsCompletedSuccessfully || item.Value.IsFaulted) { log.LogWarning("console任务:{0},参数:{1},执行异常,task状态:{2}", item.Key, "", item.Value.Status); if (item.Value.Exception != null) { log.LogError(item.Value.Exception, "task:{0},参数:{1},执行错误.", item.Key, ""); //TODO 根据参数更新数据库状态,以便被监控到。 } //更新数据库状态。 } } } System.Threading.Thread.Sleep(2000); log.LogInformation("循环:{0}次,接下来等待2秒。", recordRunCount); } },source.Token); IApplicationLifetime appLiftTime = host.Services.GetService
(); appLiftTime.ApplicationStopping.Register(()=>{ log.LogInformation("程序停止中。"); source.Cancel(); log.LogInformation("程序停止完成。"); }); host.RunAsync().GetAwaiter().GetResult(); } }}

因为分布式job有quartz,是有自己的设计理念,但是这个console后台框架不需要,是自己开发,所以完全和Host通用主机兼容,所有的部件都可以DI。设计原理就是以数据库的配置,构造Task,然后使用

CancellationTokenSource和TaskCompletionSource去管理Task。运行结果根据状态去更新数据库,以便监控。当然咱们这个例子功能没实现全,后面可以完善 ,感兴趣的可以去我的github上pull代码。咱们看任务中的例子代码:
using System.Collections.Generic;using System.Threading;using System.Threading.Tasks;using Confluent.Kafka;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.Logging;using Nest;using Walt.Framework.Log;using Walt.Framework.Service.Elasticsearch;using Walt.Framework.Service.Kafka;namespace Walt.Framework.Console{    public class KafkaToElasticsearch : IConsole    {        ILoggerFactory _logFact;        IConfiguration _config;        IElasticsearchService _elasticsearch;        IKafkaService _kafkaService;        public KafkaToElasticsearch(ILoggerFactory logFact,IConfiguration config        ,IElasticsearchService elasticsearch        ,IKafkaService kafkaService)        {            _logFact = logFact;            _config = config;            _elasticsearch = elasticsearch;            _kafkaService = kafkaService;        }        public async Task AsyncExcute(CancellationToken cancel=default(CancellationToken))        {            var log = _logFact.CreateLogger
(); _kafkaService.AddConsumerEvent(new List
(){
"mylog"});         //以事件方式获取message不工作,因为跨线程 // _kafkaService.GetMessageDele = (message) => { // var id = message.Key; // var offset = string.Format("{0}---{2}",message.Offset.IsSpecial,message.Offset.Value); // var topic = message.Topic; // var topicPartition = message.TopicPartition.Partition.ToString(); // var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value; // // log.LogInformation("id:{0},offset:{1},topic:{2},topicpatiton:{3},topicPartitionOffsetValue:{4}" // // ,id,offset,topic,topicPartition,topicPartitionOffsetValue); // }; // _kafkaService.ErrorDele = (message) => { // log.LogError(message.ToString()); // }; // _kafkaService.LogDele = (message) => { // log.LogInformation(message.ToString()); // }; // log.LogInformation("事件添加完毕"); // var waitForStop = // new TaskCompletionSource
(TaskCreationOptions.RunContinuationsAsynchronously); // cancel.Register(()=>{ // log.LogInformation("task执行被取消回掉函数"); // waitForStop.SetResult(null); // }); // waitForStop.Task.Wait(); // log.LogInformation("任务已经被取消。");         //下面以轮询方式。 if(!cancel.IsCancellationRequested) { while (true) { Message message = _kafkaService.Poll(2000); if (message != null) { if(message.Error!=null&&message.Error.Code!=ErrorCode.NoError) { //log.LogError("consumer获取message出错,详细信息:{0}",message.Error); System.Console.WriteLine("consumer获取message出错,详细信息:{0}",message.Error); System.Threading.Thread.Sleep(200); continue; } var id =message.Key==null?"":System.Text.Encoding.Default.GetString(message.Key); var offset = string.Format("{0}---{1}", message.Offset.IsSpecial, message.Offset.Value); var topic = message.Topic; var topicPartition = message.TopicPartition.Partition.ToString(); var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value; var val =System.Text.Encoding.Default.GetString( message.Value); EntityMessages entityMess = Newtonsoft.Json.JsonConvert.DeserializeObject
(val); await _elasticsearch.CreateIndexIfNoExists
("mylog"+entityMess.OtherFlag); // _elasticsearch.CreateMappingIfNoExists
("mylog"+entityMess.OtherFlag // ,"mylog"+entityMess.OtherFlag+"type",null);               //为elasticsearch添加document var addDocumentResponse = await _elasticsearch.CreateDocument
("mylog" + entityMess.OtherFlag , new LogElasticsearch() { Id = entityMess.Id, Time = entityMess.DateTime, LogLevel = entityMess.LogLevel, Exception = entityMess.Message } ); if (addDocumentResponse != null) { if (!addDocumentResponse.ApiCall.Success) { } } } } } return ; } }}

 

 

 三 elasticsearch 服务开发                                         

  服务已经开发很多了,主要就是构建和配置的设计,还有就是对组件的封装,看程序结构:

配置:

{  "Logging": {    "LogLevel": {      "Default": "Information",      "System": "Information",      "Microsoft": "Information"    },    "KafkaLog":{      "Prix":"console", //目前这个属性,可以放程序类别,比如用户中心,商品等。      "LogStoreTopic":"mylog"    }  },  "KafkaService":{    "Properties":{      "bootstrap.servers":"192.168.249.106:9092",      "group.id":"group2"    }  },  "ConnectionStrings": {    "ConsoleDatabase":"Server=192.168.249.106;Database=quartz;Uid=quartz;Pwd=quartz"  },  "ElasticsearchService":{    "Host":["http://192.168.249.105:9200","http://localhost:9200"],    "TimeOut":"10000",    "User":"",    "Pass":""  }}

服务类:这里有必要说下,elasticsearch是基于api的接口,最底层就是http请求,在接口上,实现了一个高级的接口和一个低级别的接口,当然低级别的接口需要熟悉elasticsearch的协议,

而高级别的api,使用强类型去使用,对开发很有帮助。下面是封装elasticsearch的服务类:

using System;using System.Net.Http;using Elasticsearch.Net;using Microsoft.Extensions.Options;using System.Threading.Tasks;using Microsoft.Extensions.Logging;using Nest;namespace Walt.Framework.Service.Elasticsearch{    public class ElasticsearchService:IElasticsearchService    {        private  ElasticsearchOptions _elasticsearchOptions=null;        private ElasticClient _elasticClient = null;        private ILoggerFactory _loggerFac;        public ElasticsearchService(IOptionsMonitor
options ,ILoggerFactory loggerFac) { _elasticsearchOptions = options.CurrentValue; options.OnChange((elasticsearchOpt,s)=>{ _elasticsearchOptions=elasticsearchOpt; System.Diagnostics.Debug .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(elasticsearchOpt)+"---"+s); });       //连接客户端需,支持多个节点,防止单点故障 var lowlevelClient = new ElasticLowLevelClient(); var urlColl = new Uri[_elasticsearchOptions.Host.Length]; for (int i = 0; i < _elasticsearchOptions.Host.Length;i++) { urlColl[i] = new Uri(_elasticsearchOptions.Host[i]); } _loggerFac = loggerFac; var connectionPool = new SniffingConnectionPool(urlColl); var settings = new ConnectionSettings(connectionPool) .RequestTimeout(TimeSpan.FromMinutes(_elasticsearchOptions.TimeOut)) .DefaultIndex("mylogjob");//设置默认的index _elasticClient = new ElasticClient(settings); }      //如果index存在,则返回,如果不存在,则创建,type的创建方式是为文档类型打标签ElasticsearchTypeAttribute
public async Task
CreateIndexIfNoExists
(string indexName) where T : class { var log = _loggerFac.CreateLogger
(); var exists = await _elasticClient.IndexExistsAsync(Indices.Index(indexName)); if (exists.Exists) { log.LogWarning("index:{0}已经存在", indexName.ToString()); return await Task.FromResult(true); } var response = await _elasticClient.CreateIndexAsync(indexName ,c=>c.Mappings(mm=>mm.Map
(m=>m.AutoMap())));//将类型的属性自动映射到index的type上,也可以打标签控制那个可以映射,那些不可以 log.LogInformation(response.DebugInformation); if (response.Acknowledged) { log.LogInformation("index:{0},创建成功", indexName.ToString()); return await Task.FromResult(false); } else { log.LogError(response.ServerError.ToString()); log.LogError(response.OriginalException.ToString()); return await Task.FromResult(false); } }     //创建document public async Task
CreateDocument
(string indexName,T t) where T:class { var log=_loggerFac.CreateLogger
(); if(t==null) { log.LogError("bulk 参数不能为空。"); return null; } IndexRequest
request = new IndexRequest
(indexName, TypeName.From
()) { Document = t }; var createResponse = await _elasticClient.CreateDocumentAsync
(t); log.LogInformation(createResponse.DebugInformation); if (createResponse.ApiCall.Success) { log.LogInformation("index:{0},type:{1},创建成功", createResponse.Index, createResponse.Type); return createResponse; } else { log.LogError(createResponse.ServerError.ToString()); log.LogError(createResponse.OriginalException.ToString()); return null; } } }}

poco类型,这个类会和index的typ相关联的:

using System;using Nest;namespace Walt.Framework.Console{    [ElasticsearchTypeAttribute(Name="LogElasticsearchDefaultType")] //可以使用类型生成和查找type    public class LogElasticsearch    {        public string Id { get; set; }        public DateTime Time { get; set; }        public string LogLevel{ get; set; }        public string Exception{ get; set; }        public string Mess{ get; set; }    }}

然后就是执行我们console后台程序,就可以在kibana看到日志被同步的情况:

 所有程序都提交到github,如果调试代码,再看这篇文章,或许理解能更快。

                         

转载于:https://www.cnblogs.com/ck0074451665/p/10340387.html

你可能感兴趣的文章
Dreamweaver cc新版本css单行显示
查看>>
【hdu 1429】胜利大逃亡(续)
查看>>
Factory Design Pattern
查看>>
P1192-台阶问题
查看>>
Java大数——a^b + b^a
查看>>
简单的数据库操作
查看>>
帧的最小长度 CSMA/CD
查看>>
树状数组及其他特别简单的扩展
查看>>
普通求素数和线性筛素数
查看>>
PHP截取中英文混合字符
查看>>
电子眼抓拍大解密
查看>>
51nod1076 (边双连通)
查看>>
Linux pipe函数
查看>>
java equals 小记
查看>>
2019春 软件工程实践 助教总结
查看>>
Zerver是一个C#开发的Nginx+PHP+Mysql+memcached+redis绿色集成开发环境
查看>>
程序的静态链接,动态链接和装载 (补充)
查看>>
关于本博客说明
查看>>
[Kaggle] Sentiment Analysis on Movie Reviews
查看>>
价值观
查看>>