聊聊machinery的TaskProcessor

本文主要研究一下machinery的TaskProcessor

聊聊machinery的TaskProcessor

TaskProcessor

// TaskProcessor - can process a delivered task// This will probably always be a worker instancetype TaskProcessor interface {    Process(signature *tasks。Signature) error    CustomQueue() string    PreConsumeHandler() bool}

TaskProcessor介面定義了Process、CustomQueue、PreConsumeHandler方法

Worker

// Worker represents a single worker processtype Worker struct {    server            *Server    ConsumerTag       string    Concurrency       int    Queue             string    errorHandler      func(err error)    preTaskHandler    func(*tasks。Signature)    postTaskHandler   func(*tasks。Signature)    preConsumeHandler func(*Worker) bool}// CustomQueue returns Custom Queue of the running worker processfunc (worker *Worker) CustomQueue() string {    return worker。Queue}// Process handles received tasks and triggers success/error callbacksfunc (worker *Worker) Process(signature *tasks。Signature) error {    // If the task is not registered with this worker, do not continue    // but only return nil as we do not want to restart the worker process    if !worker。server。IsTaskRegistered(signature。Name) {        return nil    }    taskFunc, err := worker。server。GetRegisteredTask(signature。Name)    if err != nil {        return nil    }    // Update task state to RECEIVED    if err = worker。server。GetBackend()。SetStateReceived(signature); err != nil {        return fmt。Errorf(“Set state to ‘received’ for task %s returned error: %s”, signature。UUID, err)    }    // Prepare task for processing    task, err := tasks。NewWithSignature(taskFunc, signature)    // if this failed, it means the task is malformed, probably has invalid    // signature, go directly to task failed without checking whether to retry    if err != nil {        worker。taskFailed(signature, err)        return err    }    // try to extract trace span from headers and add it to the function context    // so it can be used inside the function if it has context。Context as the first    // argument。 Start a new span if it isn‘t found。    taskSpan := tracing。StartSpanFromHeaders(signature。Headers, signature。Name)    tracing。AnnotateSpanWithSignatureInfo(taskSpan, signature)    task。Context = opentracing。ContextWithSpan(task。Context, taskSpan)    // Update task state to STARTED    if err = worker。server。GetBackend()。SetStateStarted(signature); err != nil {        return fmt。Errorf(“Set state to ’started‘ for task %s returned error: %s”, signature。UUID, err)    }    //Run handler before the task is called    if worker。preTaskHandler != nil {        worker。preTaskHandler(signature)    }    //Defer run handler for the end of the task    if worker。postTaskHandler != nil {        defer worker。postTaskHandler(signature)    }    // Call the task    results, err := task。Call()    if err != nil {        // If a tasks。ErrRetryTaskLater was returned from the task,        // retry the task after specified duration        retriableErr, ok := interface{}(err)。(tasks。ErrRetryTaskLater)        if ok {            return worker。retryTaskIn(signature, retriableErr。RetryIn())        }        // Otherwise, execute default retry logic based on signature。RetryCount        // and signature。RetryTimeout values        if signature。RetryCount > 0 {            return worker。taskRetry(signature)        }        return worker。taskFailed(signature, err)    }    return worker。taskSucceeded(signature, results)}//SetPreConsumeHandler sets a custom handler for the end of a jobfunc (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {    worker。preConsumeHandler = handler}

Worker實現了TaskProcessor介面,其Process方法先透過worker。server。GetRegisteredTask獲取taskFunc,然後透過signature更新state為RECEIVED,之後設定為STARTED,之後執行task。Call(),最後根據結果更新task為failed或者success

小結

machinery的TaskProcessor介面定義了Process、CustomQueue、PreConsumeHandler方法。Worker實現了TaskProcessor介面,其Process方法先透過worker。server。GetRegisteredTask獲取taskFunc,然後透過signature更新state為RECEIVED,之後設定為STARTED,之後執行task。Call(),最後根據結果更新task為failed或者success。

doc

machinery