命令行获取 Azkaban 中 Job 的依赖关系
摘要
使用Azkaban
提供的 API
实现命令行中查看此任务的依赖关系。当一个任务失败时,可以通过这个命令查看成功了哪些任务、失败了哪些任务。
前言
Azkaban
中可以设置复杂的依赖关系,虽然在管理界面中有 Graph
来展示它们的关系,不过操作起来并不方便。数据组在使用的过程中希望当知道一个失败的任务,可以知道这个任务之后的哪些任务没有执行。
通过前一篇文章:《Azkaban 自动打包项目并上传》,有了一些开发的经验,于是准备使用 golang
来实现这套 API
便于后期有类似的需求。
开始
先建一个基本的结构体,来保存常用的数据。
package azkaban
type Azkaban struct {
Username string
Password string
Host string
Project string
Flow string
SessionID string
}
func New(user, pass, host, project, flow string) *Azkaban {
return &Azkaban{
Username: user,
Password: pass,
Host: host,
Project: project,
Flow: flow,
}
}
首先实现 Azkaban
的认证:
package azkaban
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/json-iterator/go"
)
type AuthResp struct {
SessionID string `json:"session.id"`
Status string `json:"status"`
Error string `json:"error"`
}
func (azk *Azkaban) Authenticate() (err error) {
if azk.SessionID != "" {
return nil
}
sessionID, err := getSessionID(azk.Username, azk.Password, azk.Host)
if err != nil {
return
}
azk.SessionID = sessionID
return nil
}
func getSessionID(user, pass, host string) (sessionID string, err error) {
params := fmt.Sprintf(`action=login&username=%s&password=%s`, user, pass)
body := strings.NewReader(params)
req, err := http.NewRequest("POST", host, body)
if err != nil {
return
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
var result AuthResp
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
err = jsoniter.Unmarshal(b, &result)
if err != nil {
return
}
if result.Status != "success" {
err = errors.New(result.Error)
return
}
sessionID = result.SessionID
return
}
简单的 POST
的请求,请求成功后判断是否认证通过。
有了认证之后,需要获取哪些数据或执行一些操作就比较方便了,下面获取 Job
的所有节点:
package azkaban
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/json-iterator/go"
)
type JobFlowResp struct {
Flow string `json:"flow"`
Nodes []struct {
ID string `json:"id"`
In []string `json:"in"`
Type string `json:"type"`
} `json:"nodes"`
Project string `json:"project"`
ProjectID int `json:"projectId"`
}
type Node struct {
Next []*Node
Prev []*Node
Name string
Typ string
}
var nodes = make(map[string]*Node)
func (azk *Azkaban) FetchJobsOfAFlow(jobName string, direction string) (err error) {
// 生成通行证
err = azk.Authenticate()
if err != nil {
return
}
result, err := fetchJobs(azk.SessionID, azk.Project, azk.Flow, azk.Host)
if err != nil {
return
}
err = result.printJob(jobName, direction)
return
}
func (jobFlow JobFlowResp) printJob(jobName string, direction string) error {
for _, v := range jobFlow.Nodes {
t := newNode(v.ID, v.Type)
for _, vv := range v.In {
tt := newNode(vv, "")
t.Prev = append(t.Prev, tt)
tt.Next = append(tt.Next, t)
}
}
t, ok := nodes[jobName]
if !ok {
return errors.New(fmt.Sprintf("Job name: %s, not find", jobName))
}
var list []string
switch direction {
case "behind":
list = printNode(t)
case "before":
list = printNodeBefore(t)
}
s := strings.Join(list, ",")
for jj, j := range strings.Split(s, "{{start}}") {
if len(j) == 0 {
continue
}
fmt.Println(fmt.Sprintf("------------- %d ---------------", jj))
for _, k := range strings.Split(strings.Trim(j, ","), ",") {
fmt.Println(k)
}
fmt.Println("---------------------------------")
}
return nil
}
func printNode(node *Node) []string {
var list []string
if len(node.Next) == 0 {
list = append(list, node.Name)
list = append(list, "{{start}}")
}
for _, v := range node.Next {
list = append(list, node.Name)
list = append(list, printNode(v)...)
}
return list
}
func printNodeBefore(node *Node) []string {
var list []string
if len(node.Prev) == 0 {
list = append(list, node.Name)
list = append(list, "{{start}}")
}
for _, v := range node.Prev {
list = append(list, node.Name)
list = append(list, printNodeBefore(v)...)
}
return list
}
func newNode(name, typ string) *Node {
if t, ok := nodes[name]; ok {
t.Typ = typ
nodes[name] = t
return t
}
node := new(Node)
node.Name = name
node.Typ = typ
nodes[name] = node
return node
}
func fetchJobs(sessionID, project, flow, host string) (result JobFlowResp, err error) {
params := fmt.Sprintf(
`session.id=%s&ajax=fetchflowgraph&project=%s&flow=%s`,
sessionID, project, flow,
)
requestURL := fmt.Sprintf("%s/%s?%s", host, "manager", params)
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
return
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
err = jsoniter.Unmarshal(b, &result)
if err != nil {
return
}
return
}
代码分析
以上代码都是正常的请求及解析的过程,核心代码如下所示:
func newNode(name, typ string) *Node {
if t, ok := nodes[name]; ok {
t.Typ = typ
nodes[name] = t
return t
}
node := new(Node)
node.Name = name
node.Typ = typ
nodes[name] = node
return node
}
请求返回的 nodes
像如下格式:
[{
"id" : "test-final",
"type" : "command",
"in" : [ "test-job-3" ]
}]
id
: 当前节点的节点名,一般对应Job
名type
: 节点类型in
: 是一个数组,里面包含所有子点节的节点id
通过节点的 id
及 in
,可以把它们的关系画成一棵树,就如管理页面 Graph
显示的一样。
有了这样的数据,使用如下代码,把所有数据整理一下:
var nodes = make(map[string]*Node)
type Node struct {
Next []*Node
Prev []*Node
Name string
Typ string
}
nodes
保存了所有节点,以接口返回的 node
下的 id
为 key
,返回的 Node
中每条数据对应一个 Node
结构,这样把所有的数据都串了起来,像树型结构一样。
通过 id
可以在 nodes
可以获取当前的 Node
,这个 Node
中又保存了上级和下级的 Node
。
找到一个 Node
之后,要获取所有的子节点,只需要把这个节点中的所有 Next
中的 Node 获取出来,就能得到依赖这个节点的所有子节点。同理,想要获取这个节点依赖了哪些节点,把所有的 Prev
获取出来就能是了。
真是一个好结构呢。
小结
此文章主要介绍简单的 azkaban api
使用,主要介绍了这个特殊的数据结构,可以很好的满足这个需求。
当前只实现了 azkaban api
的两个接口,如有其它需求,再增加。
数据结构也很重要,有时间一定要补补,是时候加入到我的看板中了。