Optimizing network bandwidth over distributed database aggregation jobs


Problem description

I have a distributed/federated database structured as follows:

  1. The databases are spread across three geographic locations ("nodes")
  2. Multiple databases are clustered at each node
  3. The relational databases are a mix of PostgreSQL, MySQL, Oracle, and MS SQL Server; the non-relational databases are either MongoDB or Cassandra
  4. Loose coupling within each node and across the node federation is achieved via RabbitMQ, with each node running a RabbitMQ broker

I am implementing a read-only inter-node aggregation job system for jobs that span the node federation (i.e. for jobs that are not local to a node). These jobs only perform "get" queries - they do not modify the databases. (If the results of the jobs are intended to go into one or more of the databases then this is accomplished by a separate job that is not part of the inter-node job system I am trying to optimize.) My objective is to minimize the network bandwidth required by these jobs (first to minimize the inter-node / WAN bandwidth, then to minimize the intra-node / LAN bandwidth); I assume a uniform cost for each WAN link, and another uniform cost for each LAN link. The jobs are not particularly time-sensitive. I perform some CPU load-balancing within a node but not between nodes.

The amount of data transported across the WAN/LAN for the aggregation jobs is small relative to the amount of database writes that are local to a cluster or to a specific database, so it would not be practical to fully distribute the databases across the federation.

The basic algorithm I use for minimizing network bandwidth is:

  1. Given a job that runs over a set of data that is spread across the federation, the manager node sends a message to each of the other nodes containing the relevant database queries (see the dispatch sketch after this list).
  2. Each node runs its set of queries, compresses the results with gzip, caches them, and sends the compressed sizes to the manager node.
  3. The manager moves the job to the node containing the most data (specifically, to the machine within that node's cluster that has the most data and has idle cores); it requests the remainder of the data from the other two nodes and from the other machines within the cluster, and then it runs the job.
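
To make the first step concrete, here is a minimal sketch (not the author's actual code; the queue name, host map, and message format are assumptions) of how the manager node could dispatch the per-node queries over RabbitMQ using the pika client:

import json

import pika

def dispatch_queries(job_id, queries_by_node, broker_hosts):
    """Send each remote node the database queries relevant to one aggregation job.

    queries_by_node: {"node2": ["SELECT ...", ...], "node3": [...]}
    broker_hosts:    {"node2": "rabbitmq.node2.example", ...}  (hypothetical hosts)
    """
    for node, queries in queries_by_node.items():
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=broker_hosts[node]))
        channel = connection.channel()
        # One durable queue per broker for aggregation-job requests (assumed naming).
        channel.queue_declare(queue="aggregation_jobs", durable=True)
        channel.basic_publish(
            exchange="",
            routing_key="aggregation_jobs",
            body=json.dumps({"job_id": job_id, "queries": queries}),
            properties=pika.BasicProperties(delivery_mode=2))  # persist the message
        connection.close()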

When possible the jobs use a divide-and-conquer approach to minimize the amount of data co-location that is needed. For example, if the job needs to compute the sums of all Sales figures across the federation, then each node locally calculates its Sales sums which are then aggregated at the manager node (rather than copying all of the unprocessed Sales data to the manager node). However, sometimes (such as when performing a join between two tables that are located at different nodes) data co-location is needed.
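
As a minimal sketch of this divide-and-conquer pattern (the "sales" table and "amount" column names are illustrative, not taken from the actual schema), each node runs the aggregate locally and only the per-node partial sums cross the WAN:

def local_sales_sum(db_cursor):
    # Runs on each node against a local database; only one number leaves the node.
    db_cursor.execute("SELECT COALESCE(SUM(amount), 0) FROM sales")
    return db_cursor.fetchone()[0]

def federation_sales_sum(partial_sums):
    # Runs on the manager node; partial_sums holds the per-node results
    # collected over RabbitMQ, e.g. [1042.50, 987.00, 1310.25].
    return sum(partial_sums)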

The first thing I did to optimize this was to aggregate the jobs and to run the aggregated jobs at ten-minute epochs (the machines are all running NTP, so I can be reasonably certain that "every ten minutes" means the same thing at each node). The goal is for two jobs to be able to share the same data, which reduces the overall cost of transporting the data.

  1. Given two jobs that query the same table, I generate each job's resultset and then take the intersection of the two resultsets.
  2. If the two jobs are scheduled to run on the same node, then the network transport cost is calculated as the sum of the two resultsets minus the intersection of the two resultsets (a small sketch of this calculation follows the list).
  3. The two resultsets are stored in PostgreSQL temporary tables (in the case of relational data) or in temporary Cassandra columnfamilies / MongoDB collections (in the case of nosql data) on the node selected to run the jobs; the original queries are then executed against the combined resultset and the data is delivered to the individual jobs. (This step is only performed on combined resultsets; an individual resultset's data is simply delivered to its job without first being stored in a temporary table / columnfamily / collection.)
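
A rough sketch of that cost bookkeeping, assuming each resultset is represented as a set of hashable row identifiers: rows shared by both resultsets only need to be shipped once.

def transport_cost(resultset_a, resultset_b):
    # Sum of both resultsets minus their intersection.
    shared = resultset_a & resultset_b
    return len(resultset_a) + len(resultset_b) - len(shared)

# Example: a 100-row and an 80-row resultset sharing 30 rows -> 150 rows shipped.
assert transport_cost(set(range(100)), set(range(70, 150))) == 150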

This results in an improvement to network bandwidth, but I'm wondering if there's a framework/library/algorithm that would improve on this. One option I considered is to cache the resultsets at a node and to account for these cached resultsets when determining network bandwidth (i.e. trying to reuse resultsets across jobs in addition to the current set of pre-scheduled co-located jobs, so that e.g. a job run in one 10-minute epoch can use a cached resultset from a previous 10-minute epoch). But unless the jobs use the exact same resultsets (i.e. unless they use identical where clauses) I don't know of a general-purpose algorithm that would fill in the gaps in the resultset: for example, if the cached resultset used the clause "where N > 3" and a different job needs the resultset with the clause "where N > 0", what algorithm could I use to determine that I need to take the union of the original resultset with the resultset from the clause "where N > 0 AND N <= 3"? I could try to write my own algorithm to do this, but the result would be a buggy useless mess. I would also need to determine when the cached data is stale - the simplest way to do this is to compare the cached data's timestamp with the last-modified timestamp on the source table and replace all of the data if the timestamp has changed, but ideally I'd want to be able to update only the values that have changed, with per-row or per-chunk timestamps.
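
For the one-dimensional case in the example above, the "gap" query can be derived mechanically; the sketch below only handles a single "column > x" predicate and is meant to illustrate the missing-rows query, not to be the general-purpose algorithm the question asks about.

def gap_clause(cached_lower, requested_lower, column="N"):
    """The cache holds rows matching "column > cached_lower"; a new job needs
    "column > requested_lower". Returns the WHERE clause for the missing rows,
    or None if the cached resultset already covers the request."""
    if requested_lower >= cached_lower:
        return None  # cache is a superset of what the new job needs
    # Fetch only the gap, then take the union with the cached resultset.
    return "{0} > {1} AND {0} <= {2}".format(column, requested_lower, cached_lower)

print(gap_clause(3, 0))  # "N > 0 AND N <= 3", as in the example above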

Recommended answer

I've started to implement my solution to the question.

In order to simplify the intra-node cache and also to simplify CPU load balancing, I'm using a Cassandra database at each database cluster ("Cassandra node") to run the aggregation jobs (previously I was aggregating the local database resultsets by hand) - I'm using the single Cassandra database for the relational, Cassandra, and MongoDB data (the downside is that some relational queries run slower on Cassandra, but this is made up for by the fact that the single unified aggregation database is easier to maintain than the separate relational and non-relational aggregation databases). I am also no longer aggregating jobs in ten minute epochs since the cache makes this algorithm unnecessary.

Each machine in a node refers to a Cassandra columnfamily called Cassandra_Cache_[MachineID] that is used to store the key_ids and column_ids that it has sent to the Cassandra node. The Cassandra_Cache columnfamily consists of a Table column, a Primary_Key column, a Column_ID column, a Last_Modified_Timestamp column, a Last_Used_Timestamp column, and a composite key consisting of the Table|Primary_Key|Column_ID. The Last_Modified_Timestamp column denotes the datum's last_modified timestamp from the source database, and the Last_Used_Timestamp column denotes the timestamp at which the datum was last used/read by an aggregation job. When the Cassandra node requests data from a machine, the machine calculates the resultset and then takes the set difference of the resultset and the table|key|columns that are in its Cassandra_Cache and that have the same Last_Modified_Timestamp as the rows in its Cassandra_Cache (if the timestamps don't match then the cached data is stale and is updated along with the new Last_Modified_Timestamp). The local machine then sends the set difference to the Cassandra node and updates its Cassandra_Cache with the set difference and updates the Last_Used_Timestamp on each cached datum that was used to compose the resultset. (A simpler alternative to maintaining a separate timestamp for each table|key|column is to maintain a timestamp for each table|key, but this is less precise and the table|key|column timestamp is not overly complex.) Keeping the Last_Used_Timestamps in sync between Cassandra_Caches only requires that the local machines and remote nodes send the Last_Used_Timestamp associated with each job, since all data within a job uses the same Last_Used_Timestamp.
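
A minimal sketch of that per-machine cache check, with the Cassandra_Cache modeled as a plain dict keyed by (table, primary_key, column_id) instead of the real columnfamily; the structure of the bookkeeping is the same.

def rows_to_send(resultset, cassandra_cache, job_timestamp):
    """resultset: {(table, pk, column_id): (value, last_modified)} computed locally.
    Returns only the entries that are missing from the cache or stale in it, and
    refreshes the cache's Last_Modified_Timestamp / Last_Used_Timestamp bookkeeping."""
    delta = {}
    for key, (value, last_modified) in resultset.items():
        cached = cassandra_cache.get(key)
        if cached is None or cached["last_modified"] != last_modified:
            delta[key] = (value, last_modified)  # new or stale -> must be shipped
        # Whether shipped or already cached, the datum was used by this job.
        cassandra_cache[key] = {"last_modified": last_modified,
                                "last_used": job_timestamp}
    return delta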

The Cassandra node updates its resultset with the new data that it receives from within the node and also with the data that it receives from the other nodes. The Cassandra node also maintains a columnfamily that stores the same data that is in each machine's Cassandra_Cache (except for the Last_Modified_Timestamp, which is only needed on the local machine to determine when data is stale), along with a source id indicating whether the data came from within the node or from another node - the id distinguishes between the different nodes, but does not distinguish between the different machines within the local node. (Another option is to use a unified Cassandra_Cache rather than using one Cassandra_Cache per machine plus another Cassandra_Cache for the node, but I decided that the added complexity was not worth the space savings.)

Each Cassandra node also maintains a Federated_Cassandra_Cache, which consists of the {Database, Table, Primary_Key, Column_ID, Last_Used_Timestamp} tuples that have been sent from the local node to one of the other two nodes.

When a job comes through the pipeline, each Cassandra node updates its intra-node cache with the local resultsets, and also completes the sub-jobs that can be performed locally (e.g. in a job to sum data between multiple nodes, each node sums its intra-node data in order to minimize the amount of data that needs to be co-located in the inter-node federation) - a sub-job can be performed locally if it only uses intra-node data. The manager node then determines on which node to perform the rest of the job: each Cassandra node can locally compute the cost of sending its resultset to another node by taking the set difference of its resultset and the subset of the resultset that has been cached according to its Federated_Cassandra_Cache, and the manager node minimizes the cost equation ["cost to transport resultset from NodeX" + "cost to transport resultset from NodeY"]. For example, it costs Node1 {3, 5} to transport its resultset to {Node2, Node3}, it costs Node2 {2, 2} to transport its resultset to {Node1, Node3}, and it costs Node3 {4, 3} to transport its resultset to {Node1, Node2}, therefore the job is run on Node1 with a cost of "6".
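
A small sketch of the manager's placement decision using the transport costs from the example above; costs[src][dst] is src's cost to ship its resultset to dst.

costs = {
    "Node1": {"Node2": 3, "Node3": 5},
    "Node2": {"Node1": 2, "Node3": 2},
    "Node3": {"Node1": 4, "Node2": 3},
}

def cheapest_target(costs):
    # Total cost of running the job at `target` is the sum of every other
    # node's cost to ship its resultset to `target`.
    totals = {target: sum(costs[src][target] for src in costs if src != target)
              for target in costs}
    return min(totals, key=totals.get), totals

print(cheapest_target(costs))  # ('Node1', {'Node1': 6, 'Node2': 6, 'Node3': 7})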

I'm using an LRU eviction policy for each Cassandra node; I was originally using an oldest-first eviction policy because it is simpler to implement and requires fewer writes to the Last_Used_Timestamp column (once per datum update instead of once per datum read), but the implementation of an LRU policy turned out not to be overly complex and the Last_Used_Timestamp writes did not create a bottleneck. When a Cassandra node reaches 20% free space it evicts data until it reaches 30% free space, hence each eviction is approximately the size of 10% of the total space available. The node maintains two timestamps: the timestamp of the last-evicted intra-node data, and the timestamp of the last-evicted inter-node / federated data; due to the increased latency of inter-node communication relative to that of intra-node communication, the goal of the eviction policy is to have 75% of the cached data be inter-node data and 25% of the cached data be intra-node data, which can be quickly approximated by having 25% of each eviction be inter-node data and 75% of each eviction be intra-node data. Eviction works as follows:

while(evicted_local_data_size < 7.5% of total space available) {
    evict local data with Last_Modified_Timestamp < 
        (last_evicted_local_timestamp += 1 hour)
    update evicted_local_data_size with evicted data
}

while(evicted_federated_data_size < 2.5% of total space available) {
    evict federated data with Last_Modified_Timestamp < 
        (last_evicted_federated_timestamp += 1 hour)
    update evicted_federated_data_size with evicted data
}
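
A rough, runnable Python rendering of the pseudocode above; evict_local_older_than / evict_federated_older_than are assumed helpers that evict cached data older than the given timestamp and return the number of bytes freed, and the check against "now" simply stops a loop once there is nothing older left to evict. The 7.5% / 2.5% split keeps each eviction at roughly 10% of total space, with 75% of it intra-node data and 25% federated data.

from datetime import datetime, timedelta

def run_eviction(cache, total_space):
    now = datetime.utcnow()

    evicted_local = 0
    while (evicted_local < 0.075 * total_space
           and cache.last_evicted_local_timestamp < now):
        cache.last_evicted_local_timestamp += timedelta(hours=1)
        evicted_local += cache.evict_local_older_than(
            cache.last_evicted_local_timestamp)

    evicted_federated = 0
    while (evicted_federated < 0.025 * total_space
           and cache.last_evicted_federated_timestamp < now):
        cache.last_evicted_federated_timestamp += timedelta(hours=1)
        evicted_federated += cache.evict_federated_older_than(
            cache.last_evicted_federated_timestamp)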

Evicted data is not permanently deleted until eviction acknowledgments have been received from the machines within the node and from the other nodes.

The Cassandra node then sends a notification to the machines within its node indicating what the new last_evicted_local_timestamp is. The local machines update their Cassandra_Caches to reflect the new timestamp, and send a notification to the Cassandra node when this is complete; when the Cassandra node has received notifications from all local machines then it permanently deletes the evicted local data. The Cassandra node also sends a notification to the remote nodes with the new last_evicted_federated_timestamp; the other nodes update their Federated_Cassandra_Caches to reflect the new timestamp, and the Cassandra node permanently deletes the evicted federated data when it receives notifications from each node (the Cassandra node keeps track of which node a piece of data came from, so after receiving an eviction acknowledgment from NodeX the node can permanently delete the evicted NodeX data before receiving an eviction acknowledgment from NodeY). Until all machines/nodes have sent their notifications, the Cassandra node uses the cached evicted data in its queries if it receives a resultset from a machine/node that has not evicted its old data. For example, the Cassandra node has a local Table|Primary_Key|Column_ID datum that it has evicted, and meanwhile a local machine (which has not processed the eviction request) has not included the Table|Primary_Key|Column_ID datum in its resultset because it thinks that the Cassandra node already has the datum in its cache; the Cassandra node receives the resultset from the local machine, and because the local machine has not acknowledged the eviction request the Cassandra node includes the cached evicted datum in its own resultset.
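
A minimal sketch of that acknowledgment bookkeeping (the class and method names are made up for illustration): evicted data stays in a pending state, keyed by the peers that must confirm the new eviction timestamp, and is only purged once every expected machine or node has acknowledged.

class EvictionTracker:
    def __init__(self, expected_ackers):
        self.expected = set(expected_ackers)  # local machines or remote nodes
        self.acked = set()

    def acknowledge(self, peer):
        # Record one acknowledgment; return True once everyone has confirmed.
        self.acked.add(peer)
        return self.expected <= self.acked

tracker = EvictionTracker({"machine-1", "machine-2", "machine-3"})
tracker.acknowledge("machine-1")
tracker.acknowledge("machine-2")
if tracker.acknowledge("machine-3"):
    pass  # now safe to permanently delete the evicted local data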
