initial commit
This commit is contained in:
156
internal/backup/mysql_to_parquet.go
Normal file
156
internal/backup/mysql_to_parquet.go
Normal file
@@ -0,0 +1,156 @@
|
||||
package backup
|
||||
|
||||
import (
	"database/sql"
	"fmt"
	"os"
	"time"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/apache/arrow/go/v14/parquet"
	"github.com/apache/arrow/go/v14/parquet/compress"
	"github.com/apache/arrow/go/v14/parquet/pqarrow"
)
|
||||
|
||||
func BackupDeliveryLogs(sqlDB *sql.DB) (string, error) {
|
||||
|
||||
rows, err := sqlDB.Query(`
|
||||
SELECT
|
||||
logid, logdate, deliveryid, orderheaderid,
|
||||
tenantid, locationid, userid, partnerid,
|
||||
orderid, orderstatus, latitude, longitude,
|
||||
logstatus, created, updated, firstname, lastname
|
||||
FROM deliverylogs
|
||||
WHERE logdate >= NOW() - INTERVAL '3 months'
|
||||
`)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Arrow schema (exact match with MySQL table)
|
||||
schema := arrow.NewSchema([]arrow.Field{
|
||||
{Name: "logid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "logdate", Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true},
|
||||
{Name: "deliveryid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "orderheaderid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "tenantid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "locationid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "userid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "partnerid", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "orderid", Type: arrow.BinaryTypes.String, Nullable: true},
|
||||
{Name: "orderstatus", Type: arrow.BinaryTypes.String, Nullable: true},
|
||||
{Name: "latitude", Type: arrow.BinaryTypes.String, Nullable: true},
|
||||
{Name: "longitude", Type: arrow.BinaryTypes.String, Nullable: true},
|
||||
{Name: "logstatus", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "created", Type: arrow.FixedWidthTypes.Timestamp_ms},
|
||||
{Name: "updated", Type: arrow.FixedWidthTypes.Timestamp_ms},
|
||||
{Name: "firstname", Type: arrow.BinaryTypes.String, Nullable: true},
|
||||
{Name: "lastname", Type: arrow.BinaryTypes.String, Nullable: true},
|
||||
}, nil)
|
||||
|
||||
pool := memory.NewGoAllocator()
|
||||
builder := array.NewRecordBuilder(pool, schema)
|
||||
defer builder.Release()
|
||||
|
||||
for rows.Next() {
|
||||
|
||||
var (
|
||||
logid, deliveryid, orderheaderid int32
|
||||
tenantid, locationid, userid int32
|
||||
partnerid, logstatus int32
|
||||
|
||||
logdate, created, updated sql.NullTime
|
||||
orderid, orderstatus sql.NullString
|
||||
latitude, longitude sql.NullString
|
||||
firstname, lastname sql.NullString
|
||||
)
|
||||
|
||||
if err := rows.Scan(
|
||||
&logid, &logdate, &deliveryid, &orderheaderid,
|
||||
&tenantid, &locationid, &userid, &partnerid,
|
||||
&orderid, &orderstatus, &latitude, &longitude,
|
||||
&logstatus, &created, &updated, &firstname, &lastname,
|
||||
); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
builder.Field(0).(*array.Int32Builder).Append(logid)
|
||||
|
||||
if logdate.Valid {
|
||||
builder.Field(1).(*array.TimestampBuilder).
|
||||
Append(arrow.Timestamp(logdate.Time.UnixMilli()))
|
||||
} else {
|
||||
builder.Field(1).(*array.TimestampBuilder).AppendNull()
|
||||
}
|
||||
|
||||
builder.Field(2).(*array.Int32Builder).Append(deliveryid)
|
||||
builder.Field(3).(*array.Int32Builder).Append(orderheaderid)
|
||||
builder.Field(4).(*array.Int32Builder).Append(tenantid)
|
||||
builder.Field(5).(*array.Int32Builder).Append(locationid)
|
||||
builder.Field(6).(*array.Int32Builder).Append(userid)
|
||||
builder.Field(7).(*array.Int32Builder).Append(partnerid)
|
||||
|
||||
appendString(builder.Field(8), orderid)
|
||||
appendString(builder.Field(9), orderstatus)
|
||||
appendString(builder.Field(10), latitude)
|
||||
appendString(builder.Field(11), longitude)
|
||||
|
||||
builder.Field(12).(*array.Int32Builder).Append(logstatus)
|
||||
|
||||
builder.Field(13).(*array.TimestampBuilder).
|
||||
Append(arrow.Timestamp(created.Time.UnixMilli()))
|
||||
builder.Field(14).(*array.TimestampBuilder).
|
||||
Append(arrow.Timestamp(updated.Time.UnixMilli()))
|
||||
|
||||
appendString(builder.Field(15), firstname)
|
||||
appendString(builder.Field(16), lastname)
|
||||
}
|
||||
|
||||
record := builder.NewRecord()
|
||||
defer record.Release()
|
||||
|
||||
_ = os.MkdirAll("backups/parquet/deliverylogs", 0755)
|
||||
|
||||
filePath := "backups/parquet/deliverylogs/deliverylogs_last_3_months_" +
|
||||
time.Now().Format("2006_01_02") + ".parquet"
|
||||
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// ✅ Correct Parquet + Arrow writer props (Arrow v14)
|
||||
parquetProps := parquet.NewWriterProperties(
|
||||
parquet.WithCompression(compress.Codecs.Snappy),
|
||||
)
|
||||
|
||||
writer, err := pqarrow.NewFileWriter(
|
||||
schema,
|
||||
file,
|
||||
parquetProps,
|
||||
pqarrow.ArrowWriterProperties{}, // zero-value is valid
|
||||
)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
if err := writer.Write(record); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return filePath, nil
|
||||
}
|
||||
|
||||
// helper for nullable strings
|
||||
func appendString(b array.Builder, v sql.NullString) {
|
||||
sb := b.(*array.StringBuilder)
|
||||
if v.Valid {
|
||||
sb.Append(v.String)
|
||||
} else {
|
||||
sb.AppendNull()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user