From 4e94e34a377f3f26a944a3ed3abbceb6657d3800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alain=20P=C3=A9teut?= Date: Thu, 17 Jan 2019 18:35:42 +0100 Subject: [PATCH] started w/ axi4 * see #12 --- .travis.yml | 7 +- src/migen_axi/interconnect/axi4.py | 211 +++++++++++++++++++++++++++++ tests/interconnect/__init__.py | 0 tests/interconnect/test_axi4.py | 73 ++++++++++ 4 files changed, 286 insertions(+), 5 deletions(-) create mode 100644 src/migen_axi/interconnect/axi4.py create mode 100644 tests/interconnect/__init__.py create mode 100644 tests/interconnect/test_axi4.py diff --git a/.travis.yml b/.travis.yml index 5c65960..e432a39 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,10 @@ sudo: false language: python python: - - 3.5 -env: - - TOXENV=py35 + - 3.6 install: - - pip install -U pip - pip install tox - - pip install setuptools==26.1 + - pip install tox-travis - pip install coveralls script: - tox diff --git a/src/migen_axi/interconnect/axi4.py b/src/migen_axi/interconnect/axi4.py new file mode 100644 index 0000000..7a7779c --- /dev/null +++ b/src/migen_axi/interconnect/axi4.py @@ -0,0 +1,211 @@ +from .axi import Burst, Alock, Response +import typing +import ramda as R +from migen import * # noqa +from misoc.interconnect import stream + +__all__ = ["Axi4Config", "axi4_aw", "axi4_ar", "axi4_arw", + "axi4_w", "axi4_b", "axi4_r"] + + +class Axi4Config(typing.NamedTuple): + """Configuration for the Axi4 bus + """ + addr_width: int + data_width: int + id_width: int = -1 + use_id: bool = True + use_region: bool = True + use_burst: bool = True + use_lock: bool = True + use_cache: bool = True + use_size: bool = True + use_qos: bool = True + use_len: bool = True + use_last: bool = True + use_resp: bool = True + use_prot: bool = True + use_strb: bool = True + ar_user_width: int = -1 + aw_user_width: int = -1 + r_user_width: int = -1 + w_user_width: int = -1 + b_user_width: int = -1 + + @property + def use_ar_user(self) -> bool: + return self.ar_user_width >= 0 + + @property + def use_aw_user(self) -> bool: + return self.aw_user_width >= 0 + + @property + def use_r_user(self) -> bool: + return self.r_user_width >= 0 + + @property + def use_w_user(self) -> bool: + return self.w_user_width >= 0 + + @property + def use_b_user(self) -> bool: + return self.b_user_width >= 0 + + @property + def use_arw_user(self) -> int: + return self.arw_user_width >= 0 # Shared AR/AW channel + + @property + def arw_user_width(self) -> int: + return max(self.ar_user_width, self.aw_user_width) + + @property + def byte_per_word(self) -> int: + return self.data_width // 8 + + +m2s, s2m = R.append(DIR_M_TO_S), R.append(DIR_S_TO_M) +if_else_none = R.if_else(R.__, R.__, R.always(None)) +filter_nil = R.reject(R.is_nil) +two_ary = R.curry(R.n_ary(2, R.unapply(R.identity))) +namedtuple2map = R.invoker(0, "_asdict") + +axi4_ax = R.compose( + filter_nil, + R.juxt([ + R.compose(m2s, R.prepend("addr"), R.of, R.path([0, "addr_width"])), + if_else_none( + R.path([0, "use_id"]), + R.compose(m2s, R.prepend("id"), R.of, R.path([0, "id_width"]))), + if_else_none( + R.path([0, "use_region"]), + R.compose(m2s, R.prepend("region"), R.always([4]))), + if_else_none( + R.path([0, "use_len"]), + R.compose(m2s, R.prepend("len"), R.always([8]))), + if_else_none( + R.path([0, "use_size"]), + R.compose(m2s, R.prepend("size"), R.always([3]))), + if_else_none( + R.path([0, "use_burst"]), + R.compose(m2s, R.prepend("burst"), R.always([2]))), + if_else_none( + R.path([0, "use_lock"]), + R.compose(m2s, R.prepend("lock"), R.always([1]))), + if_else_none( + R.path([0, "use_cache"]), + R.compose(m2s, R.prepend("cache"), R.always([4]))), + if_else_none( + R.path([0, "use_qos"]), + R.compose(m2s, R.prepend("qos"), R.always([4]))), + if_else_none( + R.compose(R.flip(R.gt)(0), R.nth(1)), + R.compose(m2s, R.prepend("user"), R.of, R.nth(1))), + if_else_none( + R.path([0, "use_prot"]), + R.compose(m2s, R.prepend("prot"), R.always([3]))), + ]), + R.juxt([R.compose(namedtuple2map, R.head), R.nth(1)]), + two_ary) + + +@R.curry +def _set_burst(val: int, config: Axi4Config, m: Module, chan: stream.Endpoint): + assert config.use_burst + m.comb += chan.burst.eq(val) + + +set_burst_fixed = _set_burst(Burst.fixed) +set_burst_wrap = _set_burst(Burst.wrap) +set_burst_incr = _set_burst(Burst.incr) + + +@R.curry +def _set(name: str, config: Axi4Config, m: Module, chan: stream.Endpoint, + val: int): + if getattr(config, name): + m.comb += getattr(chan, name).eq(val) + + +set_size = _set("size") +set_lock = _set("lock") +set_cache = _set("cache") + +axi4_aw = axi4_ax +axi4_ar = axi4_ax +axi4_arw = R.compose(R.append(["write", 1, DIR_M_TO_S]), axi4_ax) + +axi4_w = R.compose( + filter_nil, + R.juxt([ + R.compose(m2s, R.prepend("data"), R.of, R.path([0, "data_width"])), + if_else_none( + R.path([0, "use_strb"]), + R.compose(m2s, R.prepend("strb"), R.of, + R.path([0, "byte_per_word"]))), + if_else_none( + R.path([0, "use_w_user"]), + R.compose(m2s, R.prepend("user"), R.of, + R.path([0, "w_user_width"]))), + if_else_none( + R.path([0, "use_last"]), + R.compose(m2s, R.prepend("last"), R.always([1])))]), + R.juxt([R.compose(namedtuple2map, R.head), R.nth(1)]), + two_ary) + +axi4_b = R.compose( + filter_nil, + R.juxt([ + if_else_none( + R.path([0, "use_id"]), + R.compose(s2m, R.prepend("id"), R.of, R.path([0, "id_width"]))), + if_else_none( + R.path([0, "use_resp"]), + R.compose(s2m, R.prepend("resp"), R.always([2]))), + if_else_none( + R.path([0, "use_b_user"]), + R.compose( + s2m, R.prepend("user"), R.of, R.path([0, "b_user_width"])))]), + R.juxt([R.compose(namedtuple2map, R.head), R.nth(1)]), + two_ary) + +_set_resp = _set("resp") +set_okay = _set_resp(R.__, R.__, R.__, Response.okay) +set_exokay = _set_resp(R.__, R.__, R.__, Response.exokay) +set_slverr = _set_resp(R.__, R.__, R.__, Response.slverr) +set_decerr = _set_resp(R.__, R.__, R.__, Response.decerr) + + +@R.curry +def _get(name: str, chan: stream.Endpoint): + return getattr(name, chan) + + +_get_resp = _get("resp") +is_okay = R.compose(R.equals(Response.okay), _get_resp) +is_exokay = R.compose(R.equals(Response.exokay), _get_resp) +is_slverr = R.compose(R.equals(Response.slverr), _get_resp) +is_decerr = R.compose(R.equals(Response.decerr), _get_resp) + + +axi4_r = R.compose( + filter_nil, + R.juxt([ + R.compose(s2m, R.prepend("data"), R.of, R.path([0, "data_width"])), + if_else_none( + R.path([0, "use_id"]), + R.compose(s2m, R.prepend("id"), R.of, R.path([0, "id_width"]))), + if_else_none( + R.path([0, "use_resp"]), + R.compose(s2m, R.prepend("resp"), R.always([2]))), + if_else_none( + R.path([0, "use_last"]), + R.compose(s2m, R.prepend("last"), R.always([1]))), + if_else_none( + R.path([0, "use_r_user"]), + R.compose( + s2m, R.prepend("user"), R.of, R.path([0, "r_user_width"]))), + ]), + R.juxt([R.compose(namedtuple2map, R.head), R.nth(1)]), + two_ary) diff --git a/tests/interconnect/__init__.py b/tests/interconnect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/interconnect/test_axi4.py b/tests/interconnect/test_axi4.py new file mode 100644 index 0000000..b10fd58 --- /dev/null +++ b/tests/interconnect/test_axi4.py @@ -0,0 +1,73 @@ +import ramda as R +from migen_axi.interconnect import axi4 + + +get_name = R.head +get_width = R.nth(1) +get_direction = R.nth(-1) +get_names = R.map(get_name) + + +def get_item_by_name(name): + return R.find(R.compose(R.equals(name), get_name)) + + +def test_axi4config(): + dut = axi4.Axi4Config(16, 32, use_region=False) + assert dut.addr_width == 16 + assert dut.data_width == 32 + assert dut.use_ar_user is False + assert dut.use_aw_user is False + assert dut.use_w_user is False + assert dut.use_b_user is False + assert dut.use_arw_user is False + assert dut.arw_user_width == -1 + assert dut.byte_per_word == 4 + + +def test_axi4_aw(): + cfg = axi4.Axi4Config(16, 32, use_region=False) + dut = axi4.axi4_aw(cfg, 0) + assert len(dut) == 9 + assert "region" not in get_names(dut) + assert "user" not in get_names(dut) + + +def test_axi4_ar(): + cfg = axi4.Axi4Config(16, 32, use_region=False, use_lock=False) + dut = axi4.axi4_ar(cfg, 0) + assert len(dut) == 8 + assert "region" not in get_names(dut) + assert "lock" not in get_names(dut) + + +def test_axi4_arw(): + cfg = axi4.Axi4Config(16, 32, use_region=False, use_lock=False) + dut = axi4.axi4_arw(cfg, 0) + assert len(dut) == 9 + assert "region" not in get_names(dut) + assert "write" in get_names(dut) + assert R.compose(get_width, get_item_by_name("write"))(dut) == 1 + + +def test_axi4_w(): + cfg = axi4.Axi4Config(16, 32) + dut = axi4.axi4_w(cfg, 0) + assert len(dut) == 3 + assert "data" in get_names(dut) + assert "strb" in get_names(dut) + assert "user" not in get_names(dut) + + assert R.compose(get_width, get_item_by_name("data"))(dut) == 32 + assert R.compose(get_width, get_item_by_name("last"))(dut) == 1 + + +def test_axi4_b(): + cfg = axi4.Axi4Config(16, 32) + dut = axi4.axi4_b(cfg, 0) + + assert len(dut) == 2 + assert "id" in get_names(dut) + assert "resp" in get_names(dut) + + assert R.compose(get_width, get_item_by_name("resp"))(dut) == 2