#java #azkaban #shell #schedule #调度 #golang

命令行获取 Azkaban 中 Job 的依赖关系

摘要

使用 Azkaban 提供的 API 实现命令行中查看此任务的依赖关系。当一个任务失败时,可以通过这个命令查看成功了哪些任务、失败了哪些任务。

前言

Azkaban 中可以设置复杂的依赖关系,虽然在管理界面中有 Graph 来展示它们的关系,不过操作起来并不方便。数据组在使用的过程中希望当知道一个失败的任务,可以知道这个任务之后的哪些任务没有执行。

通过前一篇文章:《Azkaban 自动打包项目并上传》,有了一些开发的经验,于是准备使用 golang 来实现这套 API 便于后期有类似的需求。

开始

先建一个基本的结构体,来保存常用的数据。

package azkaban

type Azkaban struct {
	Username string
	Password string
	Host     string
	Project  string
	Flow     string

	SessionID string
}

func New(user, pass, host, project, flow string) *Azkaban {
	return &Azkaban{
		Username: user,
		Password: pass,
		Host:     host,
		Project:  project,
		Flow:     flow,
	}
}

首先实现 Azkaban 的认证:

package azkaban

import (
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"

	"github.com/json-iterator/go"
)

type AuthResp struct {
	SessionID string `json:"session.id"`
	Status    string `json:"status"`
	Error     string `json:"error"`
}

func (azk *Azkaban) Authenticate() (err error) {
	if azk.SessionID != "" {
		return nil
	}

	sessionID, err := getSessionID(azk.Username, azk.Password, azk.Host)
	if err != nil {
		return
	}

	azk.SessionID = sessionID

	return nil
}

func getSessionID(user, pass, host string) (sessionID string, err error) {
	params := fmt.Sprintf(`action=login&username=%s&password=%s`, user, pass)
	body := strings.NewReader(params)
	req, err := http.NewRequest("POST", host, body)
	if err != nil {
		return
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	var result AuthResp

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}

	err = jsoniter.Unmarshal(b, &result)
	if err != nil {
		return
	}

	if result.Status != "success" {
		err = errors.New(result.Error)
		return
	}

	sessionID = result.SessionID
	return
}

简单的 POST 的请求,请求成功后判断是否认证通过。

有了认证之后,需要获取哪些数据或执行一些操作就比较方便了,下面获取 Job 的所有节点:

package azkaban

import (
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"

	"github.com/json-iterator/go"
)

type JobFlowResp struct {
	Flow  string `json:"flow"`
	Nodes []struct {
		ID   string   `json:"id"`
		In   []string `json:"in"`
		Type string   `json:"type"`
	} `json:"nodes"`
	Project   string `json:"project"`
	ProjectID int    `json:"projectId"`
}

type Node struct {
	Next []*Node
	Prev []*Node

	Name string
	Typ  string
}

var nodes = make(map[string]*Node)

func (azk *Azkaban) FetchJobsOfAFlow(jobName string, direction string) (err error) {
	// 生成通行证
	err = azk.Authenticate()
	if err != nil {
		return
	}

	result, err := fetchJobs(azk.SessionID, azk.Project, azk.Flow, azk.Host)
	if err != nil {
		return
	}

	err = result.printJob(jobName, direction)

	return
}

func (jobFlow JobFlowResp) printJob(jobName string, direction string) error {
	for _, v := range jobFlow.Nodes {
		t := newNode(v.ID, v.Type)
		for _, vv := range v.In {
			tt := newNode(vv, "")

			t.Prev = append(t.Prev, tt)
			tt.Next = append(tt.Next, t)
		}
	}

	t, ok := nodes[jobName]
	if !ok {
		return errors.New(fmt.Sprintf("Job name: %s, not find", jobName))
	}

	var list []string
	switch direction {
	case "behind":
		list = printNode(t)
	case "before":
		list = printNodeBefore(t)
	}

	s := strings.Join(list, ",")

	for jj, j := range strings.Split(s, "{{start}}") {
		if len(j) == 0 {
			continue
		}
		fmt.Println(fmt.Sprintf("------------- %d ---------------", jj))

		for _, k := range strings.Split(strings.Trim(j, ","), ",") {
			fmt.Println(k)
		}

		fmt.Println("---------------------------------")
	}

	return nil
}

func printNode(node *Node) []string {
	var list []string
	if len(node.Next) == 0 {
		list = append(list, node.Name)

		list = append(list, "{{start}}")
	}

	for _, v := range node.Next {
		list = append(list, node.Name)
		list = append(list, printNode(v)...)
	}

	return list
}

func printNodeBefore(node *Node) []string {
	var list []string
	if len(node.Prev) == 0 {
		list = append(list, node.Name)

		list = append(list, "{{start}}")
	}

	for _, v := range node.Prev {
		list = append(list, node.Name)
		list = append(list, printNodeBefore(v)...)
	}

	return list
}

func newNode(name, typ string) *Node {
	if t, ok := nodes[name]; ok {
		t.Typ = typ
		nodes[name] = t
		return t
	}

	node := new(Node)
	node.Name = name
	node.Typ = typ
	nodes[name] = node

	return node
}

func fetchJobs(sessionID, project, flow, host string) (result JobFlowResp, err error) {
	params := fmt.Sprintf(
		`session.id=%s&ajax=fetchflowgraph&project=%s&flow=%s`,
		sessionID, project, flow,
	)

	requestURL := fmt.Sprintf("%s/%s?%s", host, "manager", params)
	req, err := http.NewRequest("GET", requestURL, nil)
	if err != nil {
		return
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}

	err = jsoniter.Unmarshal(b, &result)
	if err != nil {
		return
	}

	return
}

代码分析

以上代码都是正常的请求及解析的过程,核心代码如下所示:

func newNode(name, typ string) *Node {
	if t, ok := nodes[name]; ok {
		t.Typ = typ
		nodes[name] = t
		return t
	}

	node := new(Node)
	node.Name = name
	node.Typ = typ
	nodes[name] = node

	return node
}

请求返回的 nodes 像如下格式:

[{
  "id" : "test-final",
  "type" : "command",
  "in" : [ "test-job-3" ]
}]
  • id : 当前节点的节点名,一般对应 Job
  • type : 节点类型
  • in : 是一个数组,里面包含所有子点节的节点 id

通过节点的 idin,可以把它们的关系画成一棵树,就如管理页面 Graph 显示的一样。

有了这样的数据,使用如下代码,把所有数据整理一下:

var nodes = make(map[string]*Node)

type Node struct {
	Next []*Node
	Prev []*Node

	Name string
	Typ  string
}

nodes 保存了所有节点,以接口返回的 node 下的 idkey,返回的 Node 中每条数据对应一个 Node 结构,这样把所有的数据都串了起来,像树型结构一样。

通过 id 可以在 nodes 可以获取当前的 Node,这个 Node 中又保存了上级和下级的 Node

找到一个 Node 之后,要获取所有的子节点,只需要把这个节点中的所有 Next 中的 Node 获取出来,就能得到依赖这个节点的所有子节点。同理,想要获取这个节点依赖了哪些节点,把所有的 Prev 获取出来就能是了。

真是一个好结构呢。

小结

此文章主要介绍简单的 azkaban api 使用,主要介绍了这个特殊的数据结构,可以很好的满足这个需求。

当前只实现了 azkaban api 的两个接口,如有其它需求,再增加。

数据结构也很重要,有时间一定要补补,是时候加入到我的看板中了。

Author Mo 最后更新: 2019-01-19 00:35:40