Skip to main content

Consul是用于发现和配置基础结构中的服务的工具. 一个Vert.x客户端,允许应用程序通过阻止和非阻止HTTP API与Consul系统进行交互.

Using Vert.x Consul Client

要使用此项目,请将以下依赖项添加到构建描述符的" dependencies"部分:

  • Maven(在您的pom.xml ):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-consul-client</artifactId>
 <version>3.8.4</version>
</dependency>
  • Gradle(在build.gradle文件中):

compile 'io.vertx:vertx-consul-client:3.8.4'

Creating a client

只需使用工厂方法:

def client = ConsulClient.create(vertx)

客户端也可以配置选项.

def options = [
  host:"consul.example.com"
]

def client = ConsulClient.create(vertx, options)

领事客户端支持以下配置:

host

领事主持人. 默认为localhost

port

领事HTTP API端口. 默认为8500

timeout

设置时间量(以毫秒为单位),在该时间量之后,如果请求在超时时间内未返回任何数据,则失败将传递给处理程序,并且请求将被关闭.

aclToken

ACL令牌. 如果提供,客户端将通过提供"?token"查询参数向领事馆提出请求时使用此令牌. 如果未提供,则使用空令牌,该令牌映射到"匿名" ACL策略.

dc

数据中心名称. 如果提供,客户端将通过提供"?dc"查询参数向领事馆提出请求时使用它. 如果未提供,则查询领事代理的数据中心.

ConsulClient选项从vertx-web-client模块扩展了WebClientOptions,因此可以使用许多设置. 请参阅文档.

Using the API

客户端API由ConsulClient表示. 该API与Consul API文档中介绍的Consul的HTTP API非常相似

Blocking queries

某些端点支持称为"阻止查询"的功能. 阻塞查询用于等待使用长轮询的潜在更改. 任何支持阻塞的端点还提供唯一的标识符(索引),该标识符表示所请求资源的当前状态. 以下配置用于执行阻止查询:

index

指示客户端希望等待该索引之后的任何更改的值.

wait

指定阻止请求的最大持续时间的参数. 仅限10分钟.

def opts = [
  index:lastIndex,
  wait:"1m"
]

重要说明是,阻止请求的返回不能保证更改. 可能已达到超时或存在不影响查询结果的幂等写入.

Key/Value Store

KV端点用于访问Consul的简单键/值存储,可用于存储服务配置或其他元数据. 支持以下端点:

  • 要管理单个密钥的更新,单个密钥或密钥前缀的删除以及单个密钥或密钥前缀的获取

  • 在单个原子事务中管理多个密钥的更新或获取

Get key-value pair from store

领事客户端可以返回某些密钥的值

consulClient.getValue("key", { res ->
  if (res.succeeded()) {
    println("retrieved value: ${res.result().value}")
    println("modify index: ${res.result().modifyIndex}")
  } else {
    res.cause().printStackTrace()
  }
})

…或者它可以返回具有给定前缀的所有键值对

consulClient.getValues("prefix", { res ->
  if (res.succeeded()) {
    println("modify index: ${res.result().index}")
    res.result().list.each { kv ->
      println("retrieved value: ${kv.value}")
    }
  } else {
    res.cause().printStackTrace()
  }
})

返回的键值对象包含以下字段(请参见文档 ):

createIndex

内部索引值,代表创建条目的时间.

modifyIndex

修改此键的最后一个索引

lockIndex

锁中成功获取此密钥的次数

key

钥匙

flags

附加到该条目的标志. 客户可以选择使用它,但是对于他们的应用来说很有意义

value

价值

session

拥有锁的会话

修改索引可用于阻止查询:

def opts = [
  index:modifyIndex,
  wait:"1m"
]

consulClient.getValueWithOptions("key", opts, { res ->
  if (res.succeeded()) {
    println("retrieved value: ${res.result().value}")
    println("new modify index: ${res.result().modifyIndex}")
  } else {
    res.cause().printStackTrace()
  }
})

Put key-value pair to store

consulClient.putValue("key", "value", { res ->
  if (res.succeeded()) {
    def opResult = res.result() ? "success" : "fail"
    println("result of the operation: ${opResult}")
  } else {
    res.cause().printStackTrace()
  }
})

提出要求的认沽要求

def opts = [
  flags:42,
  casIndex:modifyIndex,
  acquireSession:"acquireSessionID",
  releaseSession:"releaseSessionID"
]

consulClient.putValueWithOptions("key", "value", opts, { res ->
  if (res.succeeded()) {
    def opResult = res.result() ? "success" : "fail"
    println("result of the operation: ${opResult}")
  } else {
    res.cause().printStackTrace()
  }
})

可以与PUT请求一起使用的查询选项的列表:

flags

这可用于指定02 64 -1之间的无符号值. 客户可以选择使用此选项,但是对于他们的应用来说很有意义.

casIndex

该标志用于将PUT变成Check-And-Set操作. 这对于构建更复杂的同步原语非常有用. 如果索引为0 ,则Consul仅在密钥不存在时才放置密钥. 如果索引不为零,则仅当索引与该键的ModifyIndex匹配时才设置键.

acquireSession

该标志用于将PUT转换为锁定获取操作. 这很有用,因为它允许在领事的基础上进行领导人选举. 如果未持有锁且会话有效,则除了更新密钥内容外,这还将增加LockIndex并设置密钥的Session值. 不需要存在要获取的密钥. 如果锁已由给定会话持有,则LockIndex不会递增,但密钥内容将更新. 这样,当前的锁持有者就可以更新密钥内容,而不必放弃锁并重新获取它.

releaseSession

此标志用于将PUT转换为锁定释放操作. 与acquireSession配对时,这很有用,因为它允许客户端产生锁. 这将使LockIndex保持不变,但将清除关联的密钥会话. 该会话必须持有该密钥才能解锁.

Transactions

连接到Consul 0.7和更高版本时,客户端可以管理单个原子事务中多个密钥的更新或获取. KV是唯一可用的操作类型,不过在Consul的将来版本中可能会添加其他类型的操作,以与键/值操作混合使用(请参阅文档 ).

def request = [
  operations:[
    [
      key:"key1",
      value:"value1",
      type:"SET"
    ],
    [
      key:"key2",
      value:"value2",
      type:"SET"
    ]
  ]
]

consulClient.transaction(request, { res ->
  if (res.succeeded()) {
    println("succeeded results: ${res.result().results.size()}")
    println("errors: ${res.result().errors.size()}")
  } else {
    res.cause().printStackTrace()
  }
})

Delete key-value pair

最后,Consul客户端允许从存储中删除键值对:

consulClient.deleteValue("key", { res ->
  if (res.succeeded()) {
    println("complete")
  } else {
    res.cause().printStackTrace()
  }
})

…或具有相应键前缀的所有键值对

consulClient.deleteValues("prefix", { res ->
  if (res.succeeded()) {
    println("complete")
  } else {
    res.cause().printStackTrace()
  }
})

Services

服务发现的主要目标之一是提供可用服务的目录. 为此,代理提供了一种简单的服务定义格式,以声明服务的可用性并将其与运行状况检查相关联.

Service registering

A service definition must include a name and may optionally provide an id, tags, address, port, and checks.

def opts = [
  name:"serviceName",
  id:"serviceId",
  tags:["tag1", "tag2"],
  checkOptions:[
    ttl:"10s"
  ],
  address:"10.0.0.1",
  port:8048
]
name

the name of service

id

如果未提供,则将id设置为name . 要求所有服务在每个节点上都具有唯一的ID,因此,如果名称可能冲突,则应提供唯一的ID.

tags

对Consul不透明的值列表,可用于区分主节点或辅助节点,不同版本或任何其他服务级别标签.

address

用于指定特定于服务的IP地址. 默认情况下,使用代理的IP地址,并且不需要提供该IP地址.

port

used as well to make a service-oriented architecture simpler to configure; this way, the address and port of a service can be discovered.

checks

相关健康检查

这些选项用于在目录中注册服务:

consulClient.registerService(opts, { res ->
  if (res.succeeded()) {
    println("Service successfully registered")
  } else {
    res.cause().printStackTrace()
  }

})

Service discovery

领事客户端允许获取提供服务的节点的实际列表

consulClient.catalogServiceNodes("serviceName", { res ->
  if (res.succeeded()) {
    println("found ${res.result().list.size()} services")
    println("consul state index: ${res.result().index}")
    res.result().list.each { service ->
      println("Service node: ${service.node}")
      println("Service address: ${service.address}")
      println("Service port: ${service.port}")
    }
  } else {
    res.cause().printStackTrace()
  }
})

可以获取具有相关健康检查状态的列表. 可以按检查状态过滤结果.

consulClient.healthServiceNodes("serviceName", passingOnly, { res ->
  if (res.succeeded()) {
    println("found ${res.result().list.size()} services")
    println("consul state index: ${res.result().index}")
    res.result().list.each { entry ->
      println("Service node: ${entry.node}")
      println("Service address: ${entry.service.address}")
      println("Service port: ${entry.service.port}")
    }
  } else {
    res.cause().printStackTrace()
  }
})

服务查询还有其他参数

def queryOpts = [
  tag:"tag1",
  near:"_agent",
  blockingOptions:[
    index:lastIndex
  ]
]
tag

默认情况下,返回与服务匹配的所有节点. 可以使用tag查询参数按标签过滤列表

near

添加带有节点名称的可选near参数将根据该节点的估计往返时间以升序对节点列表进行排序. near = _agent near传递将使用代理的节点进行排序.

blockingOptions

阻止qyery选项

然后请求应该看起来像

consulClient.healthServiceNodesWithOptions("serviceName", passingOnly, queryOpts, { res ->
  if (res.succeeded()) {
    println("found ${res.result().list.size()} services")
  } else {
    res.cause().printStackTrace()
  }

})

Deregister service

可以通过其ID注销服务:

consulClient.deregisterService("serviceId", { res ->
  if (res.succeeded()) {
    println("Service successfully deregistered")
  } else {
    res.cause().printStackTrace()
  }
})

Health Checks

代理的主要作用之一是管理系统级和应用程序级运行状况检查. 如果运行状况检查与服务相关联,则将其视为应用程序级. 如果未与服务关联,则检查将监视整个节点的运行状况.

def opts = [
  tcp:"localhost:4848",
  interval:"1s"
]

Consul客户端支持的检查选项列表为:

id

支票编号

name

支票名称

script

检查脚本的本地路径. 另外你应该设置检查间隔

http

要检查的HTTP地址. 另外你应该设置检查间隔

ttl

支票生存时间

tcp

要检查的TCP地址. 另外你应该设置检查间隔

interval

以Go的时间格式检查间隔,该间隔是十进制数字序列,每个序列都有可选的分数和一个单位后缀,例如" 300ms","-1.5h"或" 2h45m". 有效时间单位为" ns"," us"(或" µs")," ms"," s"," m"," h"

notes

检查笔记

serviceId

服务ID,以将已注册的支票与代理提供的现有服务相关联.

deregisterAfter

注销超时. 这是可选字段,它是与"间隔"和" TTL"相同的时间格式的超时. 如果支票与服务相关联,并且具有超过此配置值的临界状态,则其关联的服务(及其所有关联的支票)将自动注销. 最小超时为1分钟,获取关键服务的进程每30秒运行一次,因此触发注销可能需要比配置的超时稍长的时间. 通常,此配置的超时时间要比给定服务的任何预期的可恢复停机时间长得多.

status

检查状态以指定运行状况检查的初始状态

Name字段是必填字段,也是ScriptHTTPTCPTTL . ScriptTCPHTTP也需要设置Interval . 如果未提供ID ,则将其设置为Name . 每个业务代表不能有重复的ID条目,因此可能有必要提供ID.

consulClient.registerCheck(opts, { res ->
  if (res.succeeded()) {
    println("check successfully registered")
  } else {
    res.cause().printStackTrace()
  }
})

Events

领事提供了一种将自定义用户事件触发到整个数据中心的机制. 这些事件对Consul来说是不透明的,但可用于构建脚本基础结构以进行自动部署,重新启动服务或执行任何其他编排操作.

要发送用户事件,仅需要其名称

consulClient.fireEvent("eventName", { res ->
  if (res.succeeded()) {
    println("Event sent")
    println("id: ${res.result().id}")
  } else {
    res.cause().printStackTrace()
  }
})

还可以指定其他选项.

node

正则表达式,用于按节点名称过滤收件人

service

正则表达式以按服务筛选收件人

tag

正则表达式以按标签过滤收件人

payload

事件的可选主体. 正文内容对于领事是不透明的,并成为事件的"有效负载"

def opts = [
  tag:"tag",
  payload:"message"
]

consulClient.fireEventWithOptions("eventName", opts, { res ->
  if (res.succeeded()) {
    println("Event sent")
    println("id: ${res.result().id}")
  } else {
    res.cause().printStackTrace()
  }
})

Consul客户端支持查询以获取代理已知的最新事件. 事件是使用八卦协议广播的,因此它们没有全局顺序,也不能保证交付. 代理仅缓冲最近的条目. 当前的缓冲区大小为256,但是此值将来可能会更改.

consulClient.listEvents({ res ->
  if (res.succeeded()) {
    println("Consul index: ${res.result().index}")
    res.result().list.each { event ->
      println("Event id: ${event.id}")
      println("Event name: ${event.name}")
      println("Event payload: ${event.payload}")
    }
  } else {
    res.cause().printStackTrace()
  }
})

领事索引可用于准备阻止请求:

def opts = [
  name:"eventName",
  blockingOptions:[
    index:lastIndex
  ]
]

consulClient.listEventsWithOptions(opts, { res ->
  if (res.succeeded()) {
    println("Consul index: ${res.result().index}")
    res.result().list.each { event ->
      println("Event id: ${event.id}")
    }
  } else {
    res.cause().printStackTrace()
  }
})

Sessions

Consul提供了一种会话机制,可用于构建分布式锁. 会话充当节点,运行状况检查和键/值数据之间的绑定层. 构造会话时,可以提供节点名称,运行状况检查列表,行为,TTL和锁定延迟.

def opts = [
  node:"nodeId",
  behavior:"RELEASE"
]
lockDelay

可以使用" s"后缀指定为持续时间字符串,以秒为单位. 默认值为" 15s".

name

可用于为会话提供易于理解的名称.

node

如果指定,则必须引用已经注册的节点. 默认情况下,使用代理自己的节点名称.

checks

用于提供相关的运行状况检查的列表. 强烈建议您覆盖此列表,并包括默认的serfHealth .

behavior

可以设置为releasedelete . 这控制了会话无效时的行为. 默认情况下,这是release ,导致release所有持有的锁. 更改为delete导致所有持有的锁都被删除. delete对于创建临时键/值条目很有用.

ttl

是一个持续时间字符串,就像LockDelay一样,它可以将s用作后缀数秒钟. 如果指定,则当前必须在10到86400之间. 如果提供该会话,则如果在TTL过期之前未更新会话,则该会话将无效.

有关完整信息,请参阅Consul Sessions内部

新建会话具有一个可用于标识它的命名ID. 该ID可以与KV商店一起使用以获取锁:互斥的咨询机制.

consulClient.createSessionWithOptions(opts, { res ->
  if (res.succeeded()) {
    println("Session successfully created")
    println("id: ${res.result()}")
  } else {
    res.cause().printStackTrace()
  }
})

并摧毁它

consulClient.destroySession(sessionId, { res ->
  if (res.succeeded()) {
    println("Session successfully destroyed")
  } else {
    res.cause().printStackTrace()
  }
})

列出属于节点的会话

consulClient.listNodeSessions("nodeId", { res ->
  if (res.succeeded()) {
    res.result().list.each { session ->
      println("Session id: ${session.id}")
      println("Session node: ${session.node}")
      println("Session create index: ${session.createIndex}")
    }
  } else {
    res.cause().printStackTrace()
  }
})

所有读取会话端点均支持阻止查询和所有一致性模式.

def blockingOpts = [
  index:lastIndex
]

consulClient.listSessionsWithOptions(blockingOpts, { res ->
  if (res.succeeded()) {
    println("Found ${res.result().list.size()} sessions")
  } else {
    res.cause().printStackTrace()
  }
})

Nodes in datacenter

consulClient.catalogNodes({ res ->
  if (res.succeeded()) {
    println("found ${res.result().list.size()} nodes")
    println("consul state index ${res.result().index}")
  } else {
    res.cause().printStackTrace()
  }
})

该端点支持阻止查询并按距指定节点的距离排序

def opts = [
  near:"_agent",
  blockingOptions:[
    index:lastIndex
  ]
]

consulClient.catalogNodesWithOptions(opts, { res ->
  if (res.succeeded()) {
    println("found ${res.result().list.size()} nodes")
  } else {
    res.cause().printStackTrace()
  }
})

Prepared Queries

该端点创建,更新,销毁和执行准备好的查询. 准备好的查询使您可以注册一个复杂的服务查询,然后通过其ID或名称执行该查询,以获得一组提供给定服务的健康节点. 与Consul的DNS接口结合使用时,这特别有用,因为与DNS暴露的入口点有限相比,它允许的查询要丰富得多.

创建准备好的查询有许多参数. 有关详细信息,请参阅文档

dc

指定要查询的数据中心. 这将默认为要查询的代理的数据中心. 这是作为查询参数作为URL的一部分指定的.

name

指定一个可选的友好名称,可用于执行查询而不是使用其ID.

session

指定现有会话的ID. 这提供了一种在给定会话无效时自动删除准备好的查询的方法. 如果未提供,则在不再需要时,必须手动删除准备好的查询.

token

指定每次执行查询时要使用的ACL令牌. 这允许查询由具有较少甚至没有ACL令牌的客户端执行,因此应谨慎使用. 令牌本身只能由具有管理令牌的客户端看到. 如果"令牌"字段留为空白或省略,则将使用客户端的ACL令牌来确定他们是否有权访问所查询的服务. 如果客户端不提供ACL令牌,则将使用匿名令牌.

service

指定要查询的服务的名称. 这是必填字段.

failover

包含两个字段,这两个字段都是可选的,并确定在执行查询时,如果本地数据中心中没有可用的健康节点,则会发生什么情况. 它允许在配置很少的情况下使用其他数据中心中的节点.

nearestN

指定使用WAN闲话池中的网络坐标,根据估计的网络往返时间将查询转发到最近的其他数据中心. 从处理查询的服务器到远程数据中心中的服务器的中位往返时间用于确定优先级.

datacenters

指定远程数据中心的固定列表,以在本地数据中心中没有运行正常的节点时将查询转发到该列表. 按照列表中给出的顺序查询数据中心. 如果将此选项与NearestN结合使用,则将首先执行NearestN查询,然后执行数据中心给出的列表. 即使在一次故障转移期间,给定的数据中心将仅被查询一次,即使它被NearestN选中并在数据中心中列出也是如此.

onlyPassing

指定查询的运行状况检查过滤的行为. 如果将其设置为false,则结果将包括通过检查以及警告状态的节点. 如果将其设置为true,则仅返回通过检查的节点.

tags

指定服务标签列表以过滤查询结果. 为了使服务通过标签过滤器,它必须具有所有必需的标签,并且没有排除的标签(以!前缀).

nodeMeta

指定将用于将查询结果过滤到存在给定元数据值的节点的用户定义键/值对的列表.

dnsTtl

指定通过DNS提供查询结果时的TTL持续时间. 如果指定此选项,它将优先于任何Consul代理特定的配置.

templateType

是查询类型,必须为name_prefix_match . 这意味着该模板将应用于名称与模板的"名称"字段匹配的名称的任何查询查询. 在此示例中,任何针对geo-db的查询都将与此查询匹配. 使用最长的前缀匹配来解析查询模板,因此可以使用针对特定服务覆盖的高级模板. 静态查询始终首先得到解决,因此它们也可以覆盖模板.

templateRegexp

是可选的正则表达式,一旦选择此模板,该正则表达式将用于从全名中提取字段. 在此示例中,正则表达式将"-"之后的第一项作为数据库名称,并将其他所有内容作为标记. 有关此正则表达式的语法,请参见RE2参考.

def def = [
  name:"Query name",
  service:"service-${match(1)}-${match(2)}",
  dcs:["dc1", "dc42"],
  templateType:"name_prefix_match",
  templateRegexp:"^find_(.+?)_(.+?)$"
]

如果查询成功创建,将提供其ID

consulClient.createPreparedQuery(def, { res ->
  if (res.succeeded()) {
    def queryId = res.result()
    println("Query created: ${queryId}")
  } else {
    res.cause().printStackTrace()
  }
})

准备好的查询可以通过其ID执行

consulClient.executePreparedQuery(id, { res ->
  if (res.succeeded()) {
    def response = res.result()
    println("Found ${response.nodes.size()} nodes")
  } else {
    res.cause().printStackTrace()
  }
})

或通过必须与模板正则表达式匹配的查询字符串

consulClient.executePreparedQuery("find_1_2", { res ->
  // matches template regexp "^find_(.+?)_(.+?)$"
  if (res.succeeded()) {
    def response = res.result()
    println("Found ${response.nodes.size()} nodes")
  } else {
    res.cause().printStackTrace()
  }
})

最后, ConsulClient允许您修改,获取或删除准备好的查询

consulClient.deletePreparedQuery(query, { res ->
  if (res.succeeded()) {
    println("Query deleted")
  } else {
    res.cause().printStackTrace()
  }
})

Watches

监视是指定监视数据更新的数据视图(例如,节点列表,KV对,运行状况检查)的一种方式. 检测到更新时,将调用带有WatchResultHandler . 例如,您可以观察健康检查的状态,并在检查很关键时发出通知.

Watch.key("foo/bar", vertx).setHandler({ res ->
  if (res.succeeded()) {
    println("value: ${res.nextResult().value}")
  } else {
    res.cause().printStackTrace()
  }
}).start()

by  ICOPY.SITE