CQRS là gì?

CQRS (viết tắt của cụm Command/Query Responsibility Segregation) định nghĩa sơ khai là cách phân tách trách nhiệm truy vấn lệnh, nhằm tối ưu hóa việc ghi vào & đọc từ cơ sở dữ liệu.

Motivation

Ngày càng nhiều lượt đọc và ghi phức tạp trong những hệ thống lớn, dẫn đến lỗi, tắc nghẽn service.

Việc ứng dụng CQRS sẽ giải quyết:

  • Tranh chấp dữ liệu
  • Quản lý quyền và bảo mật phức tạp khi cùng một object có cả quyền đọc và ghi

Ngoài ra còn có những lợi ích:

  • Chia tải tùy nhu cầu, ví dụ, ứng dụng bạn sẽ cần đọc nhiều hơn ghi.
  • Chia làm nhiều loại view cho query khác nhau tùy use case.

Cách hoạt động

CQRS thường đi chung với Event Sourcing. Trong ngữ cảnh của pattern này:

  • Model cho các lệnh ghi (INSERT/UPDATE/DELTE/…) sẽ là source cho model của các lệnh đọc.
  • Model cho các lệnh đọc sẽ là các view hiện thực dữ liệu.

Thật ra, chỉ implement CQRS sẽ rất đơn giản, việc tích hợp Event Sourcing vào để mở rộnng khả năng xử lý mới làm bài toán trở nên phức tạp. Vậy nên, phải bắt đầu đi từ Event Sourcing trước.

Event sourcing là gì?

Trong Event sourcing, ta chứa các event trong DB, không lưu những trạng thái bị lost của các đối tượng. Event là những thứ đã xảy ra trong quá khứ. Trạng thái hiện tại của một thực thể, bạn phải apply tất cả event vô một object mới. Nó được gọi là Rehydration (thẩm thấu lại) hoặc Event replay.

Khi dùng event sourcing, bạn có thể xây dựng:

  • Trace of events – cho từng đối tượng để xem chính xác điều gì đã xảy ra cho đối tượng đó,
  • Audit trail – để biết ai và cái gì đã trigger lên nó.
  • Dùng ngôn ngữ business trong event sourcing, và cuối cùng là
  • Event replay – khi có bug xày ra trong quá trình xử lý event đó, ta có thể fix issue đó và lần sau ta replay lại event.

Giờ mình sẽ map diagram vào thư mục phía dưới để dễ hình dung:

Ta đi từ diagram, mỗi Object trong hình sẽ có 1 danh sách Events đi theo và Handler cho từng loại event. Các Object này nằm trong thư mục Domain\Entities. Mỗi object đều phải có cấu trúc như nhau để chứa danh sách event và các handler tương ứng, mình đặt nó trong class tên là EventSourced, một implement của IEventSourced.

public interface IEventSourced
{
    Guid Id { get; }
    int Version { get; }
    IEnumerable<VersionedEvent> PendingEvents { get; }
}

Id là định danh của object. Version là giá trị tăng dần cho biết số lần object được cập nhật. PendingEvents chứa danh sách sự kiện làm thay đổi đối tượng.

Tiếp theo là pseudocode cho Event Sourced:

public abstract class EventSourced : IEventSourced
{
	private readonly Dictionary<Type, Action<VersionedEvent>> _handlers = new Dictionary<Type, Action<VersionedEvent>>();
	private readonly List<VersionedEvent> _pendingEvents = new List<VersionedEvent>();

	protected EventSourced(Guid id)
	{
		Id = id;
	}

	public Guid Id { get; }
	public int Version { get; protected set; } = -1;
	public IEnumerable<VersionedEvent> PendingEvents => _pendingEvents;
	protected void Handles<TEvent>(Action<TEvent> handler) where TEvent : VersionedEvent { }
	protected void LoadFrom(IEnumerable<VersionedEvent> pastEvents){ }
	protected void Update(VersionedEvent e){ }
}

Không giải thích lại ba thuộc tính đã nói trong IEventSourced: Id, Version, và PendingEvents.

Tiếp tục, ta có _handlers dùng để chứa những hành động tương ứng cho ứng kiểu Event khác nhau. Danh sách phần tử trong _handlers sẽ được thêm vào ở hàm constructor thông qua lệnh gọi đến Handles<Tênsựkiện>(Hànhđộngtươngứng). và sẽ được Invoke mỗi khi đối tượng được update trong thông qua lệnh gọi Update(). Ngoài ra trong class này còn chứa hàm LoadFrom() cho phép lấy danh sách sự kiện trong quá khứ của một đối tượng.

Tiếp theo là nội dung một class chứa đối tượng chính của chúng ta (ta gọi là đối tượng Event Sourced – dịch nôm na theo tiếng Việt, là nguồn gốc của những sự kiện), ở đây mình dùng code của đối tượng Booking để giải thích cách implement:

public class Booking : EventSourced
{
	private Booking(Guid id) : base(id)
	{
		Handles<TourBooked>(OnTourBooked);
		Handles<BookingCanceled>(OnBookingCanceled);
	}

	public Booking(Guid id, int tourId, string name, string email, bool transport) : this(id)
	{
		Update(new TourBooked
		{ 
			TourId = tourId, 
			Name = name, 
			Email = email, 
			Transport = transport 
		});
	}

	public Booking(Guid id, IEnumerable<VersionedEvent> history) : this(id)
	{
		LoadFrom(history);
	}

	public void Cancel(string reason)
	{
		Update(new BookingCanceled 
		{ 
			Email = Email, 
			Name = Name, 
			TourId = TourId, 
			Reason = reason });
	}

	private void OnTourBooked(TourBooked tourBooked)
	{
		TourId = tourBooked.TourId;
		Name = tourBooked.Name;
		Email = tourBooked.Email;
		Transport = tourBooked.Transport;
	}

	private void OnBookingCanceled(BookingCanceled bookingCanceled)
	{
		IsCanceled = true;
		CancellationReason = bookingCanceled.Reason;
	}

	public string Name { get; private set; }
	public string Email { get; private set; }
	public bool Transport { get; private set; }
	public int TourId { get; private set; }
	public bool IsCanceled { get; private set; }
	public string CancellationReason { get; private set; }
}

Trong đây có ba phương thức khởi tạo nhưng tập trung support 2 behavior chính: Khởi tạo mới đối tượng Booking và load lại một đối tượng Booking đã có sẵn.

  • Khởi tạo mới đối tượng (xem dòng số 9): để làm điều này bước đầu tiên để ý sẽ thấy bạn phải đưa danh sách những handlers tương ứng cho các sự kiện trong Booking, sau đó là gọi hàm Update() để thêm sự kiện đầu tiên cho nó (ở đây là sự kiên TourBooked).
  • Load danh sách event đã có (xem dòng số 20): trước tiên bạn cũng phải đưa danh sách Handler tương ứng cho các sự kiện, sau đó gọi hàm LoadFrom() (hàm này đã được implement ở base class EventSourced).

Tiếp theo là đến các Repositories.

Để hiểu về cách repo hoạt động. Ta phải biết khi nào nó được gọi. Trong ngữ cảnh của Event sourcing, khi có nhu cầu cập nhật từ UI, ứng dụng đầu tiên sẽ khởi tạo đối tượng EventSourced (có thể từ đầu hoặc từ 1 danh sách event trong quá khứ) sau đó gửi đối này đến repository để thực thi các bước tiếp theo, cụ thể là, ví dụ: AbcRepository.Save().

Đi vào class IEventSourcedRepository trước

public interface IEventSourcedRepository<T> where T : IEventSourced
{
	T Get(Guid id);
	void Save(T eventSourced);
}

Ta có 2 phương thức Get và Save đại diện cho 2 loại thao tác xuống DB: Read và Write. Lưu ý ở đây là Repo này sẽ chỉ tương tác với đối tượng của IEventSourced chứ không tương tác với những loại Data Model khác.

Tiếp đến sẽ xem một repo sẽ được implement như thế nào, mình tập trung vào pseudocode để bạn hình dung được cách nó hoạt động trước nhé:

public class EventSourcedRepository<T> : IEventSourcedRepository<T> where T : class, IEventSourced
{
	private string _connectionString;
	private JsonSerializerSettings _jsonSettings;
	private IEnumerable<IEventHandler> _eventHandlers;

	public EventSourcedRepository(IEnumerable<IEventHandler> eventHandlers)
	{
		_connectionString = "Data Source=AppData/EventSourcing-database.db;";
		_jsonSettings = new JsonSerializerSettings
		{
			TypeNameHandling = TypeNameHandling.All,
			TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple
		};

		_eventHandlers = eventHandlers;
	}

	public T Get(Guid id) { }

	public void Save(T eventSourced) { }

	private EventRecord Serialize(VersionedEvent e) { }

	private VersionedEvent Deserialize(EventRecord e) { }
}

_connectionString dùng để trỏ đến DB. _jsonSettings qui định cách parse dữ liệu giữa string & object trước khi Serialize và Deserialize. _eventHandlers là danh sách được inject khi bắt đầu ứng dụng, chi tiết bên trong từng handler sẽ được giải thích sau.

Tiếp theo là phương thức Get(Guid): Nhiệm vụ của hàm này là trả về đối tượng EventSourced có Id và danh sách Events.

Kế tiếp là phương thức Save(T eventSourced): phương thức này nhận vào là một đối tượng eventSourced nên nhiệm vụ có nó là store lại những event vô database sau đó tìm handler tương ứng

Cách cài đặt CQRS và Event Sourcing

Figure-3

Trong diagram này, ta cần biết nhiệm vụ của các thành phần:

  • Command Queue sẽ lưu Command vào bảng Logs
  • Command Subscriber sẽ lắng nghe Command Queue và chịu trách nhiệm tạo ra aggregate và gọi đến những phương thức khi aggregate được thực thi. Command Subscriber cũng lưu aggregate dưới dạng stream vào bảng Logs. Đồng thời sẽ publish event lên Event Queue.
  • Các event này sẽ được xử lý đồng thời bởi Event subcriberProcess Manager.
  • Process Manager sẽ gửi command đến Command Queue để trả lời lại event.
  • Còn Event Subcriber tạo và cập nhật tham chiếu lên Query store.
  • Ngoài ra, khái niệm Query search là một lớp data access dạng lightweight để đọc từ các tham chiếu
  1. Cấu trúc thư mục cho phần CQRS sẽ như sau:

Ta sẽ chia rõ ràng Domain ra làm hai thư mục: ReadModel và WriteModel. Mỗi thư mục sẽ chứa những model dùng trong Query và Command.

Trong thư mục Data Access ta cũng chia 2 thư mục con tương tự để chứa Repository cho việc read và write như sau. Lưu ý: tất cả repository trong trong DataAccess\ReadModel chỉ tương tác với các model bên trong Domain\ReadModel và tương tự; tất cả repository trong trong DataAccess\WriteModel chỉ tương tác với các model bên trong Domain\WriteModel và tương tự.

Đây là cấu trúc thư mục để cài đặt riêng cho Event Sourcing:

TBD

1 thought on “CQRS là gì?”

Leave a Reply

Related Post