序
本文主要研究一下machinery的TaskProcessor
TaskProcessor
// TaskProcessor - can process a delivered task// This will probably always be a worker instancetype TaskProcessor interface { Process(signature *tasks。Signature) error CustomQueue() string PreConsumeHandler() bool}
TaskProcessor介面定義了Process、CustomQueue、PreConsumeHandler方法
Worker
// Worker represents a single worker processtype Worker struct { server *Server ConsumerTag string Concurrency int Queue string errorHandler func(err error) preTaskHandler func(*tasks。Signature) postTaskHandler func(*tasks。Signature) preConsumeHandler func(*Worker) bool}// CustomQueue returns Custom Queue of the running worker processfunc (worker *Worker) CustomQueue() string { return worker。Queue}// Process handles received tasks and triggers success/error callbacksfunc (worker *Worker) Process(signature *tasks。Signature) error { // If the task is not registered with this worker, do not continue // but only return nil as we do not want to restart the worker process if !worker。server。IsTaskRegistered(signature。Name) { return nil } taskFunc, err := worker。server。GetRegisteredTask(signature。Name) if err != nil { return nil } // Update task state to RECEIVED if err = worker。server。GetBackend()。SetStateReceived(signature); err != nil { return fmt。Errorf(“Set state to ‘received’ for task %s returned error: %s”, signature。UUID, err) } // Prepare task for processing task, err := tasks。NewWithSignature(taskFunc, signature) // if this failed, it means the task is malformed, probably has invalid // signature, go directly to task failed without checking whether to retry if err != nil { worker。taskFailed(signature, err) return err } // try to extract trace span from headers and add it to the function context // so it can be used inside the function if it has context。Context as the first // argument。 Start a new span if it isn‘t found。 taskSpan := tracing。StartSpanFromHeaders(signature。Headers, signature。Name) tracing。AnnotateSpanWithSignatureInfo(taskSpan, signature) task。Context = opentracing。ContextWithSpan(task。Context, taskSpan) // Update task state to STARTED if err = worker。server。GetBackend()。SetStateStarted(signature); err != nil { return fmt。Errorf(“Set state to ’started‘ for task %s returned error: %s”, signature。UUID, err) } //Run handler before the task is called if worker。preTaskHandler != nil { worker。preTaskHandler(signature) } //Defer run handler for the end of the task if worker。postTaskHandler != nil { defer worker。postTaskHandler(signature) } // Call the task results, err := task。Call() if err != nil { // If a tasks。ErrRetryTaskLater was returned from the task, // retry the task after specified duration retriableErr, ok := interface{}(err)。(tasks。ErrRetryTaskLater) if ok { return worker。retryTaskIn(signature, retriableErr。RetryIn()) } // Otherwise, execute default retry logic based on signature。RetryCount // and signature。RetryTimeout values if signature。RetryCount > 0 { return worker。taskRetry(signature) } return worker。taskFailed(signature, err) } return worker。taskSucceeded(signature, results)}//SetPreConsumeHandler sets a custom handler for the end of a jobfunc (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) { worker。preConsumeHandler = handler}
Worker實現了TaskProcessor介面,其Process方法先透過worker。server。GetRegisteredTask獲取taskFunc,然後透過signature更新state為RECEIVED,之後設定為STARTED,之後執行task。Call(),最後根據結果更新task為failed或者success
小結
machinery的TaskProcessor介面定義了Process、CustomQueue、PreConsumeHandler方法。Worker實現了TaskProcessor介面,其Process方法先透過worker。server。GetRegisteredTask獲取taskFunc,然後透過signature更新state為RECEIVED,之後設定為STARTED,之後執行task。Call(),最後根據結果更新task為failed或者success。
doc
machinery