
How karpor integrates with LLMs

lu-hongcheng 2026-03-18 12:01:04

I've been reading the karpor source code recently and was curious about how it integrates with large language models, so here is a summary of what I found.

Connecting the server to the LLM

When the server's routes are set up, it calls NewAIManager (pkg/core/manager/ai/manager.go:27):

type AIManager struct {
	client ai.AIProvider
}

// NewAIManager returns a new AIManager object
func NewAIManager(c registry.ExtraConfig) (*AIManager, error) {
	if c.AIAuthToken == "" {
		return nil, ErrMissingAuthToken
	}
	aiClient := ai.NewClient(c.AIBackend)
	if err := aiClient.Configure(ai.ConvertToAIConfig(c)); err != nil {
		return nil, err
	}

	return &AIManager{
		client: aiClient,
	}, nil
}

The AIProvider interface is shown below; any LLM client that satisfies it can be plugged into karpor:

// AIProvider is the interface implemented by all AI clients.
type AIProvider interface {
	// Configure sets up the AI service with the provided configuration.
	Configure(config AIConfig) error
	// Generate generates a response from the AI service based on
	// the provided prompt and service type.
	Generate(ctx context.Context, prompt string) (string, error)
	// GenerateStream generates a streaming response from the AI service
	// based on the provided prompt. It returns a channel that will receive
	// chunks of the response as they are generated.
	GenerateStream(ctx context.Context, prompt string) (<-chan string, error)
}
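To make the contract concrete, here is a trivial, purely illustrative provider that satisfies AIProvider. EchoProvider is made up for this post; a real integration would also need to be registered in ai.NewClient:

// EchoProvider is a toy AIProvider that just echoes the prompt back.
type EchoProvider struct{}

func (p *EchoProvider) Configure(config AIConfig) error { return nil }

func (p *EchoProvider) Generate(ctx context.Context, prompt string) (string, error) {
	return "echo: " + prompt, nil
}

func (p *EchoProvider) GenerateStream(ctx context.Context, prompt string) (<-chan string, error) {
	ch := make(chan string, 1)
	go func() {
		defer close(ch)
		select {
		case ch <- "echo: " + prompt:
		case <-ctx.Done(): // respect cancellation
		}
	}()
	return ch, nil
}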

Take the OpenAI client as an example. Configure (pkg/infra/ai/openai.go:31) sets up the client:

func (c *OpenAIClient) Configure(cfg AIConfig) error {
	defaultConfig := openai.DefaultConfig(cfg.AuthToken)
	if cfg.BaseURL != "" {
		defaultConfig.BaseURL = cfg.BaseURL
	}

	if cfg.ProxyEnabled {
		defaultConfig.HTTPClient.Transport = GetProxyHTTPClient(cfg)
	}

	client := openai.NewClientWithConfig(defaultConfig)
	if client == nil {
		return errors.New("error creating OpenAI client")
	}

	c.client = client
	c.model = cfg.Model
	c.temperature = cfg.Temperature
	c.topP = cfg.TopP
	return nil
}
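Reading off the fields that Configure touches, AIConfig must look roughly like this. This is reconstructed from the code above, not copied from the repo, and the real struct may contain more fields:

// AIConfig as implied by Configure; a reconstruction for illustration.
type AIConfig struct {
	AuthToken    string  // API key passed to openai.DefaultConfig
	BaseURL      string  // optional override for OpenAI-compatible endpoints
	Model        string
	Temperature  float32
	TopP         float32
	ProxyEnabled bool    // routes traffic through GetProxyHTTPClient
}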

Generate asks the model for a complete answer in a single blocking call:

func (c *OpenAIClient) Generate(ctx context.Context, prompt string) (string, error) {
	resp, err := c.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
		Model: c.model,
		Messages: []openai.ChatCompletionMessage{
			{
				Role:    openai.ChatMessageRoleUser,
				Content: prompt,
			},
		},
		Temperature: c.temperature,
		TopP:        c.topP,
	})
	if err != nil {
		return "", err
	}

	if len(resp.Choices) == 0 {
		return "", errors.New("no completion choices returned from response")
	}
	return resp.Choices[0].Message.Content, nil
}

And the streaming variant:

func (c *OpenAIClient) GenerateStream(ctx context.Context, prompt string) (<-chan string, error) {
	// Create chat completion stream with streaming enabled
	stream, err := c.client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
		Model: c.model,
		Messages: []openai.ChatCompletionMessage{
			{
				Role:    openai.ChatMessageRoleUser,
				Content: prompt,
			},
		},
		Temperature: c.temperature,
		TopP:        c.topP,
		Stream:      true,
	})
	if err != nil {
		return nil, err
	}

	// Create buffered channel for response chunks
	resultChan := make(chan string, 100)

	// Start goroutine to handle streaming response
	go func() {
		defer close(resultChan)
		defer stream.Close()

		for {
			response, err := stream.Recv()
			if err != nil {
				// errors.Is(err, io.EOF) detects the normal end of the stream;
				// it is more robust than comparing err.Error() against "EOF"
				// (requires the errors and io imports)
				if errors.Is(err, io.EOF) {
					return
				}
				// Send error as a special message
				resultChan <- "ERROR: " + err.Error()
				return
			}

			// Send non-empty content chunks
			if len(response.Choices) > 0 {
				chunk := response.Choices[0].Delta.Content
				if chunk != "" {
					resultChan <- chunk
				}
			}
		}
	}()

	return resultChan, nil
}
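The consumer side is then a simple range over the channel. This sketch (mine, not karpor's; collectStream is a hypothetical helper) shows the "ERROR: " prefix convention being checked on the way out:

// Sketch: drain the stream and reassemble the full answer.
func collectStream(ctx context.Context, client AIProvider, prompt string) (string, error) {
	stream, err := client.GenerateStream(ctx, prompt)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	for chunk := range stream {
		if strings.HasPrefix(chunk, "ERROR: ") {
			return "", errors.New(strings.TrimPrefix(chunk, "ERROR: "))
		}
		sb.WriteString(chunk)
		fmt.Print(chunk) // forward each chunk as it arrives
	}
	return sb.String(), nil
}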

The LLM invocation flow

Take Pod log diagnosis as an example.

The request route calls DiagnoseLogs:

// DiagnoseLogs analyzes pod logs using LLM and returns diagnostic information through a streaming channel
func (a *AIManager) DiagnoseLogs(ctx context.Context, logs []string, language string, eventChan chan<- *DiagnosisEvent) error {
	defer close(eventChan)

	// Combine logs into a single string, limited to last 1000 lines
	if len(logs) > 1000 {
		logs = logs[len(logs)-1000:]
	}
	logsStr := strings.Join(logs, "
")

	// Send start event
	eventChan <- &DiagnosisEvent{
		Type:    "start",
		Content: "Starting log analysis...",
	}

	// Get prompt template and add language instruction
	servicePrompt := ServicePromptMap[LogDiagnosisType]
	if language == "" {
		language = "English"
	}
	prompt := fmt.Sprintf(servicePrompt, language, logsStr)

	// Generate diagnosis using LLM with streaming
	stream, err := a.client.GenerateStream(ctx, prompt)
	if err != nil {
		errEvent := &DiagnosisEvent{
			Type:    "error",
			Content: fmt.Sprintf("Failed to analyze logs: %v", err),
		}
		eventChan <- errEvent
		return fmt.Errorf("failed to generate log diagnosis: %v", err)
	}

	var fullContent strings.Builder
	for chunk := range stream {
		if strings.HasPrefix(chunk, "ERROR:") {
			errEvent := &DiagnosisEvent{
				Type:    "error",
				Content: fmt.Sprintf("Failed to receive diagnosis: %v", strings.TrimPrefix(chunk, "ERROR: ")),
			}
			eventChan <- errEvent
			return fmt.Errorf("failed to receive diagnosis chunk: %v", chunk)
		}

		fullContent.WriteString(chunk)
		eventChan <- &DiagnosisEvent{
			Type:    "chunk",
			Content: chunk,
		}
	}

	// Send complete event
	eventChan <- &DiagnosisEvent{
		Type:    "complete",
		Content: fullContent.String(),
	}

	return nil
}
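Here is a sketch of what the consuming handler could look like if it forwards events to the browser as server-sent events. The SSE wiring below is my assumption for illustration, not karpor's actual handler code:

// Sketch: streaming DiagnosisEvents to the client as SSE.
func diagnoseHandler(aiMgr *AIManager, logs []string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		eventChan := make(chan *DiagnosisEvent, 10)
		go func() {
			// DiagnoseLogs closes eventChan when it returns.
			if err := aiMgr.DiagnoseLogs(r.Context(), logs, "English", eventChan); err != nil {
				log.Printf("diagnosis failed: %v", err)
			}
		}()

		for ev := range eventChan {
			data, _ := json.Marshal(ev)
			fmt.Fprintf(w, "data: %s\n\n", data)
			flusher.Flush()
		}
	}
}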

Overall this feels clean and straightforward. It doesn't use any of the heavier agent machinery I had expected; it relies purely on prompting, presumably because the task only needs the LLM to analyze logs, with no more elaborate orchestration required (maybe prompt work really is the bulk of applied LLM engineering 🤣). Next up, I'll look at k8sGPT to see how this is typically done there.
