zopefoundation / grokcore.message / 16098800953

18 Jun 2025 06:36AM UTC coverage: 94.186%. Remained the same

push · github · icemac
Back to development: 5.1

9 of 12 branches covered (75.0%)

Branch coverage included in aggregate %.

72 of 74 relevant lines covered (97.3%)

0.97 hits per line
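
The figures above are consistent with an aggregate that pools lines and branches: (72 + 9) / (74 + 12) = 81 / 86, which is about 94.186%. The sketch below reproduces that arithmetic. The function name `coverage_percent` is hypothetical, and the pooling formula is an assumption inferred from the numbers on this page, not taken from Coveralls' code.

```python
def coverage_percent(covered_lines, relevant_lines,
                     covered_branches=0, total_branches=0):
    """Return coverage as a percentage, pooling lines and branches.

    Assumed formula: (covered lines + covered branches)
                     / (relevant lines + total branches).
    """
    covered = covered_lines + covered_branches
    total = relevant_lines + total_branches
    return 100.0 * covered / total


# Figures reported for this build.
line_pct = round(coverage_percent(72, 74), 1)              # 97.3
branch_pct = round(coverage_percent(0, 0, 9, 12), 1)       # 75.0
aggregate_pct = round(coverage_percent(72, 74, 9, 12), 3)  # 94.186

print(line_pct, branch_pct, aggregate_pct)
```

Pooling explains why the headline figure (94.186%) is lower than the line coverage alone (97.3%): the three uncovered branches count against the total.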

Source File

92.0
/src/grokcore/message/utils.py
from zope.component import queryUtility

from grokcore.message import IMessageReceiver
from grokcore.message import IMessageSource


def send(message, type='message', name='session'):
    """Adds a short message to a given source.

    If the message has been sent successfully, True is returned.
    Otherwise, False is returned.
    """
    source = queryUtility(IMessageSource, name=name)
    if source is None:
        return False
    source.send(message, type)
    return True


def get_from_source(name=''):
    """List messages from a given source.

    If the source has been found, a list of messages is returned.
    Otherwise, None is returned.
    """
    source = queryUtility(IMessageSource, name=name)
    if source is None:
        return None
    return source.list()


def receive(name=''):
    """Receives messages from a given receiver.

    If the receiver has been found, an iterable of messages is
    returned. Otherwise, None is returned.
    """
    receiver = queryUtility(IMessageReceiver, name=name)
    if receiver is None:
        return None
    return receiver.receive()

Coverage: every executable line in this file was hit once, except in receive(), where the "if receiver is None:" check is only partially covered (marked 1!) and the "return None" below it was never executed (marked ×).
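
The uncovered path in receive() is the case where no receiver is registered under the given name. The sketch below illustrates both outcomes without depending on zope.component. Everything in it is a hypothetical stand-in: `_registry`, `query_utility`, the placeholder `IMessageReceiver` class, and `StubReceiver` only mimic a lookup that returns None when nothing is registered.

```python
# Hypothetical stand-in for a utility registry; not zope.component itself.
_registry = {}


class IMessageReceiver:
    """Placeholder marker class, not the real grokcore.message interface."""


def query_utility(interface, name=''):
    """Return the utility registered for (interface, name), or None."""
    return _registry.get((interface, name))


def receive(name=''):
    """Same control flow as the listing, using the stand-in lookup."""
    receiver = query_utility(IMessageReceiver, name=name)
    if receiver is None:
        return None  # the path the report marks as never executed
    return receiver.receive()


class StubReceiver:
    """Minimal receiver that hands back a fixed set of messages."""

    def __init__(self, messages):
        self._messages = list(messages)

    def receive(self):
        return iter(self._messages)


# Case 1: nothing registered under this name, so None comes back.
missing = receive(name='unregistered')

# Case 2: a receiver is registered, so its messages are returned.
_registry[(IMessageReceiver, 'session')] = StubReceiver(['hello'])
found = list(receive(name='session'))

print(missing, found)
```

In the real test suite, a test that calls receive() with a name for which no IMessageReceiver utility is registered would presumably exercise the missing branch in the same way.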