How to process MySQL binlog in Go

My name is Artem. I work at Rambler Group as a lead Go developer on the Potok project.
 
We spent a lot of time taming the MySQL binlog, and this article is the story of how to implement binlog processing in Go quickly and with a minimum of pitfalls.
 
 
 
The article has two parts:

1. How to quickly start processing the records that arrive in the log.

2. How to tune and extend what's under the hood.
 
 


Part 1. Getting started as fast as possible.


 
To work with the binlog we will use the library github.com/siddontang/go-mysql
 
We will connect with a new canal. Canal requires the ROW binlog format; if you are not sure how your server is configured, you can check that first.
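
A quick sanity check from Go (a sketch; it assumes the github.com/go-sql-driver/mysql driver and the same credentials as in the config below):

    import (
        "database/sql"
        "log"

        _ "github.com/go-sql-driver/mysql"
    )

    // checkRowFormat stops the program if the server is not writing row-based binlogs.
    func checkRowFormat() {
        db, err := sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        var format string
        if err := db.QueryRow("SELECT @@GLOBAL.binlog_format").Scan(&format); err != nil {
            log.Fatal(err)
        }
        if format != "ROW" {
            log.Fatalf("binlog_format is %s, canal needs ROW", format)
        }
    }

With the format confirmed, we start the listener: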
 
 
    func binLogListener() {
        c, err := getDefaultCanal()
        if err == nil {
            coords, err := c.GetMasterPos()
            if err == nil {
                c.SetEventHandler(&binlogHandler{})
                c.RunFrom(coords)
            }
        }
    }

    func getDefaultCanal() (*canal.Canal, error) {
        cfg := canal.NewDefaultConfig()
        cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306)
        cfg.User = "root"
        cfg.Password = "root"
        cfg.Flavor = "mysql"
        cfg.Dump.ExecutionPath = ""

        return canal.NewCanal(cfg)
    }
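
For brevity, binLogListener silently ignores errors. In a real service you would at least log them; a slightly more defensive variant might look like this (a sketch, using the standard log package):

    func binLogListenerLogged() {
        c, err := getDefaultCanal()
        if err != nil {
            log.Fatalf("canal setup failed: %v", err)
        }

        coords, err := c.GetMasterPos()
        if err != nil {
            log.Fatalf("cannot get master position: %v", err)
        }

        c.SetEventHandler(&binlogHandler{})
        if err := c.RunFrom(coords); err != nil {
            log.Printf("canal stopped: %v", err)
        }
    }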

 
Create a wrapper over the binlog event handler:
 
 
    type binlogHandler struct {
        canal.DummyEventHandler // the standard stub from the library
        BinlogParser            // our custom helper for parsing
    }

    func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { return nil }
    func (h *binlogHandler) String() string                 { return "binlogHandler" }

 
 
 
We extend the handling of incoming binlog rows by adding logic to the OnRow() method:
 
 
    func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {
        var n int // starting row index
        var k int // step
        switch e.Action {
        case canal.DeleteAction:
            return nil // outside the scope of this example
        case canal.UpdateAction:
            n = 1
            k = 2
        case canal.InsertAction:
            n = 0
            k = 1
        }
        for i := n; i < len(e.Rows); i += k {
            key := e.Table.Schema + "." + e.Table.Name
            switch key {
            case User{}.SchemaName() + "." + User{}.TableName():
                /*
                    parsing of the user data goes here
                */
            }
        }
        return nil
    }

 
The purpose of this wrapper is to parse the incoming data. For an update, the data arrives as two rows per changed record (the first row contains the original data, the second the updated data). We must also account for multi-inserts and multi-updates, so for UPDATE we need to take every second row. That is why we introduced n and k in the example above.
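
To make the index arithmetic concrete, here is a tiny standalone illustration (the row values are made up): an UPDATE touching two records produces four entries in e.Rows, and with n = 1, k = 2 we visit the new versions at indices 1 and 3, while i-1 points at the corresponding old version.

    rows := [][]interface{}{
        {1, "Jack"}, {1, "John"}, // old and new versions of record id=1
        {2, "Ann"}, {2, "Kate"},  // old and new versions of record id=2
    }
    for i := 1; i < len(rows); i += 2 {
        fmt.Printf("old=%v new=%v\n", rows[i-1], rows[i])
    }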
 
 
Let's create a model for receiving data from the binlog. We will read the data from the parsed rows into it. The annotations specify the column names:
 
 
    type User struct {
        Id      int       `gorm:"column:id"`
        Name    string    `gorm:"column:name"`
        Status  string    `gorm:"column:status"`
        Created time.Time `gorm:"column:created"`
    }

    func (User) TableName() string {
        return "User"
    }

    func (User) SchemaName() string {
        return "Test"
    }

 
The table structure in MySQL:
 
 
    CREATE TABLE Test.User
    (
        id      INT AUTO_INCREMENT PRIMARY KEY,
        name    VARCHAR(40) NULL,
        status  ENUM ("active", "deleted") DEFAULT "active",
        created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
    )
    ENGINE = InnoDB;

 
Now to the parsing itself. In the placeholder we left for parsing the user data, add:
 
 
    user := User{}
    h.GetBinLogData(&user, e, i)

 
In fact, this is enough: the data of the new record is now in the user model. But for clarity, let's print it out:
 
 
    if e.Action == canal.UpdateAction {
        oldUser := User{}
        h.GetBinLogData(&oldUser, e, i-1)
        fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name)
    } else {
        fmt.Printf("User %d is created with name %s\n", user.Id, user.Name)
    }

 
Now for the thing we've been aiming at: launching our "Hello binlog world":
 
 
    func main() {
        go binLogListener()
        // your code could go here
        time.Sleep(2 * time.Minute)
        fmt.Print("Thx for watching, goodbye")
    }

 
Then insert and update a record:
 
 
    INSERT INTO Test.User (`id`, `name`) VALUES (1, "Jack");
    UPDATE Test.User SET name = "John" WHERE id = 1;

 
We will see:
 
 
    User 1 is created with name Jack
    User 1 is updated from name Jack to name John

 
The resulting code works with the binlog and parses new rows. When a record arrives from the table we need, the code reads its data into the struct and prints the result. Behind the scenes, the data parser (BinlogParser) filled the model.
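
For completeness, these are the imports assumed by the snippets in this part (a sketch; the library predates Go modules, so the classic import path is shown):

    import (
        "fmt"
        "time"

        "github.com/siddontang/go-mysql/canal"
    )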
 
 

Part 2. As Cobb said, we need to go a level deeper


 
Consider the inner workings of the parser, which is based on reflection.
 
 
To fill the model with data, we used the handler method:
 
 
    h.GetBinLogData(&user, e, i)

 
It parses simple data types:

    bool
    int
    float64
    string
    time.Time

and can also parse complex structures from JSON.
 
 
If the supported types are not enough, or if you just want to understand how the parsing works, you can add your own types.
 
 
First, let's look at how to fill in a model field, using the Id field of type int as an example:
 
 
    type User struct {
        Id int `gorm:"column:id"`
    }

 
Through reflection, we get the type name. The parseTagSetting method converts annotations to a more convenient structure:
 
 
    element := User{} // we usually receive an interface as input, but here we take the model directly
    v := reflect.ValueOf(element)
    s := reflect.Indirect(v)
    t := s.Type()
    num := t.NumField()
    for k := 0; k < num; k++ {
        parsedTag := parseTagSetting(t.Field(k).Tag)
        if columnName, ok := parsedTag["COLUMN"]; !ok || columnName == "COLUMN" {
            continue
        }
        name := s.Field(k).Type().Name()
        switch name {
        case "int":
            // the value will be parsed here
        }
    }

 
Having determined that the field is an int, we can set its value via reflection:
 
 
    func (v Value) SetInt(x int64) // the setter from the reflect package
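
A quick aside on the classic settability gotcha: reflect can only set fields of an addressable value, so you must start from a pointer. A minimal self-contained sketch:

    u := User{}
    s := reflect.Indirect(reflect.ValueOf(&u)) // note the pointer: the value must be addressable
    s.FieldByName("Id").SetInt(42)
    fmt.Println(u.Id) // 42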

 
Method for parsing annotations:
 
 
    func parseTagSetting(tags reflect.StructTag) map[string]string {
        setting := map[string]string{}
        for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} {
            tags := strings.Split(str, ";")
            for _, value := range tags {
                v := strings.Split(value, ":")
                k := strings.TrimSpace(strings.ToUpper(v[0]))
                if len(v) >= 2 {
                    setting[k] = strings.Join(v[1:], ":")
                } else {
                    setting[k] = k
                }
            }
        }
        return setting
    }
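
To see what this produces, here is a quick check (a sketch) against one of the tags used later in the article:

    tag := reflect.StructTag(`gorm:"column:struct_data;fromJson"`)
    parsed := parseTagSetting(tag)
    fmt.Println(parsed["COLUMN"]) // struct_data
    _, ok := parsed["FROMJSON"]   // bare flags are stored under their upper-cased name
    fmt.Println(ok)               // true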

 
SetInt takes an int64 as input. Let's write a method that converts the data received from the binlog into int64:
 
 
    func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 {
        columnId := m.getBinlogIdByName(e, columnName)
        if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER {
            panic("Not int type")
        }
        switch e.Rows[n][columnId].(type) {
        case int8:
            return int64(e.Rows[n][columnId].(int8))
        case int32:
            return int64(e.Rows[n][columnId].(int32))
        case int64:
            return e.Rows[n][columnId].(int64)
        case int:
            return int64(e.Rows[n][columnId].(int))
        }
        return 0
    }

 
Everything looks logical, except for the getBinlogIdByName() method.
 
 
This trivial helper is needed so that we can work with column names instead of their ordinal numbers (a sketch of it follows the list), which lets us:


  • take the column names from the gorm annotations;
  • avoid corrections when columns are added to the beginning or middle of the table;
  • work with a field named "name" rather than with column number 3.
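
Such a helper could look like this: a minimal sketch that scans the table schema canal attaches to every event (the panic message wording is my own):

    func (m *BinlogParser) getBinlogIdByName(e *canal.RowsEvent, name string) int {
        for id, value := range e.Table.Columns {
            if value.Name == name {
                return id
            }
        }
        panic(fmt.Sprintf("There is no column %s in table %s.%s", name, e.Table.Schema, e.Table.Name))
    }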

 
As a result, we add the handler:
 
 
    s.Field(k).SetInt(m.intHelper(e, n, columnName))

 
Let's consider two more examples.
 
ENUM: the values arrive as an index, i.e. the status "active" arrives as 1. In most cases we need the string representation of the enum. It can be obtained from the field description. When parsing, enum values are numbered starting from 1, but the array of possible values starts from 0.
 
 
The Enum handler can look like this:
 
 
    func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string {
        columnId := m.getBinlogIdByName(e, columnName)
        if e.Table.Columns[columnId].Type == schema.TYPE_ENUM {
            values := e.Table.Columns[columnId].EnumValues // the possible values of the field
            if len(values) == 0 || e.Rows[n][columnId] == nil {
                return ""
            }
            // enum indices start from 1, the values slice from 0
            return values[e.Rows[n][columnId].(int64)-1]
        }
        // assumed fallback for plain string columns (string or []byte)
        if value, ok := e.Rows[n][columnId].(string); ok {
            return value
        }
        if value, ok := e.Rows[n][columnId].([]byte); ok {
            return string(value)
        }
        return ""
    }
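
Usage inside OnRow mirrors intHelper; for our test table the hypothetical call would be:

    status := h.stringHelper(e, i, "status") // "active" instead of the raw index 1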

 
I want to store JSON
 
 
A good idea, why not? From the point of view of MySQL, JSON is a string. We need to somehow indicate that this data is serialized, so we add a non-canonical "fromJson" annotation to the gorm tag.
 
 
Suppose we need to handle this structure:
 
 
    type JsonData struct {
        Int        int               `gorm:"column:int"`
        StructData TestData          `gorm:"column:struct_data;fromJson"`
        MapData    map[string]string `gorm:"column:map_data;fromJson"`
        SliceData  []int             `gorm:"column:slice_data;fromJson"`
    }

    type TestData struct {
        Test string `json:"test"`
        Int  int    `json:"int"`
    }

 
We could write a lot of conditions, and it would probably even work, but every new data type would kill that effort. And an attempt to find an answer on Stack Overflow, "how to deserialize into a struct of unknown type", tends to begin with the phrase: "It's unclear why you need this, but try..."
 
 
By casting the required type to an interface, we can do this:
 
 
    if _, ok := parsedTag["FROMJSON"]; ok {
        newObject := reflect.New(s.Field(k).Type()).Interface()
        json := m.stringHelper(e, n, columnName)
        jsoniter.Unmarshal([]byte(json), &newObject)
        s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type()))
    }
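
The same trick can be tried outside of any binlog handling. A minimal self-contained sketch using the JsonData struct above:

    var d JsonData
    s := reflect.Indirect(reflect.ValueOf(&d))
    field := s.FieldByName("MapData")

    newObject := reflect.New(field.Type()).Interface() // *map[string]string behind an interface{}
    jsoniter.Unmarshal([]byte(`{"a":"b"}`), &newObject)
    field.Set(reflect.ValueOf(newObject).Elem().Convert(field.Type()))

    fmt.Println(d.MapData) // map[a:b]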

 
If you still have questions about data types, take a look at the tests or ask in the comments.
 
 