Skip to content

Commit b698f52

Browse files
committed
Asyncio server example
1 parent 69b1c28 commit b698f52

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

example/asyncio_server.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#!/usr/bin/env python
2+
"""Demo server for demonstrating async handlers.
3+
4+
$ python example/asyncio_server.py foo
5+
6+
"""
7+
8+
from __future__ import print_function
9+
10+
import time
11+
import logging
12+
import argparse
13+
import asyncio
14+
from p4p.server import StaticProvider, Server
15+
from p4p.server.asyncio import SharedPV
16+
from p4p.nt import NTScalar
17+
import signal
18+
19+
20+
DEFAULT_TIMEOUT = 1
21+
22+
class SomeClassWithACoroutine:
23+
def __init__(self):
24+
self.data = None
25+
26+
async def coroutine(self, value: str):
27+
logging.info(f"Updating {self} from value `{self.data}` to `{value}`.")
28+
self.data = value
29+
30+
31+
class AttrWHandler:
32+
def __init__(self, some_object_with_coro: SomeClassWithACoroutine):
33+
self.some_object_with_coro = some_object_with_coro
34+
35+
async def put(self, pv, op):
36+
value = op.value()
37+
raw_value = value.raw.value
38+
logging.info(f"Received put on `{raw_value}` to `{op.name()}`.")
39+
40+
await self.some_object_with_coro.coroutine(raw_value)
41+
42+
pv.post(value, timestamp=time.time())
43+
op.done()
44+
45+
46+
class AsyncProviderWrapper:
47+
def __init__(self, prefix: str, loop: asyncio.AbstractEventLoop):
48+
self.prefix = prefix
49+
self._loop = loop
50+
self._provider = StaticProvider(prefix)
51+
self._pvs = []
52+
53+
self.setUp()
54+
55+
def __del__(self):
56+
self.tearDown()
57+
58+
@property
59+
def providers(self) -> tuple[StaticProvider]:
60+
return (self._provider,)
61+
62+
async def asyncSetUp(self):
63+
logging.info("Async set up.")
64+
await self.add_pvs()
65+
66+
async def add_pvs(self):
67+
write_pv = SharedPV(
68+
handler=AttrWHandler(SomeClassWithACoroutine()),
69+
nt=NTScalar("s"),
70+
initial="initial_value_1",
71+
)
72+
self._pvs.append(write_pv)
73+
logging.info(f"Added {self.prefix}:WRITE_PV to provider.")
74+
self._provider.add(f"{self.prefix}:WRITE_PV", write_pv)
75+
76+
read_pv = SharedPV(
77+
nt=NTScalar("s"),
78+
initial="initial_value_2",
79+
)
80+
self._pvs.append(read_pv)
81+
logging.info(f"Added {self.prefix}:READ_PV to provider.")
82+
self._provider.add(f"{self.prefix}:READ_PV", read_pv)
83+
84+
def setUp(self):
85+
logging.info("Sync set up.")
86+
self._loop.set_debug(True)
87+
self._loop.run_until_complete(asyncio.wait_for(self.asyncSetUp(), DEFAULT_TIMEOUT))
88+
89+
def tearDown(self):
90+
logging.info("Sync tear down.")
91+
92+
class AsyncServerWrapper:
93+
def __init__(
94+
self,
95+
prefix: str,
96+
):
97+
self._prefix = prefix
98+
99+
def run(self):
100+
loop = asyncio.new_event_loop()
101+
provider_wrapper = AsyncProviderWrapper(self._prefix, loop)
102+
103+
try:
104+
# `Server.forever()` is for p4p threading and shouldn't
105+
# be used with async.
106+
server = Server(provider_wrapper.providers)
107+
with server:
108+
done = asyncio.Event()
109+
loop.add_signal_handler(signal.SIGINT, done.set)
110+
loop.add_signal_handler(signal.SIGTERM, done.set)
111+
loop.run_until_complete(done.wait())
112+
finally:
113+
loop.close()
114+
115+
def main(args: argparse.Namespace):
116+
AsyncServerWrapper(args.prefix).run()
117+
118+
def getargs() -> argparse.Namespace:
119+
P = argparse.ArgumentParser()
120+
P.add_argument('prefix', type=str)
121+
P.add_argument('-v','--verbose', action='store_const', default=logging.INFO, const=logging.DEBUG)
122+
return P.parse_args()
123+
124+
if __name__=='__main__':
125+
args = getargs()
126+
logging.basicConfig(level=args.verbose)
127+
main(args)

0 commit comments

Comments
 (0)