157 lines
4.6 KiB
Go
157 lines
4.6 KiB
Go
package backup
|
|
|
|
import (
	"database/sql"
	"fmt"
	"os"
	"time"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/apache/arrow/go/v14/parquet"
	"github.com/apache/arrow/go/v14/parquet/compress"
	"github.com/apache/arrow/go/v14/parquet/pqarrow"
)
|
|
|
|
func BackupDeliveryLogs(sqlDB *sql.DB) (string, error) {
|
|
|
|
rows, err := sqlDB.Query(`
|
|
SELECT
|
|
logid, logdate, deliveryid, orderheaderid,
|
|
tenantid, locationid, userid, partnerid,
|
|
orderid, orderstatus, latitude, longitude,
|
|
logstatus, created, updated, firstname, lastname
|
|
FROM deliverylogs
|
|
WHERE logdate >= NOW() - INTERVAL '3 months'
|
|
`)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer rows.Close()
|
|
|
|
// Arrow schema (exact match with MySQL table)
|
|
schema := arrow.NewSchema([]arrow.Field{
|
|
{Name: "logid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "logdate", Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true},
|
|
{Name: "deliveryid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "orderheaderid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "tenantid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "locationid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "userid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "partnerid", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "orderid", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
{Name: "orderstatus", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
{Name: "latitude", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
{Name: "longitude", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
{Name: "logstatus", Type: arrow.PrimitiveTypes.Int32},
|
|
{Name: "created", Type: arrow.FixedWidthTypes.Timestamp_ms},
|
|
{Name: "updated", Type: arrow.FixedWidthTypes.Timestamp_ms},
|
|
{Name: "firstname", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
{Name: "lastname", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
}, nil)
|
|
|
|
pool := memory.NewGoAllocator()
|
|
builder := array.NewRecordBuilder(pool, schema)
|
|
defer builder.Release()
|
|
|
|
for rows.Next() {
|
|
|
|
var (
|
|
logid, deliveryid, orderheaderid int32
|
|
tenantid, locationid, userid int32
|
|
partnerid, logstatus int32
|
|
|
|
logdate, created, updated sql.NullTime
|
|
orderid, orderstatus sql.NullString
|
|
latitude, longitude sql.NullString
|
|
firstname, lastname sql.NullString
|
|
)
|
|
|
|
if err := rows.Scan(
|
|
&logid, &logdate, &deliveryid, &orderheaderid,
|
|
&tenantid, &locationid, &userid, &partnerid,
|
|
&orderid, &orderstatus, &latitude, &longitude,
|
|
&logstatus, &created, &updated, &firstname, &lastname,
|
|
); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
builder.Field(0).(*array.Int32Builder).Append(logid)
|
|
|
|
if logdate.Valid {
|
|
builder.Field(1).(*array.TimestampBuilder).
|
|
Append(arrow.Timestamp(logdate.Time.UnixMilli()))
|
|
} else {
|
|
builder.Field(1).(*array.TimestampBuilder).AppendNull()
|
|
}
|
|
|
|
builder.Field(2).(*array.Int32Builder).Append(deliveryid)
|
|
builder.Field(3).(*array.Int32Builder).Append(orderheaderid)
|
|
builder.Field(4).(*array.Int32Builder).Append(tenantid)
|
|
builder.Field(5).(*array.Int32Builder).Append(locationid)
|
|
builder.Field(6).(*array.Int32Builder).Append(userid)
|
|
builder.Field(7).(*array.Int32Builder).Append(partnerid)
|
|
|
|
appendString(builder.Field(8), orderid)
|
|
appendString(builder.Field(9), orderstatus)
|
|
appendString(builder.Field(10), latitude)
|
|
appendString(builder.Field(11), longitude)
|
|
|
|
builder.Field(12).(*array.Int32Builder).Append(logstatus)
|
|
|
|
builder.Field(13).(*array.TimestampBuilder).
|
|
Append(arrow.Timestamp(created.Time.UnixMilli()))
|
|
builder.Field(14).(*array.TimestampBuilder).
|
|
Append(arrow.Timestamp(updated.Time.UnixMilli()))
|
|
|
|
appendString(builder.Field(15), firstname)
|
|
appendString(builder.Field(16), lastname)
|
|
}
|
|
|
|
record := builder.NewRecord()
|
|
defer record.Release()
|
|
|
|
_ = os.MkdirAll("backups/parquet/deliverylogs", 0755)
|
|
|
|
filePath := "backups/parquet/deliverylogs/deliverylogs_last_3_months_" +
|
|
time.Now().Format("2006_01_02") + ".parquet"
|
|
|
|
file, err := os.Create(filePath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer file.Close()
|
|
|
|
// ✅ Correct Parquet + Arrow writer props (Arrow v14)
|
|
parquetProps := parquet.NewWriterProperties(
|
|
parquet.WithCompression(compress.Codecs.Snappy),
|
|
)
|
|
|
|
writer, err := pqarrow.NewFileWriter(
|
|
schema,
|
|
file,
|
|
parquetProps,
|
|
pqarrow.ArrowWriterProperties{}, // zero-value is valid
|
|
)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer writer.Close()
|
|
|
|
if err := writer.Write(record); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return filePath, nil
|
|
}
|
|
|
|
// helper for nullable strings
|
|
func appendString(b array.Builder, v sql.NullString) {
|
|
sb := b.(*array.StringBuilder)
|
|
if v.Valid {
|
|
sb.Append(v.String)
|
|
} else {
|
|
sb.AppendNull()
|
|
}
|
|
}
|