Skip to content

Commit

Permalink
feat: support customize dns lookup function (#723)
Browse files Browse the repository at this point in the history
Since Redis cluster doesn't support hostname at all (redis/redis#2410),
it's reasonable to resolve the hostnames to IPs before connecting.
  • Loading branch information
luin committed Oct 17, 2018
1 parent 6d13c54 commit b9c4793
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 70 deletions.
23 changes: 22 additions & 1 deletion lib/cluster/ClusterOptions.ts
@@ -1,4 +1,7 @@
import {NodeRole} from './util'
import {lookup} from 'dns'

export type DNSLookupFunction = (hostname: string, callback: (err: NodeJS.ErrnoException, address: string, family: number) => void) => void

/**
* Options for Cluster constructor
Expand Down Expand Up @@ -93,9 +96,26 @@ export interface IClusterOptions {
redisOptions?: any

/**
* By default, When a new Cluster instance is created,
* it will connect to the Redis cluster automatically.
* If you want to keep the instance disconnected until the first command is called,
* set this option to `true`.
*
* @default false
*/
lazyConnect?: boolean

/**
* Hostnames will be resolved to IP addresses via this function.
* This is needed when the addresses of startup nodes are hostnames instead
* of IPs.
*
* You may provide a custom `lookup` function when you want to customize
* the cache behavior of the default function.
*
* @default require('dns').lookup
*/
dnsLookup?: DNSLookupFunction
}

export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
Expand All @@ -108,5 +128,6 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
retryDelayOnClusterDown: 100,
retryDelayOnTryAgain: 100,
slotsRefreshTimeout: 1000,
slotsRefreshInterval: 5000
slotsRefreshInterval: 5000,
dnsLookup: lookup
}
166 changes: 116 additions & 50 deletions lib/cluster/index.ts
Expand Up @@ -2,16 +2,16 @@ import {EventEmitter} from 'events'
import ClusterAllFailedError from '../errors/ClusterAllFailedError'
import {defaults, noop} from '../utils/lodash'
import ConnectionPool from './ConnectionPool'
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole} from './util'
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions} from './util'
import ClusterSubscriber from './ClusterSubscriber'
import DelayQueue from './DelayQueue'
import ScanStream from '../ScanStream'
import {AbortError} from 'redis-errors'
import {AbortError, RedisError} from 'redis-errors'
import * as asCallback from 'standard-as-callback'
import * as PromiseContainer from '../promiseContainer'
import {CallbackFunction} from '../types';
import {IClusterOptions, DEFAULT_CLUSTER_OPTIONS} from './ClusterOptions'
import {sample, CONNECTION_CLOSED_ERROR_MSG, shuffle, timeout} from '../utils'
import {sample, CONNECTION_CLOSED_ERROR_MSG, shuffle, timeout, zipMap} from '../utils'
import * as commands from 'redis-commands'

const Deque = require('denque')
Expand All @@ -30,7 +30,7 @@ type ClusterStatus = 'end' | 'close' | 'wait' | 'connecting' | 'connect' | 'read
*/
class Cluster extends EventEmitter {
private options: IClusterOptions
private startupNodes: IRedisOptions[]
private startupNodes: Array<string | number | object>
private connectionPool: ConnectionPool
private slots: Array<NodeKey[]> = []
private manuallyClosing: boolean
Expand All @@ -43,6 +43,18 @@ class Cluster extends EventEmitter {
private status: ClusterStatus
private isRefreshing: boolean = false

/**
* Every time Cluster#connect() is called, this value will be
* auto-incrementing. The purpose of this value is used for
* discarding previous connect attampts when creating a new
* connection.
*
* @private
* @type {number}
* @memberof Cluster
*/
private connectionEpoch: number = 0

/**
* Creates an instance of Cluster.
*
Expand All @@ -54,7 +66,7 @@ class Cluster extends EventEmitter {
super()
Commander.call(this)

this.startupNodes = normalizeNodeOptions(startupNodes)
this.startupNodes = startupNodes
this.options = defaults(this.options, options, DEFAULT_CLUSTER_OPTIONS)

// validate options
Expand Down Expand Up @@ -117,59 +129,68 @@ class Cluster extends EventEmitter {
reject(new Error('Redis is already connecting/connected'))
return
}
const epoch = ++this.connectionEpoch
this.setStatus('connecting')

if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
throw new Error('`startupNodes` should contain at least one node.')
}

this.connectionPool.reset(this.startupNodes)

function readyHandler() {
this.setStatus('ready')
this.retryAttempts = 0
this.executeOfflineCommands()
this.resetNodesRefreshInterval()
resolve()
}
this.resolveStartupNodeHostnames().then((nodes) => {
if (this.connectionEpoch !== epoch) {
debug('discard connecting after resolving startup nodes because epoch not match: %d != %d', epoch, this.connectionEpoch)
reject(new RedisError('Connection is discarded because a new connection is made'))
return
}
if (this.status !== 'connecting') {
debug('discard connecting after resolving startup nodes because the status changed to %s', this.status)
reject(new RedisError('Connection is aborted'))
return
}
this.connectionPool.reset(nodes)

function readyHandler() {
this.setStatus('ready')
this.retryAttempts = 0
this.executeOfflineCommands()
this.resetNodesRefreshInterval()
resolve()
}

let closeListener: () => void
const refreshListener = () => {
this.removeListener('close', closeListener)
this.manuallyClosing = false
this.setStatus('connect')
if (this.options.enableReadyCheck) {
this.readyCheck((err, fail) => {
if (err || fail) {
debug('Ready check failed (%s). Reconnecting...', err || fail)
if (this.status === 'connect') {
this.disconnect(true)
let closeListener: () => void
const refreshListener = () => {
this.removeListener('close', closeListener)
this.manuallyClosing = false
this.setStatus('connect')
if (this.options.enableReadyCheck) {
this.readyCheck((err, fail) => {
if (err || fail) {
debug('Ready check failed (%s). Reconnecting...', err || fail)
if (this.status === 'connect') {
this.disconnect(true)
}
} else {
readyHandler.call(this)
}
} else {
readyHandler.call(this)
}
})
} else {
readyHandler.call(this)
})
} else {
readyHandler.call(this)
}
}
}

closeListener = function () {
this.removeListener('refresh', refreshListener)
reject(new Error('None of startup nodes is available'))
}
closeListener = function () {
this.removeListener('refresh', refreshListener)
reject(new Error('None of startup nodes is available'))
}

this.once('refresh', refreshListener)
this.once('close', closeListener)
this.once('close', this.handleCloseEvent.bind(this))
this.once('refresh', refreshListener)
this.once('close', closeListener)
this.once('close', this.handleCloseEvent.bind(this))

this.refreshSlotsCache(function (err) {
if (err && err.message === 'Failed to refresh slots cache.') {
Redis.prototype.silentEmit.call(this, 'error', err)
this.connectionPool.reset([])
}
}.bind(this))
this.subscriber.start()
this.refreshSlotsCache(function (err) {
if (err && err.message === 'Failed to refresh slots cache.') {
Redis.prototype.silentEmit.call(this, 'error', err)
this.connectionPool.reset([])
}
}.bind(this))
this.subscriber.start()
}).catch(reject)
})
}

Expand Down Expand Up @@ -639,6 +660,51 @@ class Cluster extends EventEmitter {
}
})
}

private dnsLookup (hostname: string): Promise<string> {
return new Promise((resolve, reject) => {
this.options.dnsLookup(hostname, (err, address) => {
if (err) {
debug('failed to resolve hostname %s to IP: %s', hostname, err.message)
reject(err)
} else {
debug('resolved hostname %s to IP %s', hostname, address)
resolve(address)
}
})
});
}

/**
* Normalize startup nodes, and resolving hostnames to IPs.
*
* This process happens every time when #connect() is called since
* #startupNodes and DNS records may chanage.
*
* @private
* @returns {Promise<IRedisOptions[]>}
*/
private resolveStartupNodeHostnames(): Promise<IRedisOptions[]> {
if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
return Promise.reject(new Error('`startupNodes` should contain at least one node.'))
}
const startupNodes = normalizeNodeOptions(this.startupNodes)

const hostnames = getUniqueHostnamesFromOptions(startupNodes)
if (hostnames.length === 0) {
return Promise.resolve(startupNodes)
}

return Promise.all(hostnames.map((hostname) => this.dnsLookup(hostname))).then((ips) => {
const hostnameToIP = zipMap(hostnames, ips)

return startupNodes.map((node) => (
hostnameToIP.has(node.host)
? Object.assign({}, node, {host: hostnameToIP.get(node.host)})
: node
))
})
}
}

Object.getOwnPropertyNames(Commander.prototype).forEach(name => {
Expand Down
15 changes: 14 additions & 1 deletion lib/cluster/util.ts
@@ -1,11 +1,15 @@
import {parseURL} from '../utils'
import {isIP} from 'net'
import {DNSLookupFunction} from './ClusterOptions';

export type NodeKey = string
export type NodeRole = 'master' | 'slave' | 'all'

export interface IRedisOptions {
port: number,
host: string
host: string,
password?: string,
[key: string]: any
}

export function getNodeKey(node: IRedisOptions): NodeKey {
Expand Down Expand Up @@ -43,3 +47,12 @@ export function normalizeNodeOptions(nodes: Array<string | number | object>): IR
return options
})
}

export function getUniqueHostnamesFromOptions (nodes: IRedisOptions[]): string[] {
const uniqueHostsMap = {}
nodes.forEach((node) => {
uniqueHostsMap[node.host] = true
})

return Object.keys(uniqueHostsMap).filter(host => !isIP(host))
}
2 changes: 1 addition & 1 deletion lib/types.ts
@@ -1 +1 @@
export type CallbackFunction<T = void> = (err?: Error | null, result?: T) => void
export type CallbackFunction<T = void> = (err?: NodeJS.ErrnoException | null, result?: T) => void
8 changes: 8 additions & 0 deletions lib/utils/index.ts
Expand Up @@ -325,3 +325,11 @@ export function shuffle<T> (array: T[]): T[] {
* Error message for connection being disconnected
*/
export const CONNECTION_CLOSED_ERROR_MSG = 'Connection is closed.'

export function zipMap<K, V> (keys: K[], values: V[]): Map<K, V> {
const map = new Map<K, V>()
keys.forEach((key, index) => {
map.set(key, values[index])
})
return map
}
9 changes: 9 additions & 0 deletions test/functional/cluster/connect.js
Expand Up @@ -353,4 +353,13 @@ describe('cluster:connect', function () {
{ host: '127.0.0.1', port: '30001' }
], { slotsRefreshInterval: 100, redisOptions: { lazyConnect: false } });
});

it('throws when startupNodes is empty', (done) => {
const cluster = new Redis.Cluster(null, {lazyConnect: true})
cluster.connect().catch(err => {
expect(err.message).to.eql('`startupNodes` should contain at least one node.')
cluster.disconnect()
done()
})
})
});
58 changes: 58 additions & 0 deletions test/functional/cluster/dnsLookup.js
@@ -0,0 +1,58 @@
describe('cluster:dnsLookup', () => {
it('resolve hostnames to IPs', (done) => {
const slotTable = [
[0, 1000, ['127.0.0.1', 30001]],
[1001, 16383, ['127.0.0.1', 30002]]
]
new MockServer(30001, (argv, c) => {
}, slotTable)
new MockServer(30002, (argv, c) => {
}, slotTable)

const cluster = new Redis.Cluster([
{ host: 'localhost', port: '30001' }
])
cluster.on('ready', () => {
const nodes = cluster.nodes('master')
expect(nodes.length).to.eql(2)
expect(nodes[0].options.host).to.eql('127.0.0.1')
expect(nodes[1].options.host).to.eql('127.0.0.1')
cluster.disconnect()
done()
})
})

it('support customize dnsLookup function', (done) => {
let dnsLookupCalledCount = 0
const slotTable = [
[0, 1000, ['127.0.0.1', 30001]],
[1001, 16383, ['127.0.0.1', 30002]]
]
new MockServer(30001, (argv, c) => {
}, slotTable)
new MockServer(30002, (argv, c) => {
}, slotTable)

const cluster = new Redis.Cluster([
{ host: 'a.com', port: '30001' }
], {
dnsLookup (hostname, callback) {
dnsLookupCalledCount += 1
if (hostname === 'a.com') {
callback(null, '127.0.0.1')
} else {
callback(new Error('Unknown hostname'))
}
}
})
cluster.on('ready', () => {
const nodes = cluster.nodes('master')
expect(nodes.length).to.eql(2)
expect(nodes[0].options.host).to.eql('127.0.0.1')
expect(nodes[1].options.host).to.eql('127.0.0.1')
expect(dnsLookupCalledCount).to.eql(1)
cluster.disconnect()
done()
})
})
})
2 changes: 2 additions & 0 deletions test/functional/cluster/quit.js
Expand Up @@ -23,6 +23,7 @@ describe('cluster:quit', () => {
cluster.quit((err, res) => {
expect(err).to.eql(null)
expect(res).to.eql('OK')
cluster.disconnect()
done()
})
})
Expand Down Expand Up @@ -51,6 +52,7 @@ describe('cluster:quit', () => {
cluster.on('ready', () => {
cluster.quit((err) => {
expect(err.message).to.eql(ERROR_MESSAGE)
cluster.disconnect()
done()
})
})
Expand Down

0 comments on commit b9c4793

Please sign in to comment.