import os
from typing import Optional, Type

from pydantic import Field
from pydantic.v1 import BaseModel
from pydantic.v1 import Field as V1Field
from vocode.streaming.action.base_action import BaseAction
from vocode.streaming.models.actions import ActionConfig as VocodeActionConfig
from vocode.streaming.models.actions import ActionInput, ActionOutput
# Human-readable description of the send-email action. It must describe only
# what the action accepts: a single recipient address, a body, and an optional
# subject. The action always creates a new draft, so the text must not promise
# reply-by-message-id behaviour.
_NYLAS_ACTION_DESCRIPTION = """Sends an email using Nylas API.
The input to this action is the recipient email address, the email body, and an optional subject.
Every email is sent as a new message; replying to an existing message is not supported.
If no subject is given, a default subject is used.
"""
class NylasSendEmailParameters(BaseModel):
    # Arguments accepted by the Nylas send-email action.
    #
    # The model derives from the pydantic.v1 BaseModel, so its fields are
    # declared with the pydantic.v1 ``Field`` (imported as ``V1Field``). With
    # pydantic 2.x installed, the top-level ``pydantic.Field`` produces a v2
    # field object that a v1 model treats as a plain default value, which
    # silently makes required fields optional and drops their descriptions.
    #
    # ``#`` comments are used instead of a class docstring so the generated
    # schema does not gain a model-level description.

    # Recipient address (required).
    email: str = V1Field(..., description="The email address to send the email to.")
    # Subject line; may be omitted.
    subject: Optional[str] = V1Field(None, description="The subject of the email.")
    # Message body (required).
    body: str = V1Field(..., description="The body of the email.")
class NylasSendEmailResponse(BaseModel):
    # Result payload returned by the Nylas send-email action.
    # ``#`` comments are used rather than a docstring so the model's
    # generated schema is unchanged.

    # Success flag reported back to the caller.
    success: bool
class NylasSendEmailVocodeActionConfig(
    VocodeActionConfig, type="action_nylas_send_email"  # type: ignore
):
    # Configuration for the Nylas send-email action. It adds no fields of its
    # own; it only registers the "action_nylas_send_email" type tag.
    ...
class NylasSendEmail(
    BaseAction[
        NylasSendEmailVocodeActionConfig,
        NylasSendEmailParameters,
        NylasSendEmailResponse,
    ]
):
    """Action that sends one new email through the Nylas API.

    Nylas credentials are read from the ``NYLAS_CLIENT_ID``,
    ``NYLAS_CLIENT_SECRET`` and ``NYLAS_ACCESS_TOKEN`` environment variables
    every time the action runs.
    """

    description: str = _NYLAS_ACTION_DESCRIPTION
    parameters_type: Type[NylasSendEmailParameters] = NylasSendEmailParameters
    response_type: Type[NylasSendEmailResponse] = NylasSendEmailResponse

    def __init__(
        self,
        action_config: NylasSendEmailVocodeActionConfig,
    ):
        """Initialise the action.

        Args:
            action_config: Configuration forwarded to the base action, together
                with ``quiet=True`` and ``is_interruptible=True``.
        """
        super().__init__(
            action_config,
            quiet=True,
            is_interruptible=True,
        )

    async def _end_of_run_hook(self) -> None:
        """Hook awaited by ``run`` after the email has been sent.

        It is optional but intended to be overridden if needed.
        """
        print("Successfully sent email!")

    def _send_email_blocking(self, params: NylasSendEmailParameters) -> None:
        """Build a Nylas draft from ``params`` and send it (synchronous).

        Args:
            params: Recipient, optional subject and body of the email.

        Any exception raised by the Nylas client propagates to the caller.
        """
        # Imported lazily so the module can be loaded without the Nylas SDK.
        from nylas import APIClient

        # Initialize the Nylas client
        nylas = APIClient(
            client_id=os.getenv("NYLAS_CLIENT_ID"),
            client_secret=os.getenv("NYLAS_CLIENT_SECRET"),
            access_token=os.getenv("NYLAS_ACCESS_TOKEN"),
        )

        # Create the email draft
        draft = nylas.drafts.create()
        draft.body = params.body
        # Strip before choosing the fallback so that a missing, empty, or
        # whitespace-only subject all end up with the default subject.
        subject = (params.subject or "").strip()
        draft.subject = subject or "Email from Vocode"
        draft.to = [{"email": params.email.strip()}]

        # Send the email
        draft.send()

    async def run(
        self, action_input: ActionInput[NylasSendEmailParameters]
    ) -> ActionOutput[NylasSendEmailResponse]:
        """Send the email described by ``action_input`` and report success.

        Args:
            action_input: Action input whose ``params`` carry the recipient,
                optional subject and body, and whose ``action_config``
                supplies the action type echoed in the output.

        Returns:
            An ``ActionOutput`` wrapping ``NylasSendEmailResponse(success=True)``.

        Raises:
            Whatever the Nylas client raises while building or sending the
            draft; errors are not converted into ``success=False``.
        """
        import asyncio

        # The Nylas client is called synchronously, so it is run in the default
        # executor to avoid stalling the event loop while it does its work.
        # NOTE: if this coroutine is cancelled while waiting, the worker thread
        # is not interrupted and may still complete the send.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, self._send_email_blocking, action_input.params
        )

        await self._end_of_run_hook()
        return ActionOutput(
            action_type=action_input.action_config.type,
            response=NylasSendEmailResponse(success=True),
        )