Getting started with Kapacitor

使用Kapacitor导入(流或批处理)时间序列数据,然后对数据进行转换,分析和操作. 要开始使用Kapacitor,请使用Telegraf在本地计算机上收集系统指标并将其存储在InfluxDB中. 然后,使用Kapacitor处理系统数据.

Overview

Kapacitor任务使用TICKscript语法定义要对一组数据进行的工作 . Kapacitor的任务包括:

  • stream任务. 流任务复制在Kapacitor中写入InfluxDB的数据. 减轻了查询开销,并要求Kapacitor将数据存储在磁盘上.
  • batch任务. 批处理任务以指定的时间间隔查询和处理数据.

首先,请执行以下操作:

  1. 如果尚未下载并安装InfluxData TICK堆栈(OSS) .
  2. 启动InfluxDB并启动Telegraf . 默认情况下,Telegraf开始将系统指标发送到InfluxDB并创建一个" telegraf"数据库.
  3. 启动电容器.

注意:以下过程中的示例命令是为Linux编写的.

Start InfluxDB and collect Telegraf data

  1. 通过运行以下命令来启动InfluxDB:

    $ sudo systemctl start influxdb
  2. 在Telegraf配置文件( /etc/telegraf/telegraf.conf )中,配置[[outputs.influxd]]以指定如何连接到InfluxDB和目标数据库.

    [[outputs.influxdb]]
    ## InfluxDB url is required and must be in the following form: http/udp "://" host [ ":" port]
    ## Multiple urls can be specified as part of the same cluster; only ONE url is written to each interval.
    ## InfluxDB url
    urls = ["http://localhost:8086"]
    
    ## The target database for metrics is required (Telegraf creates if one doesn't exist).
    database = "telegraf"
  3. 运行以下命令以启动Telegraf:

    $ sudo systemctl start telegraf
    

    InfluxDB和Telegraf现在在本地主机上运行.

  4. 一分钟后,运行以下命令以使用InfluxDB API查询Telegraf数据:

    $ curl -G 'http://localhost:8086/query?db=telegraf' --data-urlencode 'q=SELECT mean(usage_idle) FROM cpu'

    出现类似于以下结果:

    {"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",91.82304336748372]]}]}]}
    

Start Kapacitor

  1. 运行以下命令以生成Kapacitor配置文件:

    kapacitord config > kapacitor.conf

    默认情况下,Kapacitor配置文件保存在/etc/kapacitor/kapacitor.conf . 如果将文件保存到其他位置,请在启动Kapacitor时指定位置.

    Kapacitor配置是一个TOML文件. 为InfluxDB配置的输入也适用于Kapacitor.

  2. 启动Kapacitor服务:

    $ sudo systemctl start kapacitor

    由于InfluxDB在http://localhost:8086 ,因此Kapacitor在启动过程中会找到它,并在InfluxDB上创建多个订阅 . 订阅告诉InfluxDB将数据发送到Kapacitor.

  3. (可选)要查看日志数据,请运行以下命令:

    $ sudo tail -f -n 128 /var/log/kapacitor/kapacitor.log
    
    

    Kapacitor侦听HTTP端口并将数据发布到InfluxDB. 现在,InfluxDB将数据从Telegraf传输到Kapacitor.

Execute a task

  • 在TICKscript的开头,指定包含数据的数据库和保留策略:

    dbrp "telegraf"."autogen"
    
    // ...
    

    当Kapacitor从数据库和保留策略中接收到与指定数据相匹配的数据时,Kapacitor将执行TICKscript.

    Kapacitor支持根据数据库和保留策略执行任务(无其他条件).

Trigger alerts from stream data

触发警报是Kapacitor的常见用例. 必须定义要警告的数据库和保留策略.

Example alert on CPU usage
  1. 将以下TICKscript复制到名为cpu_alert.tick的文件中:

    dbrp "telegraf"."autogen"
    
    stream
        // Select the CPU measurement from the `telegraf` database.
        |from()
            .measurement('cpu')
        // Triggers a critical alert when the CPU idle usage drops below 70%
        |alert()
            .crit(lambda: int("usage_idle") <  70)
            // Write each alert to a file.
            .log('/tmp/alerts.log')
    
  2. 在命令行中,使用kapacitor CLI通过cpu_alert.tick TICKscript定义任务:

    kapacitor define cpu_alert -tick cpu_alert.tick

    如果TICKscript中未包含数据库和保留策略(例如dbrp "telegraf"."autogen" ),请使用带有-dbrp标志的kapacitor define命令,后跟-dbrp ". "以在添加任务时指定它们.

  3. (可选)使用list命令验证警报已创建:

    $ kapacitor list tasks
    ID        Type      Status    Executing Databases and Retention Policies
    cpu_alert stream    disabled  false     ["telegraf"."autogen"]
    
  4. (可选)使用show命令查看有关任务的详细信息:

    $ kapacitor show cpu_alert
    ID: cpu_alert
    Error:
    Template:
    Type: stream
    Status: disabled
    Executing: false
    ...
    
  5. 为确保日志文件和通信通道中没有垃圾邮件,请测试任务 .

  6. 启用任务以开始处理实时数据流:

    kapacitor enable cpu_alert

    警报将实时写入日志.

  7. 运行show命令以验证任务正在接收数据并按预期方式运行:

    $ kapacitor show cpu_alert
    |from()
    // Information about the state of the task and any error it may have encountered.
    ID: cpu_alert
    Error:
    Type: stream
    Status: Enabled
    Executing: true
    Created: 04 May 16 21:01 MDT
    Modified: 04 May 16 21:04 MDT
    LastEnabled: 04 May 16 21:03 MDT
    Databases Retention Policies: [""."autogen"]
    
    // Displays the version of the TICKscript that Kapacitor has stored in its local database.
    TICKscript:
    stream
        // Select just the cpu me
            .measurement('cpu')
        |alert()
            .crit(lambda: "usage_idle" <  70)
            // Whenever we get an alert write it to a file.
            .log('/tmp/alerts.log')
    
    DOT:
    digraph asdf {
    graph [throughput="0.00 points/s"];
    
    stream0 [avg_exec_time_ns="0" ];
    stream0 -> from1 [processed="12"];
    
    from1 [avg_exec_time_ns="0" ];
    from1 -> alert2 [processed="12"];
    
    alert2 [alerts_triggered="0" avg_exec_time_ns="0" ];
    }

返回一个graphviz点格式的树,该树显示由TICKscript和键值关联数组条目定义的数据处理管道,其中包含有关每个节点的统计信息,并且沿着边缘到下一个节点的链接也包括关联数组统计信息. 链接/边缘成员中的已处理键指示沿图的指定边缘传递的数据点的数量.

在上面的例子中, stream0节点(又名stream从TICKscript VAR)已经发送了12分至from1节点. 在from1节点也到发送12分alert2节点. 由于Telegraf被配置为发送cpu数据,因此所有12个点都匹配from1节点的数据库/测量标准并继续传递.

> If necessary, install graphviz on Debian or RedHat using the package provided by the OS provider. The packages offered on the graphviz site are not up-to-date.

Now that the task is running with live data, here is a quick hack to use 100% of one core to generate some artificial cpu activity:

```bash
while true; do i=0; done
```
Test the task

完成以下步骤,以确保日志文件和通信通道不会被警报淹没.

  1. 记录数据流:

    kapacitor record stream -task cpu_alert -duration 60s

    如果出现连接错误,例如: getsockopt: connection refused (Linux)或connectex: No connection could be made... (Windows),请验证Kapacitor服务正在运行(请参阅安装和启动Kapacitor ). 如果Kapacitor正在运行,请检查主机的防火墙设置,并确保可访问端口9092 . 另外,检查/var/log/kapacitor/kapacitor.log消息. 如果/etc/kapacitor/kapacitor.confhttp或其他配置有问题,则该问题会出现在日志中. 如果Kapacitor服务在另一台主机上运行,​​请将本地外壳程序中的KAPACITOR_URL环境变量设置为远程计算机上的Kapacitor端点.

  2. 检索返回的ID并将ID分配给bash变量以供以后使用(返回的实际UUID不同):

    rid=cd158f21-02e6-405c-8527-261ae6f26153
  3. 通过运行以下命令来确认记录捕获了一些数据:

    kapacitor list recordings $rid

    输出应如下所示:

    ID                                      Type    Status    Size      Date
    cd158f21-02e6-405c-8527-261ae6f26153    stream  finished  2.2 kB    04 May 16 11:44 MDT
    

    如果大小大于几个字节,则已捕获数据. 如果Kapacitor不接收数据,请检查每一层:Telegraf→InfluxDB→Kapacitor. 如果Telegraf无法与InfluxDB通信,则会记录错误. 如果InfluxDB无法将数据发送到Kapacitor,则会记录有关connection refused的错误. 针对InfluxDB运行查询SHOW SUBSCRIPTIONS ,以查找InfluxDB用于将数据发送到Kapacitor的端点.

    在以下示例中,InfluxDB必须在localhost:8086上运行:

    $ curl -G 'http://localhost:8086/query?db=telegraf' --data-urlencode 'q=SHOW SUBSCRIPTIONS'
    
    {"results":[{"statement_id":0,"series":[{"name":"_internal","columns":["retention_policy","name","mode","destinations"],"values":[["monitor","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["http://localhost:9092"]]]},{"name":"telegraf","columns":["retention_policy","name","mode","destinations"],"values":[["autogen","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["http://localhost:9092"]]]}]}]}
    
  4. 使用replay来测试特定任务的记录数据:

    kapacitor replay -recording $rid -task cpu_alert

    使用-real-clock标志可以通过时间戳之间的增量来设置重播时间. 时间是通过每个节点接收到的数据点来衡量的.

  5. 查看日志中的警报:

    sudo cat /tmp/alerts.log

    Each JSON line represents one alert, and includes the alert level and data that triggered the alert.

    如果主机繁忙,则可能需要一段时间才能记录警报.

  6. (可选)将任务修改为非常敏感,以确保警报正常运行. 在TICKscript中,将lamda函数.crit(lambda: "usage_idle" < 70)更改为.crit(lambda: "usage_idle" < 100) ,然后仅使用TASK_NAME-tick参数运行define命令:

    kapacitor define cpu_alert -tick cpu_alert.tick

    记录期间收到的每个数据点都会触发警报.

  7. 重播修改后的任务以验证结果.

    kapacitor replay -recording $rid -task cpu_alert

    一旦alerts.log结果验证了该任务正在运行,请将usage_idle阈值更改回一个更合理的级别,并使用define命令再次重新定义任务,如步骤6所示.

Gotcha - single versus double quotes

TICKscripts中的单引号和双引号有非常不同的作用:

请注意以下示例:

var data = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('cpu')
        // NOTE: Double quotes on server1
        .where(lambda: "host" == "server1")

该搜索的结果将始终为空,因为在" server1"周围使用了双引号. 这意味着Kapacitor将搜索"主机"字段等于"服务器1" 字段中保存的值的序列. 这可能不是预期的. 更有可能是要搜索标签" host"具有 " server1"的系列,因此应使用单引号. 双引号表示数据字段,单引号表示字符串值. 为了匹配value ,上面的tick脚本应如下所示:

var data = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('cpu')
        // NOTE: Single quotes on server1
        .where(lambda: "host" == 'server1')

Extending TICKscripts

下面的TICKscript将计算运行平均值并与之比较当前值. 然后,如果值与平均值之间的差超过3个标准偏差,它将触发警报. 将cpu_alert.tick脚本替换为下面的TICKscript:

stream
    |from()
        .measurement('cpu')
    |alert()
        // Compare values to running mean and standard deviation
        .crit(lambda: sigma("usage_idle") > 3)
        .log('/tmp/alerts.log')

就像这样,可以创建一个动态阈值,并且,如果cpu的使用量在白天下降或在夜间高峰,则将发出警报. 试试看. 使用define更新任务TICKscript.

kapacitor define cpu_alert -tick cpu_alert.tick

注意:如果已启用任务,则使用define命令重新define任务会自动重新加载( reload )任务. 要定义任务而不重载,请使用-no-reload

现在尾随警报日志:

sudo tail -f /tmp/alerts.log

暂时不应有任何警报触发. 接下来,启动一个while循环以添加一些负载:

while true; do i=0; done

一旦创建了足够的人工负载,便应立即将警报触发器写入日志. 让循环运行几分钟. 取消循环后,应发出另一个警报,指示CPU使用率再次更改. 使用此技术,可以针对CPU使用率的上升沿和下降沿以及任何异常值生成警报.

A real world example

现在已经涵盖了基础知识,这是一个更真实的示例. 一旦来自多个主机的指标流式传输到Kapacitor,就可以执行以下操作:汇总并分组每个数据中心中运行的每个服务的CPU使用率,然后基于第95个百分位触发警报. 除了仅将警报写入日志外,Kapacitor还可以与第三方实用程序集成:目前支持Slack,PagerDuty,HipChat,VictorOps等. 该警报还可以通过电子邮件发送,发布到自定义端点或可以触发自定义脚本的执行. 还可以定义自定义消息格式,以使警报具有正确的上下文和含义. TICKscript看起来像下面的例子.

示例-多个服务cpus上的流的TICKscript以及第95个百分位数的警报

stream
    |from()
        .measurement('cpu')
    // create a new field called 'used' which inverts the idle cpu.
    |eval(lambda: 100.0 - "usage_idle")
        .as('used')
    |groupBy('service', 'datacenter')
    |window()
        .period(1m)
        .every(1m)
    // calculate the 95th percentile of the used cpu.
    |percentile('used', 95.0)
    |eval(lambda: sigma("percentile"))
        .as('sigma')
        .keep('percentile', 'sigma')
    |alert()
        .id('{{ .Name }}/{{ index .Tags "service" }}/{{ index .Tags "datacenter"}}')
        .message('{{ .ID }} is {{ .Level }} cpu-95th:{{ index .Fields "percentile" }}')
        // Compare values to running mean and standard deviation
        .warn(lambda: "sigma" > 2.5)
        .crit(lambda: "sigma" > 3.0)
        .log('/tmp/alerts.log')

        // Post data to custom endpoint
        .post('https://alerthandler.example.com')

        // Execute custom alert handler script
        .exec('/bin/custom_alert_handler.sh')

        // Send alerts to slack
        .slack()
        .channel('#alerts')

        // Sends alerts to PagerDuty
        .pagerDuty()

        // Send alerts to VictorOps
        .victorOps()
        .routingKey('team_rocket')

定义警报之类的简单操作可以快速扩展到更大的范围. 使用上述脚本,如果任何数据中心中的任何服务偏离正常行为的3个以上标准偏差(由CPU使用历史的第95个百分位数定义),就会触发警报,警报将在1分钟内发出!

有关警报如何工作的更多信息,请参阅AlertNode文档.

Trigger alerts from batch data

除了处理流中的数据外,Kapacitor还可以定期查询InfluxDB并批量处理数据.

虽然根据cpu的使用量触发警报更适合流情况,但此处通过遵循相同的用例展示了batch任务如何工作的基本思想.

Example alert on batch data

此TICKscript的功能与早期的流任务大致相同,但作为批处理任务:

dbrp "telegraf"."autogen"

batch
    |query('''
        SELECT mean(usage_idle)
        FROM "telegraf"."autogen"."cpu"
    ''')
        .period(5m)
        .every(5m)
        .groupBy(time(1m), 'cpu')
    |alert()
        .crit(lambda: "mean" < 70)
        .log('/tmp/batch_alerts.log')
  1. 将上面的脚本复制到文件batch_cpu_alert.tick .

  2. 定义任务:

    kapacitor define batch_cpu_alert -tick batch_cpu_alert.tick
  3. 验证其创建:

    $ kapacitor list tasks
    ID              Type      Status    Executing Databases and Retention Policies
    batch_cpu_alert batch     disabled  false     ["telegraf"."autogen"]
    cpu_alert       stream    enabled   true      ["telegraf"."autogen"]
    1. 在任务中记录查询结果(注意,实际的UUID不同):
    kapacitor record batch -task batch_cpu_alert -past 20m
    # Save the id again
    rid=b82d4034-7d5c-4d59-a252-16604f902832

    这将使用batch_cpu_alert任务中的查询记录最后20分钟的批处理. 在这种情况下,由于period为5分钟,因此将记录并保存最后4个批次.

  4. 以相同的方式重播批记录:

    kapacitor replay -recording $rid -task batch_cpu_alert
  5. 检查警报日志以确保警报按预期生成. 上面基于sigma的警报也可以适用于批处理数据. 在Kapacitor中试玩并熟悉更新,测试和运行任务.

Load tasks with Kapacitor

要在指定的负载目录中加载与Kapacitor任务,拯救TICKscript kapacitor.conf . TICKscripts必须包含数据库和保留策略声明dbrp .

当Kapacitor启动时,将自动加载load目录中的TICKscript,无需使用kapacitor define命令添加.

有关更多信息,请参见加载目录 .

本文档是开源的 . 看到错字了吗? 请打开一个问题 .


需要启动和运行方面的帮助吗? Get Support

by  ICOPY.SITE