如何解决 MongoDB 中缺少事务的问题?

我知道这里有类似的问题,但是如果我需要事务或者使用 原子操作或者 两阶段提交,它们可以是 告诉我切换回常规的 RDBMS 系统。第二种解决方案似乎是最佳选择。第三个我不希望遵循,因为它似乎很多事情可能会出错,我不能测试它的每一个方面。我很难重构我的项目来执行原子操作。我不知道这是出于我有限的观点(到目前为止我只处理过 SQL 数据库) ,还是实际上不可能做到。

我们想在我们公司试点测试 MongoDB。我们选择了一个相对简单的项目-短信网关。它允许我们的软件向蜂窝网络发送短信息,而网关则完成这些繁琐的工作: 实际上通过不同的通信协议与提供商进行通信。网关还管理消息的计费。每个申请这项服务的顾客都必须购买一些积分。当发送消息时,系统会自动减少用户的余额,并在余额不足时拒绝访问。此外,因为我们是第三方短信服务提供商的客户,我们也可能有我们自己的余额与他们。我们还要记录这些。

我开始考虑如何用 MongoDB 存储所需的数据,如果我降低一些复杂性(外部计费、排队发送短信)。来自 SQL 世界的我将为用户创建一个单独的表,另一个用于 SMS 消息,还有一个用于存储关于用户余额的事务。假设我为 MongoDB 中的所有这些集合创建了单独的集合。

设想一个短信发送任务,在这个简化的系统中有以下步骤:

  1. 检查用户是否有足够的余额; 如果没有足够的信用,则拒绝访问

  2. 在 SMS 集合中发送和存储带有详细信息和成本的消息(在活动系统中,消息将具有 status属性,任务将接收它并根据其当前状态设置 SMS 的价格)

  3. 通过发送消息的成本来减少用户的余额

  4. 在事务集合中记录事务

这有什么问题吗?MongoDB 只能对一个文档进行原子更新。在前面的流程中,可能会发生某种错误,消息被存储在数据库中,但是用户的余额没有更新,并且/或者事务没有被记录。

我有两个主意:

  • 为用户创建单个集合,并将余额存储为字段、用户相关事务和消息作为子文档存储在用户文档中。因为我们可以自动更新文档,这实际上解决了事务问题。缺点: 如果用户发送许多 SMS 消息,文档的大小可能会变大,并且可能达到4MB 的文档限制。也许我可以在这种情况下创建历史文档,但我不认为这是一个好主意。我也不知道如果我把越来越多的数据推到同一个大文档中,系统会有多快。

  • 为用户创建一个集合,为事务创建一个集合。可以有两种交易: 正余额变化的 信用购买和负余额变化的 信息已发送。事务可能有一个子文档; 例如,在 信息已发送中,SMS 的详细信息可以嵌入到事务中。缺点: 我不存储当前的用户余额,所以每次用户尝试发送消息时,我都必须计算余额,以告诉用户该消息是否可以通过。我担心随着存储事务数量的增长,这种计算可能会变慢。

我有点不知道该选哪种方法。还有别的解决办法吗?我在网上找不到任何关于如何解决这类问题的最佳实践。我猜想,许多试图熟悉 NoSQL 世界的程序员一开始都面临着类似的问题。

80776 次浏览

该项目是简单的,但你必须支持交易付款,这使整个事情困难。因此,举例来说,一个复杂的门户系统有数百个集合(论坛,聊天,广告,等等..。.)在某些方面更简单,因为如果你失去了一个论坛或聊天条目,没有人真正关心。另一方面,如果你失去了一个支付交易,这是一个严重的问题。

因此,如果你真的想要一个 MongoDB 的试点项目,选择一个在 那个方面比较简单的项目。

说重点: 如果事务完整性是 必须的,那么不要使用 MongoDB,而只在支持事务的系统中使用组件。要在组件之上构建一些东西来为非 ACID 兼容的组件提供类似于 ACID 的功能是非常困难的。根据不同的用例,以某种方式将操作分为事务操作和非事务操作可能是有意义的。

由于合理的原因,MongoDB 中没有事务。这是使 MongoDB 更快的原因之一。

在您的情况下,如果交易是必须的,那么 mongo 似乎不太合适。

可能是 RDMBS + MongoDB,但这将增加复杂性,并使其更难以管理和支持应用程序。

这有什么问题吗?MongoDB 只能对一个文档进行原子更新。在前面的流程中,可能会发生某种错误,消息被存储在数据库中,但是用户的余额没有减少,并且/或者事务没有被记录。

这不是问题。您提到的错误要么是逻辑错误(bug) ,要么是 IO 错误(网络、磁盘故障)。这种错误会使无事务存储和事务存储处于不一致的状态。例如,如果它已经发送了短信,但在存储消息时发生了错误-它不能回滚短信发送,这意味着它不会被记录,用户平衡不会减少等。

这里真正的问题是用户可以利用竞态条件并发送比平衡允许的更多的消息。这也适用于 RDBMS,除非您使用带有平衡字段锁定的 SMS 发送内部事务(这将是一个很大的瓶颈)。MongoDB 的一个可能的解决方案是首先使用 findAndModify来减少余额并检查余额,如果余额为负,则禁止发送并退还余额(原子增量)。如果是肯定的,继续发送,如果它不能退还金额。还可以维护余额历史记录集,以帮助修复/验证余额字段。

看看 这个,德库泰克的作品。他们为 Mongo 开发了一个插件,该插件不仅承诺处理事务,还承诺提高性能。

这可能是我发现的最好的博客,关于如何为 mongodb 实现类似事务的特性!

同步标志: 最适合从主文档复制数据

工作排队: 非常通用,解决95% 的案件。无论如何,大多数系统需要至少有一个作业队列!

两阶段提交: 这种技术确保每个实体始终拥有达到一致状态所需的所有信息

日志协调: 最强大的技术,理想的金融系统

版本控制: 提供隔离并支持复杂的结构

阅读更多信息: https://dzone.com/articles/how-implement-robust-and

从4.0开始,MongoDB 将拥有多文档 ACID 事务。计划是首先启用复制集中的部署,然后是分片集群。MongoDB 中的事务就像关系数据库中的事务开发人员所熟悉的那样——它们是多语句的,具有类似的语义和语法(如 start_transactioncommit_transaction)。重要的是,启用事务的 MongoDB 更改不会影响不需要它们的工作负载的性能。

有关详细信息,请参阅 给你

拥有分布式事务并不意味着您应该像在表关系数据库中那样对数据建模。接受文档模型的力量,并遵循数据建模的良好和推荐的 练习

现在说这个有点晚了,不过我想以后会有帮助的。我使用 雷迪斯制作一个 排队来解决这个问题。

  • 要求:
    下面的图片显示了两个动作需要同时执行,但是动作1的第二阶段和第三阶段需要在动作2的第二阶段或相反的阶段开始之前完成(一个阶段可以是请求 REST api、数据库请求或执行 javascript 代码...)。 enter image description here

  • 排队对你有什么帮助
    队列确保许多函数中 lock()release()之间的每个块代码不会同时运行,使它们相互隔离。

    function action1() {
    phase1();
    queue.lock("action_domain");
    phase2();
    phase3();
    queue.release("action_domain");
    }
    
    
    function action2() {
    phase1();
    queue.lock("action_domain");
    phase2();
    queue.release("action_domain");
    }
    
  • 如何建立队列
    我将只关注如何避免 种族条件部分时,建立一个后端站点队列。如果你不知道排队的基本概念,来 给你
    下面的代码只显示了概念,您需要以正确的方式实现。

    function lock() {
    if(isRunning()) {
    addIsolateCodeToQueue(); //use callback, delegate, function pointer... depend on your language
    } else {
    setStateToRunning();
    pickOneAndExecute();
    }
    }
    
    
    function release() {
    setStateToRelease();
    pickOneAndExecute();
    }
    

但是你需要 isRunning() setStateToRelease() setStateToRunning()分离它的自我,否则你再次面临竞争条件。为此,我选择 Redis 作为 的目的和可伸缩性。
Redis 文件谈到了它的交易:

事务中的所有命令都被序列化并执行 不可能发生另一个人发出的请求 客户是在执行 Redis 的过程中提供服务的 这保证了命令是作为 单独作业单独作业。

P/s:
我使用 Redis 是因为我的服务已经在使用它了,您可以使用任何其他支持隔离的方式来做到这一点。
我的代码中的 action_domain是上面的,当你只需要用户 A 的动作1调用时阻止用户 A 的动作2,不要阻止其他用户。这个想法是放置一个独特的钥匙为每个用户锁。

事务现在可以在 MongoDB 4.0中找到

// Runs the txnFunc and retries if TransientTransactionError encountered


function runTransactionWithRetry(txnFunc, session) {
while (true) {
try {
txnFunc(session);  // performs transaction
break;
} catch (error) {
// If transient error, retry the whole transaction
if ( error.hasOwnProperty("errorLabels") && error.errorLabels.includes("TransientTransactionError")  ) {
print("TransientTransactionError, retrying transaction ...");
continue;
} else {
throw error;
}
}
}
}


// Retries commit if UnknownTransactionCommitResult encountered


function commitWithRetry(session) {
while (true) {
try {
session.commitTransaction(); // Uses write concern set at transaction start.
print("Transaction committed.");
break;
} catch (error) {
// Can retry commit
if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ...");
continue;
} else {
print("Error during commit ...");
throw error;
}
}
}
}


// Updates two collections in a transactions


function updateEmployeeInfo(session) {
employeesCollection = session.getDatabase("hr").employees;
eventsCollection = session.getDatabase("reporting").events;


session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } );


try{
employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } );
eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } );
} catch (error) {
print("Caught exception during transaction, aborting.");
session.abortTransaction();
throw error;
}


commitWithRetry(session);
}


// Start a session.
session = db.getMongo().startSession( { mode: "primary" } );


try{
runTransactionWithRetry(updateEmployeeInfo, session);
} catch (error) {
// Do something with error
} finally {
session.endSession();
}