Distributed Transaction Framework: Seata-Golang Communication Model

By Liu Xiaomin and Yu Yu

1. Overview

When I was practicing microservices at my company in 2018, the biggest problem I encountered was distributed transactions. That same year, Alibaba open-sourced its distributed transaction solution, and I noticed the project soon after. It was initially called Fescar and was later renamed Seata. I was very interested in open-source technologies and had joined many community groups. At that time, I was following the dubbo-go project and observing it quietly. As I learned more about Seata, I gradually came up with the idea of developing a distributed transaction framework in Golang.

To implement a distributed transaction framework in Golang, the first problem to solve is RPC communication. dubbo-go is a good example to learn from, so I began to study getty, the network library underlying dubbo-go.

2. Implement RPC Communication Based on getty

The following sections describe the RPC communication process of Seata-Golang using the relevant code.

2.1. Establish Connections

func (c *client) connect() {
	var (
		err error
		ss  Session
	)
	for {
		// Establish a session connection.
		ss = c.dial()
		if ss == nil {
			// The client has been closed.
			break
		}
		err = c.newSession(ss)
		if err == nil {
			// Start sending and receiving messages.
			ss.(*session).run()
			// Part of the code is omitted here.

			break
		}
		// Don't distinguish between TCP and WebSocket connections, because
		// gorilla/websocket/conn.go:(Conn)Close also invokes net.Conn.Close().
		ss.Conn().Close()
	}
}

The connect() method obtains a session through the dial() method, which dispatches on the endpoint type:

func (c *client) dial() Session {
	switch c.endPointType {
	case TCP_CLIENT:
		return c.dialTCP()
	case UDP_CLIENT:
		return c.dialUDP()
	case WS_CLIENT:
		return c.dialWS()
	case WSS_CLIENT:
		return c.dialWSS()
	}
	return nil
}

Since TCP connections are the focus here, let's look at the c.dialTCP() method next.

func (c *client) dialTCP() Session {
	var (
		err  error
		conn net.Conn
	)
	for {
		if c.IsClosed() {
			return nil
		}
		if c.sslEnabled {
			// Assign to the outer err instead of declaring a new one with :=;
			// otherwise a TLS dial error is lost and conn is used while nil.
			var sslConfig *tls.Config
			if sslConfig, err = c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
				d := &net.Dialer{Timeout: connectTimeout}
				// Establish an encrypted connection.
				conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
			}
		} else {
			// Establish a TCP connection.
			conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
		}
		if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
			// The client connected to itself; discard the connection.
			conn.Close()
			err = errSelfConnect
		}
		if err == nil {
			// Wrap the connection in a TCP session.
			return newTCPSession(conn, c)
		}
		log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
		// Wait connectInterval before retrying.
		<-wheel.After(connectInterval)
	}
}

At this point, getty has established a TCP (or TLS) connection and wrapped it in a TCP session returned by newTCPSession().

2.2. Send and Receive Messages

func (s *session) run() {
	// Part of the code is omitted here.

	go s.handleLoop()
	go s.handlePackage()
}

run() starts two goroutines, handleLoop and handlePackage. As their names suggest, one runs the session's main loop (sending messages and timed tasks) and the other handles incoming packages. Let's start with handleLoop():

func (s *session) handleLoop() {
	// Part of the code is omitted here.

	for {
		// A select blocks until one of its cases is ready to run.
		// If several cases are ready, it chooses one at random.
		select {
		// Part of the code is omitted here.

		case outPkg, ok = <-s.wQ:
			// Part of the code is omitted here.
			iovec = iovec[:0]
			for idx := 0; idx < maxIovecNum; idx++ {
				// s.writer encodes outPkg (of type interface{}) into bytes.
				pkgBytes, err = s.writer.Write(s, outPkg)
				// Part of the code is omitted here.

				iovec = append(iovec, pkgBytes)
				// Part of the code is omitted here.
			}
			// Send the encoded bytes in one batch.
			err = s.WriteBytesArray(iovec[:]...)
			if err != nil {
				log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
					s.sessionToken(), len(iovec), perrors.WithStack(err))
				s.stop()
				// break LOOP
				flag = false
			}
		case <-wheel.After(s.period):
			if flag {
				if wsFlag {
					err := wsConn.writePing()
					if err != nil {
						log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
					}
				}
				// Scheduled logic, such as heartbeats.
				s.listener.OnCron(s)
			}
		}
	}
}

The code above shows that handleLoop() is responsible for sending messages. Outgoing RPC messages taken from the write queue s.wQ are first encoded into bytes by s.writer and then written to the established TCP connection. When no message arrives within s.period, handleLoop() calls s.listener.OnCron(s), which is where periodic work such as heartbeats happens. s.writer has the Writer interface type, which any RPC framework built on getty must implement.

The handlePackage() method is listed below:

func (s *session) handlePackage() {
	// Part of the code is omitted here.
	if _, ok := s.Connection.(*gettyTCPConn); ok {
		if s.reader == nil {
			errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
			log.Error(errStr)
			panic(errStr)
		}
		err = s.handleTCPPackage()
	} else if _, ok := s.Connection.(*gettyWSConn); ok {
		err = s.handleWSPackage()
	} else if _, ok := s.Connection.(*gettyUDPConn); ok {
		err = s.handleUDPPackage()
	} else {
		panic(fmt.Sprintf("unknown type session{%#v}", s))
	}
}

handlePackage() dispatches on the connection type. For TCP connections, it calls handleTCPPackage():

func (s *session) handleTCPPackage() error {
	// Part of the code is omitted here.
	conn = s.Connection.(*gettyTCPConn)
	for {
		// Part of the code is omitted here.
		bufLen = 0
		for {
			// for clause for the network timeout condition check
			// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
			// Receive bytes from the TCP connection.
			bufLen, err = conn.recv(buf)
			// Part of the code is omitted here.

			break
		}
		// Part of the code is omitted here.

		// Append the received bytes to pktBuf.
		pktBuf.Write(buf[:bufLen])
		for {
			if pktBuf.Len() <= 0 {
				break
			}
			// Decode the buffered bytes into an RPC message by using s.reader.
			pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
			// Part of the code is omitted here.
			s.UpdateActive()
			// Put the decoded message into the task queue for the RPC consumer.
			s.addTask(pkg)
			// Discard the pkgLen bytes that were just consumed.
			pktBuf.Next(pkgLen)
			// continue to handle case 5
		}
		if exit {
			break
		}
	}
	return perrors.WithStack(err)
}

handleTCPPackage() reads raw bytes from the TCP connection into pktBuf and relies on s.reader to decode them into messages that the RPC layer can consume. Read returns the decoded package together with the number of bytes it used, so handleTCPPackage() discards exactly those bytes and keeps any remaining bytes for the next round. Therefore, to build an RPC communication layer on getty, we must implement the Reader interface behind s.reader.

2.3. Decouple the Underlying Processing Logic of Network Messages from the Business Logic

At the end of the decode loop in handleTCPPackage(), each received message is passed to s.addTask(pkg). Let's look at that method:

func (s *session) addTask(pkg interface{}) {
	f := func() {
		s.listener.OnMessage(s, pkg)
		s.incReadPkgNum()
	}
	if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
		taskPool.AddTaskAlways(f)
		return
	}
	f()
}

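addTask wraps pkg in an anonymous function that calls s.listener.OnMessage. If the endpoint has a taskPool, the function is submitted to the pool; otherwise, it runs directly on the current goroutine. This detail is very important: it is where I dug myself into a hole when writing Seata-Golang, as explained later in this section.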

Now, let's look at how the simple task pool is created:

// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
	if size < 1 {
		size = runtime.NumCPU() * 100
	}
	return &taskPoolSimple{
		work: make(chan task),
		sem:  make(chan struct{}, size),
		done: make(chan struct{}),
	}
}

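NewTaskPoolSimple creates an unbuffered work channel and a channel sem with a buffer of size elements (runtime.NumCPU() * 100 when size is less than 1). sem works as a semaphore that caps the number of worker goroutines. Next is the AddTaskAlways(t task) method: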

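func (p *taskPoolSimple) AddTaskAlways(t task) {
	// Drop the task if the pool has been closed.
	select {
	case <-p.done:
		return
	default:
	}
	// Hand the task to an idle worker, if one is waiting on p.work.
	select {
	case p.work <- t:
		return
	default:
	}
	// Otherwise, try an idle worker once more, start a new worker if the
	// semaphore still has room, or fall back to a temporary goroutine.
	select {
	case p.work <- t:
	case p.sem <- struct{}{}:
		p.wg.Add(1)
		go p.worker(t)
	default:
		goSafely(t)
	}
}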

AddTaskAlways first tries to hand the task to an idle worker goroutine through p.work. If no worker is idle and fewer than cap(p.sem) workers exist, it starts a new worker to run t. If the pool is already full, it runs t() in a temporary goroutine. In effect, up to cap(p.sem) worker goroutines form a goroutine pool. Business logic runs on these goroutines instead of on the goroutines responsible for network message processing, which is how the two are decoupled.

The hole I dug for myself was forgetting to configure the taskPool. As a result, business logic ran on the same goroutine that processed the underlying network messages. When the business logic waited for a task to complete, that goroutine was blocked, and no messages could be received while it was blocked.
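The pitfall can be reproduced without getty. In the hypothetical simulation below, the handler for a "request" message blocks until a "response" message has been handled, roughly like business logic waiting for a reply. dispatch decides where handlers run. Calling the task inline models a missing taskPool. Starting one goroutine per task is a crude stand-in for a pool; a real pool such as taskPoolSimple bounds the number of goroutines.

```go
package main

import (
	"fmt"
	"time"
)

// runDemo simulates a session whose read loop receives two messages:
// "request", whose handler blocks until "response" has been handled, and
// "response". dispatch decides where each handler runs.
func runDemo(dispatch func(task func())) bool {
	inbound := make(chan string, 2)
	inbound <- "request"
	inbound <- "response"
	close(inbound)

	gotResponse := make(chan struct{})
	done := make(chan struct{})
	handle := func(msg string) {
		switch msg {
		case "request":
			<-gotResponse // business logic waits for another message
			close(done)
		case "response":
			close(gotResponse)
		}
	}

	// The "network" goroutine: read each message and hand it to dispatch.
	go func() {
		for msg := range inbound {
			msg := msg
			dispatch(func() { handle(msg) })
		}
	}()

	select {
	case <-done:
		return true
	case <-time.After(200 * time.Millisecond):
		return false // the read loop is stuck inside the handler
	}
}

func main() {
	inline := func(task func()) { task() }    // no taskPool: same goroutine
	pooled := func(task func()) { go task() } // stand-in for a taskPool
	fmt.Println("inline:", runDemo(inline))   // inline: false
	fmt.Println("pooled:", runDemo(pooled))   // pooled: true
}
```

In the inline case, the read loop never reaches the second message. runDemo gives up after its timeout and leaves the stuck goroutine behind, which is acceptable only in a demo.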

2.4. Specific Implementation

getty defines the following interfaces for the RPC framework to implement:

// Reader is used to unmarshal a complete pkg from buffer.
type Reader interface {
	Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal pkg and write to session.
type Writer interface {
	// If @Session is udpGettySession, the second parameter is UDPContext.
	Write(Session, interface{}) ([]byte, error)
}

// ReadWriter is used to handle application packages.
type ReadWriter interface {
	Reader
	Writer
}

// EventListener is used to process packages received from the remote session.
type EventListener interface {
	// Invoked when the session is opened.
	// If the returned error is not nil, @Session will be closed.
	OnOpen(Session) error

	// Invoked when the session is closed.
	OnClose(Session)

	// Invoked when an error occurs.
	OnError(Session, error)

	// Invoked periodically; the period can be set by (Session)SetCronPeriod.
	OnCron(Session)

	// Invoked when getty receives a package. Do not run long-running logic
	// in this func. You'd better set the package's maximum length: if a
	// message is longer than that, return an error from Reader.Read and
	// getty will close the connection soon.
	//
	// If your logic takes a long time, start a goroutine pool (like a worker
	// thread pool in C++) to handle it asynchronously, or process it in some
	// other asynchronous way.
	// In short, OnMessage should return as soon as possible.
	//
	// If this is a UDP event listener, the second parameter type is UDPContext.
	OnMessage(Session, interface{})
}

Having walked through getty's code, we can see that RPC communication is easy to build on top of it. We only need to implement ReadWriter to encode and decode RPC messages and implement EventListener to handle the logic of each RPC message. We then inject the ReadWriter and EventListener implementations into the RPC client and server.

2.4.1 Implementation of Encoding and Decoding Protocol

The Seata protocol format is defined below:

[Figure: Seata protocol format]

RpcPackageHandler, Seata-Golang's implementation of the ReadWriter interface, calls the codec functions below to encode and decode message bodies according to this format.

// MessageEncoder encodes a message into bytes.
func MessageEncoder(codecType byte, in interface{}) []byte {
	switch codecType {
	case SEATA:
		return SeataEncoder(in)
	default:
		// codecType is a byte, so format it with %d rather than %s.
		log.Errorf("unsupported codecType: %d", codecType)
		return nil
	}
}

// MessageDecoder decodes bytes into a message body.
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
	switch codecType {
	case SEATA:
		return SeataDecoder(in)
	default:
		log.Errorf("unsupported codecType: %d", codecType)
		return nil, 0
	}
}
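The switch on codecType can also be expressed as a lookup table, and a codec failure can be returned as an error rather than only logged. The sketch below uses a hypothetical, simplified 12-byte header made up of magic, codec type, message type, request ID, and body length. It is not the exact Seata wire format, and the raw string codec is a placeholder for SeataEncoder.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Hypothetical simplified frame header (NOT the exact Seata wire format):
// magic(2) | codec type(1) | message type(1) | request id(4) | body length(4)
const (
	magic     uint16 = 0xdada
	headerLen        = 12
	codecRaw  byte   = 0 // hypothetical codec id
)

type header struct {
	codec, msgType byte
	id, bodyLen    uint32
}

// codecs maps a codec type byte to an encoder, replacing the switch with a table.
var codecs = map[byte]func(interface{}) ([]byte, error){
	codecRaw: func(in interface{}) ([]byte, error) {
		s, ok := in.(string)
		if !ok {
			return nil, fmt.Errorf("raw codec cannot encode %T", in)
		}
		return []byte(s), nil
	},
}

func encodeFrame(codec, msgType byte, id uint32, body interface{}) ([]byte, error) {
	enc, ok := codecs[codec]
	if !ok {
		return nil, fmt.Errorf("unsupported codec type %d", codec)
	}
	payload, err := enc(body)
	if err != nil {
		return nil, err
	}
	buf := make([]byte, headerLen, headerLen+len(payload))
	binary.BigEndian.PutUint16(buf[0:], magic)
	buf[2], buf[3] = codec, msgType
	binary.BigEndian.PutUint32(buf[4:], id)
	binary.BigEndian.PutUint32(buf[8:], uint32(len(payload)))
	return append(buf, payload...), nil
}

func decodeHeader(b []byte) (header, error) {
	if len(b) < headerLen {
		return header{}, errors.New("short header")
	}
	if binary.BigEndian.Uint16(b) != magic {
		return header{}, errors.New("bad magic")
	}
	return header{codec: b[2], msgType: b[3], id: binary.BigEndian.Uint32(b[4:]), bodyLen: binary.BigEndian.Uint32(b[8:])}, nil
}

func main() {
	frame, err := encodeFrame(codecRaw, 1, 42, "hi")
	if err != nil {
		panic(err)
	}
	h, _ := decodeHeader(frame)
	fmt.Println(h.id, h.bodyLen, string(frame[headerLen:])) // 42 2 hi
	_, err = encodeFrame(7, 1, 1, "x")
	fmt.Println(err) // unsupported codec type 7
}
```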

2.4.2 Implementation on the Client

RpcRemoteClient, the client-side implementation of EventListener, is listed below:

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
	go func() {
		request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
			ApplicationId:           client.conf.ApplicationId,
			TransactionServiceGroup: client.conf.TransactionServiceGroup,
		}}
		// After the connection is established, send a request to register the
		// TransactionManager with the Transaction Coordinator.
		_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
		if err == nil {
			// Store the connection to the Transaction Coordinator for later use.
			clientSessionManager.RegisterGettySession(session)
			client.GettySessionOnOpenChannel <- session.RemoteAddr()
		}
	}()
	return nil
}

// OnError releases the session when an error occurs.
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
	clientSessionManager.ReleaseGettySession(session)
}

// OnClose releases the session when it is closed.
func (client *RpcRemoteClient) OnClose(session getty.Session) {
	clientSessionManager.ReleaseGettySession(session)
}

// OnMessage handles messages received from the Transaction Coordinator.
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
	log.Infof("received message:{%v}", pkg)
	rpcMessage, ok := pkg.(protocal.RpcMessage)
	if !ok {
		// Not an RPC message; ignore it instead of using a zero-value RpcMessage.
		return
	}
	heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
	if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
		log.Debugf("received PONG from %s", session.RemoteAddr())
	}
	if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
		rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
		log.Debugf("msgId:%v, body:%v", rpcMessage.Id, rpcMessage.Body)

		// Process transactional messages, such as commit or rollback requests.
		client.onMessage(rpcMessage, session.RemoteAddr())
	} else {
		// A response: complete the pending future registered under the same message ID.
		resp, loaded := client.futures.Load(rpcMessage.Id)
		if loaded {
			response := resp.(*getty2.MessageFuture)
			response.Response = rpcMessage.Body
			response.Done <- true
			client.futures.Delete(rpcMessage.Id)
		}
	}
}

// OnCron sends heartbeats periodically.
func (client *RpcRemoteClient) OnCron(session getty.Session) {
	client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

The logic of clientSessionManager.RegisterGettySession(session) is analyzed in Section 2.4.4.

2.4.3 Transaction Coordinator Implementation on the Server

On the server side, the Transaction Coordinator (DefaultCoordinator) implements EventListener:

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
	log.Infof("got getty_session:%s", session.Stat())
	return nil
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
	// Release the TCP connection.
	SessionManager.ReleaseGettySession(session)
	session.Close()
	log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
	log.Infof("getty_session{%s} is closing......", session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
	log.Debugf("received message:{%v}", pkg)
	rpcMessage, ok := pkg.(protocal.RpcMessage)
	if ok {
		_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
		if isRegTM {
			// Map the TransactionManager information to this TCP connection.
			coordinator.OnRegTmMessage(rpcMessage, session)
			return
		}

		heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
		if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
			coordinator.OnCheckMessage(rpcMessage, session)
			return
		}

		if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
			rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
			log.Debugf("msgId:%v, body:%v", rpcMessage.Id, rpcMessage.Body)
			_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
			if isRegRM {
				// Map the ResourceManager information to this TCP connection.
				coordinator.OnRegRmMessage(rpcMessage, session)
			} else {
				if SessionManager.IsRegistered(session) {
					defer func() {
						if err := recover(); err != nil {
							// %w is only meaningful in fmt.Errorf, so use %v here.
							log.Errorf("Catch Exception while do RPC, request: %v, err: %v", rpcMessage, err)
						}
					}()
					// Process transactional messages, such as beginning a global transaction,
					// registering or committing a branch transaction, and rolling back a global transaction.
					coordinator.OnTrxMessage(rpcMessage, session)
				} else {
					session.Close()
					log.Infof("close an unhandled connection! [%v]", session)
				}
			}
		} else {
			resp, loaded := coordinator.futures.Load(rpcMessage.Id)
			if loaded {
				response := resp.(*getty2.MessageFuture)
				response.Response = rpcMessage.Body
				response.Done <- true
				coordinator.futures.Delete(rpcMessage.Id)
			}
		}
	}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {}

coordinator.OnRegTmMessage(rpcMessage, session) registers a Transaction Manager, and coordinator.OnRegRmMessage(rpcMessage, session) registers a Resource Manager; their logic is analyzed in Section 2.4.4. Messages passed to coordinator.OnTrxMessage(rpcMessage, session) are routed to the corresponding handler according to their type code:

switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
	req := msg.(protocal.GlobalBeginRequest)
	resp := coordinator.doGlobalBegin(req, ctx)
	return resp
case protocal.TypeGlobalStatus:
	req := msg.(protocal.GlobalStatusRequest)
	resp := coordinator.doGlobalStatus(req, ctx)
	return resp
case protocal.TypeGlobalReport:
	req := msg.(protocal.GlobalReportRequest)
	resp := coordinator.doGlobalReport(req, ctx)
	return resp
case protocal.TypeGlobalCommit:
	req := msg.(protocal.GlobalCommitRequest)
	resp := coordinator.doGlobalCommit(req, ctx)
	return resp
case protocal.TypeGlobalRollback:
	req := msg.(protocal.GlobalRollbackRequest)
	resp := coordinator.doGlobalRollback(req, ctx)
	return resp
case protocal.TypeBranchRegister:
	req := msg.(protocal.BranchRegisterRequest)
	resp := coordinator.doBranchRegister(req, ctx)
	return resp
case protocal.TypeBranchStatusReport:
	req := msg.(protocal.BranchReportRequest)
	resp := coordinator.doBranchReport(req, ctx)
	return resp
default:
	return nil
}
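An alternative to the switch is a table from type code to handler, so that adding a request type means adding one entry. This is a swapped-in technique rather than the Seata-Golang code. The type codes and request structs below are hypothetical stand-ins for those in the protocal package.

```go
package main

import "fmt"

// Hypothetical type codes and requests; the real ones live in the protocal package.
type TypeCode int16

const (
	TypeGlobalBegin TypeCode = iota + 1
	TypeGlobalCommit
)

type Message interface{ GetTypeCode() TypeCode }

type GlobalBeginRequest struct{ Name string }
type GlobalCommitRequest struct{ Xid string }

func (GlobalBeginRequest) GetTypeCode() TypeCode  { return TypeGlobalBegin }
func (GlobalCommitRequest) GetTypeCode() TypeCode { return TypeGlobalCommit }

// handlers replaces the switch with a lookup table keyed by type code.
var handlers = map[TypeCode]func(Message) interface{}{
	TypeGlobalBegin: func(m Message) interface{} {
		return "begin:" + m.(GlobalBeginRequest).Name
	},
	TypeGlobalCommit: func(m Message) interface{} {
		return "commit:" + m.(GlobalCommitRequest).Xid
	},
}

func dispatch(msg Message) interface{} {
	if h, ok := handlers[msg.GetTypeCode()]; ok {
		return h(msg)
	}
	return nil // same result as the switch's default branch
}

func main() {
	fmt.Println(dispatch(GlobalBeginRequest{Name: "order"})) // begin:order
	fmt.Println(dispatch(GlobalCommitRequest{Xid: "xid-1"})) // commit:xid-1
}
```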

2.4.4 Session Manager Analysis

After the client establishes a connection with the Transaction Coordinator, clientSessionManager.RegisterGettySession(session) stores the connection in the map serverSessions = sync.Map{}. The key is the session's remote address, that is, the address of the Transaction Coordinator, and the value is the session itself. The client can then use a session from this map to register its Transaction Manager and Resource Manager with the Transaction Coordinator. For the complete code, see getty_client_session_manager.go.
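The following is a minimal sketch of this idea. It uses a hypothetical Session interface and a fakeSession type in place of getty.Session. RegisterGettySession and ReleaseGettySession borrow their names from the calls shown earlier. AcquireGettySession is a hypothetical helper for picking a session to send on.

```go
package main

import (
	"fmt"
	"sync"
)

// Session is a hypothetical minimal stand-in for getty.Session.
type Session interface{ RemoteAddr() string }

type fakeSession string

func (s fakeSession) RemoteAddr() string { return string(s) }

// clientSessionManager keeps one session per Transaction Coordinator address.
type clientSessionManager struct {
	serverSessions sync.Map // remote address -> Session
}

func (m *clientSessionManager) RegisterGettySession(s Session) {
	m.serverSessions.Store(s.RemoteAddr(), s)
}

func (m *clientSessionManager) ReleaseGettySession(s Session) {
	m.serverSessions.Delete(s.RemoteAddr())
}

// AcquireGettySession returns any registered session (hypothetical helper;
// a real implementation might load-balance or prefer a given address).
func (m *clientSessionManager) AcquireGettySession() (Session, bool) {
	var picked Session
	m.serverSessions.Range(func(_, v interface{}) bool {
		picked = v.(Session)
		return false // stop after the first entry
	})
	return picked, picked != nil
}

func main() {
	var m clientSessionManager
	m.RegisterGettySession(fakeSession("10.0.0.1:8091"))
	s, ok := m.AcquireGettySession()
	fmt.Println(s.RemoteAddr(), ok) // 10.0.0.1:8091 true
	m.ReleaseGettySession(s)
	_, ok = m.AcquireGettySession()
	fmt.Println(ok) // false
}
```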

After registration, the same connection may carry both TM messages and RM messages, so we use RpcContext to identify the information associated with a connection:

type RpcContext struct {
	Version                 string
	TransactionServiceGroup string
	ClientRole              meta.TransactionRole
	ApplicationId           string
	ClientId                string
	ResourceSets            *model.Set
	Session                 getty.Session
}

When a transactional message is received, we need to construct an RpcContext for the subsequent transaction processing logic. Therefore, the following maps cache the required mapping relationships:

var (
	// session -> transactionRole
	// The TM registers before the RM: if a session was not registered by
	// the TM, it was registered by the RM.
	session_transactionroles = sync.Map{}
	// session -> applicationId
	identified_sessions = sync.Map{}
	// applicationId -> ip -> port -> session
	client_sessions = sync.Map{}
	// applicationId -> resourceIds
	client_resources = sync.Map{}
)

With these maps in place, coordinator.OnRegTmMessage(rpcMessage, session) and coordinator.OnRegRmMessage(rpcMessage, session) register the Transaction Manager and the Resource Manager with the Transaction Coordinator, respectively. During registration, the relationships among applicationId, IP, port, and session are cached in client_sessions, and the relationship between applicationId and resourceIds is cached in client_resources, because an application may have multiple Resource Managers. When needed, these mappings are used to construct an RpcContext. This part of the implementation differs considerably from the Java-based Seata.
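The sketch below rebuilds an RpcContext from cached mappings. It is a hypothetical simplification in three ways. It keeps only three of the maps. It uses plain maps guarded by a sync.RWMutex instead of sync.Map, because nested sync.Maps are awkward to update atomically. It also trims RpcContext to three fields.

```go
package main

import (
	"fmt"
	"sync"
)

// Session is a hypothetical stand-in for getty.Session.
type Session interface{ RemoteAddr() string }

type fakeSession string

func (s fakeSession) RemoteAddr() string { return string(s) }

// RpcContext is trimmed to the fields this sketch needs.
type RpcContext struct {
	ApplicationId string
	ResourceIds   []string
	Session       Session
}

// registry holds the cached mappings (plain maps + RWMutex instead of sync.Map).
type registry struct {
	mu              sync.RWMutex
	identified      map[Session]string                    // session -> applicationId
	clientSessions  map[string]map[string]map[int]Session // applicationId -> ip -> port -> session
	clientResources map[string][]string                   // applicationId -> resourceIds
}

func newRegistry() *registry {
	return &registry{
		identified:      map[Session]string{},
		clientSessions:  map[string]map[string]map[int]Session{},
		clientResources: map[string][]string{},
	}
}

// register records what a registration request would contribute;
// one application may register several resources.
func (r *registry) register(s Session, appID, ip string, port int, resourceIDs []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.identified[s] = appID
	if r.clientSessions[appID] == nil {
		r.clientSessions[appID] = map[string]map[int]Session{}
	}
	if r.clientSessions[appID][ip] == nil {
		r.clientSessions[appID][ip] = map[int]Session{}
	}
	r.clientSessions[appID][ip][port] = s
	r.clientResources[appID] = append(r.clientResources[appID], resourceIDs...)
}

// contextFor builds an RpcContext for a session that sent a transactional message.
func (r *registry) contextFor(s Session) (RpcContext, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	appID, ok := r.identified[s]
	if !ok {
		return RpcContext{}, false // unregistered session
	}
	resources := append([]string(nil), r.clientResources[appID]...) // defensive copy
	return RpcContext{ApplicationId: appID, ResourceIds: resources, Session: s}, true
}

func main() {
	r := newRegistry()
	s := fakeSession("192.168.0.8:50001")
	r.register(s, "order-svc", "192.168.0.8", 50001, []string{"jdbc:mysql://db/order"})
	ctx, ok := r.contextFor(s)
	fmt.Println(ctx.ApplicationId, ctx.ResourceIds, ok) // order-svc [jdbc:mysql://db/order] true
}
```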

This completes the analysis of the Seata-Golang RPC communication model.

3. The Future of Seata-Golang

Future work includes support for a registry and a configuration center, protocol interoperability with the Java-based Seata 1.4, support for more databases, and a Raft-based transaction coordinator. We hope developers interested in distributed transactions will join us in building a sound distributed transaction framework for Golang.


About the Authors

Yu Yu (GitHub ID: @AlexStocks) is the Director of the dubbo-go project and community and a programmer with more than ten years of front-line experience in server-side infrastructure research and development. Yu Yu has contributed to Muduo, Pika, Dubbo, Sentinel-go, and other well-known projects, and currently works on container orchestration and service mesh in the trusted-native department of Ant Financial.

