/// <summary>
/// Repository that issues its commands on the connection exposed by the injected
/// <see cref="IUnitOfWork"/> and passes that unit of work's transaction along with each call.
/// </summary>
public sealed class MyRepository
{
    private readonly IUnitOfWork unitOfWork;

    /// <summary>Creates a repository bound to the given unit of work.</summary>
    /// <param name="unitOfWork">Supplies the connection and transaction used by every call.</param>
    public MyRepository(IUnitOfWork unitOfWork)
    {
        this.unitOfWork = unitOfWork;
    }

    // You also need to handle other parameters like 'sql', 'param' etc. This is out of scope of this answer.

    /// <summary>Runs a query on the unit of work's connection, handing over its transaction.</summary>
    public MyPoco Get()
    {
        return unitOfWork.Connection.Query(sql, param, unitOfWork.Transaction, .......);
    }

    /// <summary>Executes a command on the unit of work's connection, handing over its transaction.</summary>
    /// <param name="poco">The object to insert.</param>
    public void Insert(MyPoco poco)
    {
        // A void method cannot return a value, so the result of Execute is intentionally discarded.
        unitOfWork.Connection.Execute(sql, param, unitOfWork.Transaction, .........);
    }
}
然后你可以这样调用:
使用事务:
using (var session = new DalSession())
{
    UnitOfWork uow = session.UnitOfWork;
    uow.Begin();
    try
    {
        // Database work goes here. Further repositories can be created the same way
        // and handed the same unit of work.
        var repository = new MyRepository(uow);
        repository.Insert(myPoco);
        uow.Commit();
    }
    catch
    {
        // Roll back, then rethrow so the caller still sees the original exception.
        uow.Rollback();
        throw;
    }
}
不使用事务:
using (var session = new DalSession())
{
    // Database work goes here. Begin() is never called on the unit of work,
    // so (per the original author) the unit of work has no effect in this scope.
    var repository = new MyRepository(session.UnitOfWork);
    repository.Insert(myPoco);
}
/// <summary>
/// Creates <see cref="UnitOfWork"/> instances, each one built on a newly created and opened
/// <typeparamref name="TConnection"/>.
/// </summary>
/// <typeparam name="TConnection">Connection type; must have a public parameterless constructor.</typeparam>
public class UnitOfWorkFactory<TConnection> : IUnitOfWorkFactory where TConnection : IDbConnection, new()
{
    private readonly string connectionString;

    /// <summary>Stores the connection string used for every connection this factory creates.</summary>
    /// <param name="connectionString">Connection string; must not be null, empty or whitespace.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connectionString"/> is null, empty or whitespace.
    /// </exception>
    public UnitOfWorkFactory(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            // ArgumentNullException(string) interprets its argument as the parameter name,
            // so the name and the message are passed separately.
            throw new ArgumentNullException(nameof(connectionString), "connectionString cannot be null");
        }

        this.connectionString = connectionString;
    }

    /// <summary>Creates a unit of work on top of a newly opened connection.</summary>
    public UnitOfWork Create()
    {
        return new UnitOfWork(CreateOpenConnection());
    }

    /// <summary>Creates a connection, assigns the connection string and opens it.</summary>
    /// <exception cref="Exception">Opening failed; the cause is available as the inner exception.</exception>
    private IDbConnection CreateOpenConnection()
    {
        var conn = new TConnection();
        conn.ConnectionString = connectionString;
        try
        {
            if (conn.State != ConnectionState.Open)
            {
                conn.Open();
            }
        }
        catch (Exception exception)
        {
            // The connection never reaches the caller on this path, so release it here.
            conn.Dispose();
            throw new Exception("An error occured while connecting to the database. See innerException for details.", exception);
        }

        return conn;
    }
}
/// <summary>
/// Reads products through the connection and transaction taken from a <see cref="UnitOfWork"/>.
/// </summary>
public class ProductRepository : IProductRepository
{
    protected readonly IDbConnection connection;
    protected readonly IDbTransaction transaction;

    /// <summary>Captures the unit of work's transaction and the connection that transaction belongs to.</summary>
    /// <param name="unitOfWork">Unit of work whose transaction is used for every command.</param>
    public ProductRepository(UnitOfWork unitOfWork)
    {
        connection = unitOfWork.Transaction.Connection;
        transaction = unitOfWork.Transaction;
    }

    /// <summary>Selects the product with the given id from dbo.Product.</summary>
    /// <param name="id">Value bound to the @id parameter.</param>
    /// <returns>The result of QuerySingleOrDefault for that id.</returns>
    public Product Read(int id)
    {
        // Pass the captured field; there is no member named 'Transaction' on this class.
        return connection.QuerySingleOrDefault<Product>("select * from dbo.Product where Id = @id", new { id }, transaction: transaction);
    }
}
要访问数据库,只需实例化 DbContext,或者使用您选择的 IoC 容器注入它(我个人使用 .NET Core 自带的 IoC 容器)。
// Build a factory for SqlConnection-based units of work and hand it to the DbContext.
var unitOfWorkFactory = new UnitOfWorkFactory<SqlConnection>("your connection string");
var db = new DbContext(unitOfWorkFactory);
Product product = null;
try
{
product = db.Product.Read(1);
// Commit is called even though only a read was performed.
db.Commit();
}
catch (SqlException ex)
{
//log exception
// Only SqlException is handled here, and it is not rethrown after the rollback.
db.Rollback();
}
对于这种简单的只读操作,显式调用 Commit() 似乎有些多余,但随着系统规模的增长,这一要求会带来回报。而且据 Sam Saffron 所说,这样做显然还能带来少许性能优势。您“可以”在简单的读操作中省略 db.Commit(),但这样做会使连接一直保持打开状态,并把清理工作的责任推给垃圾收集器,因此不建议这么做。
1. UserService CreateUserProfile
a. UserRepository.GetByEmail("some@email.com")
b. UserRepository.Add(user)
c. AddressRepository.Add(new address)
2. UserService Commit?
// Business code. I'm going to write a method, but a class with dependencies is more realistic.
// Creates an entity inside one unit of work, commits it, then reads it back inside a second
// unit of work and checks that the stored value matches.
static async Task MyBusinessCode(IUnitOfWorkContext context, EntityRepository repo)
{
    var expectedEntity = new Entity { Id = null, Value = 10 };

    // First unit of work: insert the row and commit.
    using (var uow = context.Create())
    {
        expectedEntity.Id = await repo.CreateAsync(expectedEntity.Value);
        await uow.CommitAsync();
    }

    // Second unit of work: read-only, so it is disposed without a commit.
    using (context.Create())
    {
        var entity = await repo.GetOrDefaultAsync(expectedEntity.Id.Value);
        entity.Should().NotBeNull();
        entity.Value.Should().Be(expectedEntity.Value);
    }
}
工作单元只包装一个事务,并且是短期的:
/// <summary>
/// Wraps a single transaction on a SQLite connection. The transaction is begun in the
/// constructor and disposed by <see cref="Dispose"/>.
/// </summary>
public class UnitOfWork : IDisposable
{
    private readonly SQLiteTransaction _transaction;

    /// <summary>Stores the connection and begins a transaction on it.</summary>
    public UnitOfWork(SQLiteConnection connection)
    {
        Connection = connection;
        _transaction = connection.BeginTransaction();
    }

    /// <summary>The connection this unit of work was created with.</summary>
    public SQLiteConnection Connection { get; }

    /// <summary>True once <see cref="Dispose"/> has been called.</summary>
    public bool IsDisposed { get; private set; }

    /// <summary>Commits the wrapped transaction.</summary>
    public async Task CommitAsync() => await _transaction.CommitAsync();

    /// <summary>Rolls back the wrapped transaction.</summary>
    public async Task RollBackAsync() => await _transaction.RollbackAsync();

    /// <summary>Disposes the wrapped transaction and marks this unit of work as disposed.</summary>
    public void Dispose()
    {
        if (_transaction != null)
        {
            _transaction.Dispose();
        }

        IsDisposed = true;
    }
}
上下文(context)更有趣。它是存储库(repo)和工作单元在幕后通信的方式。
有一个接口供业务代码管理工作单元,另一个接口供存储库遵循该工作单元。
/// <summary>
/// Tracks the current <see cref="UnitOfWork"/> for one SQLite connection. Business code
/// starts units of work through <see cref="Create"/>; repositories obtain the connection
/// through <see cref="GetConnection"/>.
/// </summary>
public class UnitOfWorkContext : IUnitOfWorkContext, IConnectionContext
{
    private readonly SQLiteConnection _connection;
    private UnitOfWork _unitOfWork;

    public UnitOfWorkContext(SQLiteConnection connection)
    {
        _connection = connection;
    }

    // A unit of work counts as open when one has been created and it is not yet disposed.
    private bool IsUnitOfWorkOpen => _unitOfWork != null && !_unitOfWork.IsDisposed;

    /// <summary>Returns the connection of the currently open unit of work.</summary>
    /// <exception cref="InvalidOperationException">No unit of work is currently open.</exception>
    public SQLiteConnection GetConnection()
    {
        if (IsUnitOfWorkOpen)
        {
            return _unitOfWork.Connection;
        }

        throw new InvalidOperationException(
            "There is not current unit of work from which to get a connection. Call BeginTransaction first");
    }

    /// <summary>Starts a new unit of work on the stored connection and makes it the current one.</summary>
    /// <exception cref="InvalidOperationException">The previous unit of work has not been disposed.</exception>
    public UnitOfWork Create()
    {
        if (IsUnitOfWorkOpen)
        {
            throw new InvalidOperationException(
                "Cannot begin a transaction before the unit of work from the last one is disposed");
        }

        var created = new UnitOfWork(_connection);
        _unitOfWork = created;
        return created;
    }
}
/// <summary>
/// Contract through which a consumer obtains a SQLite connection.
/// </summary>
public interface IConnectionContext
{
/// <summary>Returns the SQLite connection to run commands on.</summary>
SQLiteConnection GetConnection();
}
/// <summary>
/// Contract through which a consumer creates units of work.
/// </summary>
public interface IUnitOfWorkContext
{
/// <summary>Creates and returns a unit of work.</summary>
UnitOfWork Create();
}
存储库(repo)是这样做的:
/// <summary>
/// Reads and writes rows of the Entity table on whatever connection the injected
/// <see cref="IConnectionContext"/> returns at call time.
/// </summary>
public class EntityRepository
{
    private const string InsertSql = @"
insert into Entity (Value) values (@value);
select last_insert_rowid();
";

    private const string SelectByIdSql = @"
select * from Entity where Id = @id
";

    private readonly IConnectionContext _context;

    public EntityRepository(IConnectionContext context)
    {
        _context = context;
    }

    /// <summary>Inserts a row with the given value and returns the result of last_insert_rowid().</summary>
    public async Task<int> CreateAsync(int value)
    {
        var connection = _context.GetConnection();
        var newId = await connection.QuerySingleAsync<int>(InsertSql, new { value });
        return newId;
    }

    /// <summary>Selects the row whose Id equals <paramref name="id"/> via QuerySingleOrDefaultAsync.</summary>
    public async Task<Entity> GetOrDefaultAsync(int id)
    {
        var connection = _context.GetConnection();
        var match = await connection.QuerySingleOrDefaultAsync<Entity>(SelectByIdSql, new { id });
        return match;
    }
}
/// <summary>
/// Opens a SQLite connection, runs Setup on it, wires up the context and the repository,
/// then runs the business code to completion.
/// </summary>
public static void Main(string[] args)
{
    using (var connection = new SQLiteConnection("Data Source=:memory:"))
    {
        connection.Open();
        Setup(connection);

        // The context class is named UnitOfWorkContext; it serves both the business code
        // and the repository.
        var context = new UnitOfWorkContext(connection);
        var repo = new EntityRepository(context);

        // Argument order follows MyBusinessCode(context, repo).
        MyBusinessCode(context, repo).ConfigureAwait(false).GetAwaiter().GetResult();
    }
}
/// <summary>
/// Builds SqlConnection objects from a single connection string.
/// Register one instance of it in whichever DI system you use.
/// </summary>
class ConnectionFactory
{
    private string _connectionString;

    public ConnectionFactory(string connectionString) => _connectionString = connectionString;

    /// <summary>Creates a new SqlConnection for the stored connection string; it is not opened here.</summary>
    public IDbConnection CreateConnection()
    {
        IDbConnection connection = new SqlConnection(_connectionString);
        return connection;
    }
}
/// <summary>
/// In a properly normalized database a repo would generally not map to one table;
/// it would aggregate data from several tables.
/// </summary>
class ProductRepo
{
    private ConnectionFactory _connectionFactory;

    public ProductRepo(ConnectionFactory connectionFactory) => _connectionFactory = connectionFactory;

    /// <summary>Fetches the product with the given id on a connection created for this call.</summary>
    public Product Get(int id)
    {
        // Connection lifetime is left to connection pooling; each call creates and disposes its own connection.
        using (var con = _connectionFactory.CreateConnection())
        {
            var product = con.Get<Product>(id);
            return product;
        }
    }

    // ...
}
/// <summary>
/// Placeholder for an order repository; its members are elided in this example.
/// </summary>
class OrderRepo
{
// Presumably follows the same shape as ProductRepo (injected ConnectionFactory, one connection per call).
// ...
}
/// <summary>
/// Controller showing how an ambient TransactionScope can span calls to several repositories.
/// </summary>
class ProductController : ControllerBase
{
    private ProductRepo _productRepo;
    private OrderRepo _orderRepo;

    public ProductController(ProductRepo productRepo, OrderRepo orderRepo)
    {
        _productRepo = productRepo;
        _orderRepo = orderRepo;
    }

    /// <summary>Loads the product with the given id inside a transaction scope.</summary>
    /// <param name="id">Identifier passed to the product repository.</param>
    /// <returns>An OK result carrying the product.</returns>
    [HttpGet]
    public Task<IActionResult> Get(int id)
    {
        // This establishes your transaction.
        // Default isolation level is 'serializable' which is generally desirable and is configurable.
        // Enable async flow option in case subordinate async code results in a thread continuation switch.
        // If you don't need this transaction here, don't use it, or put it where it is needed.
        using (var trn = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            Product product = _productRepo.Get(id);
            // Use additional repositories and do something that actually requires an explicit transaction.
            // A single SQL statement does not require a transaction on SQL Server due to default autocommit mode.
            // ...

            // Without Complete() the scope rolls back when it is disposed.
            trn.Complete();

            // Ok(...) yields an IActionResult; IAsyncResult is an unrelated type and cannot hold it.
            return Task.FromResult<IActionResult>(Ok(product));
        }
    }
}
namespace Dapper.CQS.Example.Controllers
{
    /// <summary>
    /// HTTP endpoints for Property records. Each action wraps its input in a query or command
    /// object and hands it to the injected unit of work.
    /// </summary>
    [ApiController]
    [Route("[controller]/[action]")]
    public class PropertyController : ControllerBase
    {
        private readonly IUnitOfWork _unitOfWork;

        public PropertyController(IUnitOfWork unitOfWork) => _unitOfWork = unitOfWork;

        /// <summary>Returns the property with the given id, or NoContent when the query yields null.</summary>
        [HttpGet]
        public async Task<ActionResult<Property>> GetById([FromQuery] int id)
        {
            var found = await _unitOfWork.QueryAsync(new PropertyGetByIdQuery(id));
            if (found == null)
            {
                return NoContent();
            }

            return Ok(found);
        }

        /// <summary>Returns the result of the name filter query.</summary>
        [HttpGet]
        public async Task<ActionResult<List<Property>>> Filter([FromQuery] string? name)
        {
            var query = new PropertyFilterQuery(name);
            var matches = await _unitOfWork.QueryAsync(query);
            return Ok(matches);
        }

        /// <summary>Returns the result of the paged name filter query.</summary>
        [HttpGet]
        public async Task<ActionResult<PagedList<Property>>> PagedFilter([FromQuery] string? name, int page = 1, int pageSize = 5)
        {
            var query = new PropertyPagedFilterQuery(name, page, pageSize);
            var pageOfMatches = await _unitOfWork.QueryAsync(query);
            return Ok(pageOfMatches);
        }

        /// <summary>Executes the create command, commits, then returns the property with its new id assigned.</summary>
        [HttpPost]
        public async Task<ActionResult<Property>> Create([FromBody] Property property)
        {
            var command = new PropertyCreateCommand(property);
            var newId = await _unitOfWork.ExecuteAsync(command);
            await _unitOfWork.CommitAsync();

            // The id is assigned only after the commit has gone through.
            property.Id = newId;
            return Ok(property);
        }

        /// <summary>Executes the delete command for the given id and commits.</summary>
        [HttpDelete]
        public async Task<ActionResult> Delete([FromQuery] int id)
        {
            var command = new PropertyDeleteCommand(id);
            await _unitOfWork.ExecuteAsync(command);
            await _unitOfWork.CommitAsync();
            return Ok();
        }
    }
}
这里有一个查询:
namespace Dapper.CQS.Example.CommandQueries
{
    /// <summary>
    /// Paged query over the Properties table, optionally filtered by name. Besides the row
    /// columns, the SQL text selects COUNT(*) OVER() under the alias [COUNT].
    /// </summary>
    public class PropertyPagedFilterQuery : QueryPagedBase<Property>
    {
        /// <summary>Stores the filter and paging values on the parameter properties.</summary>
        public PropertyPagedFilterQuery(string? name, int page, int pageSize)
        {
            (Name, Page, PageSize) = (name, page, pageSize);
        }

        /// <summary>Name to match; the SQL text treats a null @Name as "no filter".</summary>
        [Parameter]
        public string? Name { get; set; }

        // The statement below is sent as plain text.
        protected override CommandType CommandType
        {
            get { return CommandType.Text; }
        }

        protected override string Procedure
        {
            get
            {
                return @"
SELECT *, COUNT(*) OVER() [COUNT]
FROM Properties WHERE Name = @Name OR @Name IS NULL
ORDER BY [Name]
OFFSET (@page -1 ) * @pageSize ROWS
FETCH NEXT @pageSize ROWS ONLY
";
            }
        }
    }
}
QueryBase 将使用 Dapper
/// <summary>
/// Base class for paged queries. The command is expected to return, for every row, the item
/// columns followed by a count column (named by <see cref="FieldCount"/>) that Dapper splits on.
/// </summary>
/// <typeparam name="T">Type each row's item columns are mapped to.</typeparam>
public abstract class QueryPagedBase<T> : CommandQuery, IQuery<PagedList<T>>, IQueryAsync<PagedList<T>>
{
    /// <summary>Page value, exposed as a command parameter.</summary>
    [Parameter]
    public int Page { get; set; }

    /// <summary>Page size value, exposed as a command parameter.</summary>
    [Parameter]
    public int PageSize { get; set; }

    /// <summary>Name of the column at which each row is split into item and count.</summary>
    protected virtual string FieldCount => "COUNT";

    /// <summary>Runs the command synchronously and wraps the rows in a <see cref="PagedList{T}"/>.</summary>
    public virtual PagedList<T> Query(IDbConnection connection, IDbTransaction? transaction)
    {
        var result = connection.Query<T, int, (T Item, int Count)>(Procedure, (a, b) => (a, b), GetParams(), transaction, commandType: CommandType, splitOn: FieldCount);
        return ToPagedList(result);
    }

    /// <summary>Runs the command asynchronously and wraps the rows in a <see cref="PagedList{T}"/>.</summary>
    /// <param name="connection">Connection to run the command on.</param>
    /// <param name="transaction">Transaction to enlist the command in, if any.</param>
    /// <param name="cancellationToken">Token handed to Dapper so the command can be cancelled.</param>
    public virtual async Task<PagedList<T>?> QueryAsync(IDbConnection connection, IDbTransaction? transaction, CancellationToken cancellationToken = default)
    {
        // The string-based QueryAsync overload has no token parameter; CommandDefinition carries it.
        var command = new CommandDefinition(Procedure, GetParams(), transaction, commandType: CommandType, cancellationToken: cancellationToken);
        var result = await connection.QueryAsync<T, int, (T Item, int Count)>(command, (a, b) => (a, b), splitOn: FieldCount);
        return ToPagedList(result!);
    }

    // Builds the paged result; the total is read from the first row's count, or 0 when there are no rows.
    private PagedList<T> ToPagedList(IEnumerable<(T Item, int Count)> result)
    {
        // Materialize once so the sequence is not enumerated twice.
        var rows = result as IReadOnlyCollection<(T Item, int Count)> ?? result.ToList();

        return new PagedList<T>
        {
            PageSize = PageSize,
            Page = Page,
            TotalRecords = rows.Select(t => t.Count).FirstOrDefault(),
            Items = rows.Select(t => t.Item).ToList()
        };
    }
}
/// <summary>
/// Unit-of-work contract exposing a single asynchronous save operation.
/// </summary>
public interface IUnitOfWork
{
/// <summary>Saves the pending changes.</summary>
/// <param name="cancellationToken">Token the caller can use to request cancellation.</param>
Task SaveChangesAsync(CancellationToken cancellationToken);
}
/// <summary>
/// Generic repository contract for entities deriving from EntityBase&lt;TId&gt;.
/// </summary>
/// <typeparam name="TEntity">Entity type handled by the repository.</typeparam>
/// <typeparam name="TId">Identifier type of the entity; must be comparable to itself.</typeparam>
public interface IRepository<TEntity, in TId> where TEntity : EntityBase<TId> where TId : IComparable<TId>
{
/// <summary>Asynchronously gets the entity with the given id.</summary>
/// <remarks>NOTE(review): the result for an unknown id is not specified here; confirm with the implementations.</remarks>
Task<TEntity> GetByIdAsync(TId id, CancellationToken cancellationToken);
/// <summary>Adds the entity and returns an entity instance (presumably the added one; confirm with the implementations).</summary>
TEntity Add(TEntity entity);
/// <summary>Updates the entity.</summary>
void Update(TEntity entity);
/// <summary>Removes the entity.</summary>
void Remove(TEntity entity);
}
领域模型:
/// <summary>
/// Base type for entities identified by a <typeparamref name="TId"/> value.
/// The identifier can only be assigned through the constructor.
/// </summary>
/// <typeparam name="TId">Identifier type; must be comparable to itself.</typeparam>
public abstract class EntityBase<TId> where TId : IComparable<TId>
{
    /// <summary>Creates an entity whose identifier keeps the default value of <typeparamref name="TId"/>.</summary>
    protected EntityBase()
    {
    }

    /// <summary>Creates an entity with the given identifier.</summary>
    protected EntityBase(TId id) => Id = id;

    /// <summary>The entity's identifier.</summary>
    public TId Id { get; }
}
/// <summary>
/// Entity with an int identifier; its members are elided in this example.
/// </summary>
public class WeatherForecast : EntityBase<int>
{
// ...
}
/// <summary>
/// Unit of work that owns one MySQL connection and the transaction currently open on it.
/// After every save attempt the transaction is replaced by a new one and the repository
/// is rebuilt on top of it.
/// </summary>
internal class AppUnitOfWork : IAppUnitOfWork, IDisposable
{
    private readonly IDbConnection _connection;
    private IDbTransaction _transaction;

    /// <summary>Repository bound to the current connection and transaction.</summary>
    public IWeatherForecastsRepository WeatherForecasts { get; private set; }

    // Example for using in ASP.NET Core
    // IAppUnitOfWork should be registered as scoped in DI container
    /// <summary>Opens the connection, begins the first transaction and creates the repository.</summary>
    /// <param name="configuration">Source of the "ConnectionStrings:MySql" value.</param>
    public AppUnitOfWork(IConfiguration configuration)
    {
        // I was using MySql in my project, the connection will be different for different DBMS
        _connection = new MySqlConnection(configuration["ConnectionStrings:MySql"]);
        try
        {
            _connection.Open();
            _transaction = _connection.BeginTransaction();
            WeatherForecasts = new WeatherForecastsRepository(_connection, _transaction);
        }
        catch
        {
            // When a constructor throws, the caller never gets an instance to dispose,
            // so release what was acquired so far before rethrowing.
            _transaction?.Dispose();
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Commits the current transaction, then starts a fresh one and rebuilds the repository on it.
    /// </summary>
    /// <param name="cancellationToken">Checked once before the commit is attempted.</param>
    /// <returns>A completed task; the commit itself runs synchronously.</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested before the commit.</exception>
    public Task SaveChangesAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            _transaction.Commit();
        }
        catch
        {
            try
            {
                _transaction.Rollback();
            }
            catch
            {
                // Deliberately ignored: a rollback failure must not replace the commit
                // failure, which is the exception the caller needs to see.
            }

            throw;
        }
        finally
        {
            _transaction.Dispose();
            _transaction = _connection.BeginTransaction();
            WeatherForecasts = new WeatherForecastsRepository(_connection, _transaction);
        }

        return Task.CompletedTask;
    }

    /// <summary>Disposes the current transaction and then the connection.</summary>
    public void Dispose()
    {
        _transaction?.Dispose();
        _connection.Dispose();
    }
}
很简单。但是当我试图实现特定的存储库接口时,我遇到了一个问题。我的域模型很丰富(没有公共设置器,一些属性包装在值对象中等等)。Dapper 无法处理这样的类。它不知道如何将值对象映射到 db 列,当您尝试从 db 中选择某个值时,它会抛出错误并表示无法实例化实体对象。一种选择是创建私有构造函数,其参数与数据库列名称和类型相匹配,但这是一个非常糟糕的决策,因为域层不应该了解数据库的任何信息。