Skip to main content

Reactive PostgreSQL客户端是PostgreSQL的客户端,具有直接的API,专注于可伸缩性和低开销.

客户端是反应性的且非阻塞的,从而允许通过单个线程处理许多数据库连接.

  • 事件驱动

  • Lightweight

  • 内置连接池

  • 准备查询缓存

  • 使用PostgreSQL NOTIFY/LISTEN发布/订阅

  • 批处理和游标

  • Row streaming

  • 命令流水线

  • RxJava 1和RxJava 2

  • 直接存储到对象,没有不必要的副本

  • Java 8日期和时间

  • SSL/TLS

  • Unix域套接字

  • HTTP / 1.x CONNECT,SOCKS4a或SOCKS5代理支持

Usage

要使用Reactive PostgreSQL Client,请将以下依赖项添加到构建描述符的" dependencies"部分:

  • Maven(在您的pom.xml ):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-lang-ruby</artifactId>
 <version>3.8.4</version>
</dependency>
  • Gradle(在您的build.gradle文件中):

dependencies {
 compile 'io.vertx:vertx-lang-ruby:3.8.4'
}

Getting started

这是连接,查询和断开连接的最简单方法

require 'vertx-pg-client/pg_pool'

# Connect options
connectOptions = {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
}

# Pool options
poolOptions = {
  'maxSize' => 5
}

# Create the client pool
client = VertxPgClient::PgPool.pool(connectOptions, poolOptions)

# A simple query
client.query("SELECT * FROM users WHERE id='julien'") { |ar_err,ar|
  if (ar_err == nil)
    result = ar
    puts "Got #{result.size()} rows "
  else
    puts "Failure: #{ar_err.get_message()}"
  end

  # Now close the pool
  client.close()
}

Connecting to PostgreSQL

大多数时候,您将使用池连接到PostgreSQL:

require 'vertx-pg-client/pg_pool'

# Connect options
connectOptions = {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
}

# Pool options
poolOptions = {
  'maxSize' => 5
}

# Create the pooled client
client = VertxPgClient::PgPool.pool(connectOptions, poolOptions)

池化的客户端使用连接池,任何操作都将从池中借用连接以执行该操作并将其释放到池中.

如果使用Vert.x运行,则可以将其传递给您的Vertx实例:

require 'vertx-pg-client/pg_pool'

# Connect options
connectOptions = {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
}

# Pool options
poolOptions = {
  'maxSize' => 5
}
# Create the pooled client
client = VertxPgClient::PgPool.pool(vertx, connectOptions, poolOptions)

当您不再需要池时,您需要释放它:

# Close the pool and all the associated resources
pool.close()

需要在同一连接上执行多个操作时,需要使用客户端connection .

您可以轻松地从游泳池中获得一个:

require 'vertx-pg-client/pg_pool'

# Connect options
connectOptions = {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
}

# Pool options
poolOptions = {
  'maxSize' => 5
}

# Create the pooled client
client = VertxPgClient::PgPool.pool(vertx, connectOptions, poolOptions)

# Get a connection from the pool
client.get_connection() { |ar1_err,ar1|

  if (ar1_err == nil)

    puts "Connected"

    # Obtain our connection
    conn = ar1

    # All operations execute on the same connection
    conn.query("SELECT * FROM users WHERE id='julien'") { |ar2_err,ar2|
      if (ar2_err == nil)
        conn.query("SELECT * FROM users WHERE id='emad'") { |ar3_err,ar3|
          # Release the connection to the pool
          conn.close()
        }
      else
        # Release the connection to the pool
        conn.close()
      end
    }
  else
    puts "Could not connect: #{ar1_err.get_message()}"
  end
}

完成连接后,必须关闭它才能将其释放到池中,以便可以重用它.

有时您想通过Unix域套接字连接来提高性能,我们可以通过Vert.x本机传输来实现.

确保已在类路径中添加了所需的netty-transport-native依赖项,并启用了Unix域套接字选项.

require 'vertx-pg-client/pg_pool'

# Connect Options
# Socket file name will be /var/run/postgresql/.s.PGSQL.5432
connectOptions = {
  'host' => "/var/run/postgresql",
  'port' => 5432,
  'database' => "the-db"
}

# Pool options
poolOptions = {
  'maxSize' => 5
}

# Create the pooled client
client = VertxPgClient::PgPool.pool(connectOptions, poolOptions)

# Create the pooled client with a vertx instance
# Make sure the vertx instance has enabled native transports
client2 = VertxPgClient::PgPool.pool(vertx, connectOptions, poolOptions)

可以在[Vert.x文档]( https://vertx.io/docs/vertx-core/java/#_native_transports )中找到更多信息.

Configuration

您可以通过多种方法来配置客户端.

data object

配置客户端的一种简单方法是指定PgConnectOptions数据对象.

require 'vertx-pg-client/pg_pool'

# Data object
connectOptions = {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
}

# Pool Options
poolOptions = {
  'maxSize' => 5
}

# Create the pool from the data object
pool = VertxPgClient::PgPool.pool(vertx, connectOptions, poolOptions)

pool.get_connection() { |ar_err,ar|
  # Handling your connection
}

您也可以使用setPropertiesaddProperty方法配置通用属性. 注意setProperties将覆盖默认的客户端属性.

例如,您可以通过添加search_path属性来为连接设置默认架构.

Code not translatable

有关可用属性的更多信息,请参见PostgreSQL手册 .

connection uri

除了使用PgConnectOptions数据对象进行配置外,当您要使用连接URI进行配置时,我们还为您提供了另一种连接方法:

require 'vertx-pg-client/pg_pool'
require 'vertx-pg-client/pg_connection'

# Connection URI
connectionUri = "postgresql://dbuser:[email protected]:3211/mydb"

# Create the pool from the connection URI
pool = VertxPgClient::PgPool.pool(connectionUri)

# Create the connection from the connection URI
VertxPgClient::PgConnection.connect(vertx, connectionUri) { |res_err,res|
  # Handling your connection
}

有关连接字符串格式的更多信息,请参见PostgreSQL手册 .

当前客户端在连接uri中支持以下参数关键字

  • host

  • hostaddr

  • port

  • user

  • password

  • dbname

  • sslmode

  • 属性包括(application_name,fallback_application_name,search_path)

注意:在连接URI中配置属性将覆盖默认属性.

environment variables

您还可以使用环境变量来设置默认的连接设置值,这在您要避免对数据库连接信息进行硬编码时很有用. 您可以参考官方文档以了解更多详细信息. 支持以下参数:

  • PGHOST

  • PGHOSTADDR

  • PGPORT

  • PGDATABASE

  • PGUSER

  • PGPASSWORD

  • PGSSLMODE

如果您未指定要连接的数据对象或连接URI字符串,则环境变量将优先于它们.

$ PGUSER=user \
 PGHOST=the-host \
 PGPASSWORD=secret \
 PGDATABASE=the-db \
 PGPORT=5432 \
 PGSSLMODE=DISABLE
require 'vertx-pg-client/pg_pool'
require 'vertx-pg-client/pg_connection'

# Create the pool from the environment variables
pool = VertxPgClient::PgPool.pool()

# Create the connection from the environment variables
VertxPgClient::PgConnection.connect(vertx) { |res_err,res|
  # Handling your connection
}

index.adoc中未解决的指令-include :: queries.adoc []

您可以在查询中使用" RETURNING"子句来获取生成的键:

require 'vertx-sql-client/tuple'
client.prepared_query("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id", VertxSqlClient::Tuple.of("white", "red", "blue")) { |ar_err,ar|
  if (ar_err == nil)
    rows = ar
    puts rows.row_count()
    rows.each do |row|
      puts "generated key: #{row.get_integer("color_id")}"
    end
  else
    puts "Failure: #{ar_err.get_message()}"
  end
}

index.adoc中未解决的指令-include :: connections.adoc []

index.adoc中未解决的指令-include :: transactions.adoc []

index.adoc中未解决的指令-include :: cursor.adoc []

注意:PostreSQL在事务结束时销毁游标,因此游标API应该在事务内使用,否则您可能会收到34000 PostgreSQL错误.

PostgreSQL type mapping

当前客户端支持以下PostgreSQL类型

  • BOOLEAN( java.lang.Boolean

  • INT2( java.lang.Short

  • INT4( java.lang.Integer

  • INT8( java.lang.Long

  • FLOAT4( java.lang.Float

  • FLOAT8( java.lang.Double

  • CHAR( java.lang.String

  • VARCHAR( java.lang.String

  • TEXT( java.lang.String

  • 枚举( java.lang.String

  • 名称( java.lang.String

  • SERIAL2( java.lang.Short

  • SERIAL4( java.lang.Integer

  • SERIAL8( java.lang.Long

  • NUMERIC( io.vertx.sqlclient.data.Numeric

  • UUID( java.util.UUID

  • DATE( java.time.LocalDate

  • TIME( java.time.LocalTime

  • TIMETZ( java.time.OffsetTime

  • TIMESTAMP( java.time.LocalDateTime

  • TIMESTAMPTZ( java.time.OffsetDateTime

  • 间隔( io.vertx.pgclient.data.Interval

  • BYTEA( io.vertx.core.buffer.Buffer

  • JSON( io.vertx.core.json.JsonObjectio.vertx.core.json.JsonArrayNumberBooleanStringio.vertx.sqlclient.Tuple#JSON_NULL

  • JSONB( io.vertx.core.json.JsonObjectio.vertx.core.json.JsonArrayNumberBooleanStringio.vertx.sqlclient.Tuple#JSON_NULL

  • 要点( io.vertx.pgclient.data.Point

  • 线( io.vertx.pgclient.data.Line

  • LSEG( io.vertx.pgclient.data.LineSegment

  • BOX( io.vertx.pgclient.data.Box

  • 路径( io.vertx.pgclient.data.Path

  • POLYGON( io.vertx.pgclient.data.Polygon

  • 圆( io.vertx.pgclient.data.Circle

  • TSVECTOR( java.lang.String

  • TSQUERY( java.lang.String

元组解码在存储值时使用上述类型,并且在可能的情况下也在flu转换上执行实际值:

pool.query("SELECT 1::BIGINT \"VAL\"") { |ar_err,ar|
  rowSet = ar
  row = rowSet.iterator().next()

  # Stored as java.lang.Long
  value = row.get_value(0)

  # Convert to java.lang.Integer
  intValue = row.get_integer(0)
}

元组编码使用上述类型映射进行编码,除非类型是数字,在这种情况下,将改用java.lang.Number

pool.query("SELECT 1::BIGINT \"VAL\"") { |ar_err,ar|
  rowSet = ar
  row = rowSet.iterator().next()

  # Stored as java.lang.Long
  value = row.get_value(0)

  # Convert to java.lang.Integer
  intValue = row.get_integer(0)
}

支持这些类型的数组.

Handling JSON

PostgreSQL JSONJSONB类型由以下Java类型表示:

  • String

  • Number

  • Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • io.vertx.sqlclient.Tuple#JSON_NULL用于表示JSON空文字

Code not translatable

Handling NUMERIC

Numeric Java类型用于表示PostgreSQL NUMERIC类型.

numeric = row.get(Java::IoVertxSqlclientData::Numeric::class, 0)
if (numeric.na_n?())
  # Handle NaN
else
  value = numeric.big_decimal_value()
end

Handling arrays

数组在TupleRow上可用:

Code not translatable

Handling custom types

字符串用于表示自定义类型,既可以发送到Postgres也可以从Postgres返回.

您可以从PostgreSQL读取并以字符串形式获取自定义类型

require 'vertx-sql-client/tuple'
client.prepared_query("SELECT address, (address).city FROM address_book WHERE id=$1", VertxSqlClient::Tuple.of(3)) { |ar_err,ar|
  if (ar_err == nil)
    rows = ar
    rows.each do |row|
      puts "Full Address #{row.get_string(0)}, City #{row.get_string(1)}"
    end
  else
    puts "Failure: #{ar_err.get_message()}"
  end
}

您还可以通过提供一个字符串来写入PostgreSQL

require 'vertx-sql-client/tuple'
client.prepared_query("INSERT INTO address_book (id, address) VALUES ($1, $2)", VertxSqlClient::Tuple.of(3, "('Anytown', 'Second Ave', false)")) { |ar_err,ar|
  if (ar_err == nil)
    rows = ar
    puts rows.row_count()
  else
    puts "Failure: #{ar_err.get_message()}"
  end
}

使用Java String处理文本搜索

require 'vertx-sql-client/tuple'
client.prepared_query("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )", VertxSqlClient::Tuple.of("fat cats ate fat rats", "fat & rat")) { |ar_err,ar|
  if (ar_err == nil)
    rows = ar
    rows.each do |row|
      puts "Match : #{row.get_boolean?(0)}"
    end
  else
    puts "Failure: #{ar_err.get_message()}"
  end
}

tsvector and tsquery can be fetched from db using java String

require 'vertx-sql-client/tuple'
client.prepared_query("SELECT to_tsvector( $1 ), to_tsquery( $2 )", VertxSqlClient::Tuple.of("fat cats ate fat rats", "fat & rat")) { |ar_err,ar|
  if (ar_err == nil)
    rows = ar
    rows.each do |row|
      puts "Vector : #{row.get_string(0)}, query : #{row.get_string(1)}"
    end
  else
    puts "Failure: #{ar_err.get_message()}"
  end
}

Collector queries

您可以将Java收集器与查询API结合使用:

Code not translatable

收集器处理不得在该Row上保留引用,因为只有一行用于处理整个集合.

Java Collectors提供了许多有趣的预定义收集器,例如,您可以直接从行集中轻松创建一个字符串:

Code not translatable

Pub/sub

PostgreSQL支持发布/订阅通信渠道.

您可以设置一个notificationHandler来接收PostgreSQL通知:

connection.notification_handler() { |notification|
  puts "Received #{notification['payload']} on channel #{notification['channel']}"
}

connection.query("LISTEN some-channel") { |ar_err,ar|
  puts "Subscribed to channel"
}

PgSubscriber是一个通道管理器,管理单个连接,该连接提供每个通道的订阅:

require 'vertx-pg-client/pg_subscriber'

subscriber = VertxPgClient::PgSubscriber.subscriber(vertx, {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
})

# You can set the channel before connect
subscriber.channel("channel1").handler() { |payload|
  puts "Received #{payload}"
}

subscriber.connect() { |ar_err,ar|
  if (ar_err == nil)

    # Or you can set the channel after connect
    subscriber.channel("channel2").handler() { |payload|
      puts "Received #{payload}"
    }
  end
}

分配给channel方法的通道名称将是PostgreSQL保留的用于发送通知的通道的确切名称. 请注意,这与SQL中通道名称的表示形式不同,并且内部PgSubscriber将准备提交的通道名称作为带引号的标识符:

require 'vertx-pg-client/pg_subscriber'

subscriber = VertxPgClient::PgSubscriber.subscriber(vertx, {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
})

subscriber.connect() { |ar_err,ar|
  if (ar_err == nil)
    # Complex channel name - name in PostgreSQL requires a quoted ID
    subscriber.channel("Complex.Channel.Name").handler() { |payload|
      puts "Received #{payload}"
    }
    subscriber.channel("Complex.Channel.Name").subscribe_handler() { |subscribed|
      subscriber.actual_connection().query("NOTIFY \"Complex.Channel.Name\", 'msg'") { |notified_err,notified|
        puts "Notified \"Complex.Channel.Name\""
      }
    }

    # PostgreSQL simple ID's are forced lower-case
    subscriber.channel("simple_channel").handler() { |payload|
      puts "Received #{payload}"
    }
    subscriber.channel("simple_channel").subscribe_handler() { |subscribed|
      # The following simple channel identifier is forced to lower case
      subscriber.actual_connection().query("NOTIFY Simple_CHANNEL, 'msg'") { |notified_err,notified|
        puts "Notified simple_channel"
      }
    }

    # The following channel name is longer than the current
    # (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
    subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb").handler() { |payload|
      puts "Received #{payload}"
    }
  end
}

您可以提供一个重新连接策略作为将retries作为参数并返回一个amountOfTime值的函数:

  • amountOfTime < 0 :订户已关闭并且没有重试

  • amountOfTime = 0 :订户重试以立即连接

  • amountOfTime > 0 :订阅者在amountOfTime毫秒后重试

require 'vertx-pg-client/pg_subscriber'

subscriber = VertxPgClient::PgSubscriber.subscriber(vertx, {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret"
})

# Reconnect at most 10 times after 100 ms each
subscriber.reconnect_policy(lambda { |retries|
  if (retries < 10)
    return 100
  else
    return -1
  end
})

默认策略是不重新连接.

Cancelling Request

PostgreSQL支持取消进行中的请求. 您可以使用cancelRequest取消飞行中的请求. 取消请求将打开与服务器的新连接,并取消请求,然后关闭连接.

connection.query("SELECT pg_sleep(20)") { |ar_err,ar|
  if (ar_err == nil)
    # imagine this is a long query and is still running
    puts "Query success"
  else
    # the server will abort the current query after cancelling request
    puts "Failed to query due to #{ar_err.get_message()}"
  end
}
connection.cancel_request() { |ar_err,ar|
  if (ar_err == nil)
    puts "Cancelling request has been sent"
  else
    puts "Failed to send cancelling request"
  end
}

取消信号可能会或可能不会有任何影响-例如,如果它在后端完成查询处理之后到达,则它将没有影响. 如果取消有效,则会导致当前命令提前终止并显示错误消息.

可以在官方文档中找到更多信息.

Using SSL/TLS

要将客户端配置为使用SSL连接,可以将PgConnectOptions配置为Vert.x NetClient . 支持所有SSL模式 ,并且您可以配置sslmode . 默认情况下,客户端处于DISABLE SSL模式. ssl参数仅是设置sslmode快捷方式. setSsl(true)等同于setSslMode(VERIFY_CA)setSsl(false)等同于setSslMode(DISABLE) .

require 'vertx-pg-client/pg_connection'

options = {
  'port' => 5432,
  'host' => "the-host",
  'database' => "the-db",
  'user' => "user",
  'password' => "secret",
  'sslMode' => "VERIFY_CA",
  'pemTrustOptions' => {
    'certPaths' => [
      "/path/to/cert.pem"
    ]
  }
}

VertxPgClient::PgConnection.connect(vertx, options) { |res_err,res|
  if (res_err == nil)
    # Connected with SSL
  else
    puts "Could not connect #{res_err}"
  end
}

可以在Vert.x文档中找到更多信息.

Using a proxy

您还可以将客户端配置为使用HTTP / 1.x CONNECT,SOCKS4a或SOCKS5代理.

可以在Vert.x文档中找到更多信息.

by  ICOPY.SITE