562 lines
20 KiB
Python
562 lines
20 KiB
Python
import time
|
||
import os
|
||
from itertools import chain, combinations
|
||
|
||
import pytest
|
||
import os
|
||
|
||
from octorest import OctoRest
|
||
|
||
from betamax import Betamax
|
||
from betamax_serializers import pretty_json
|
||
|
||
from _common import URL, APIKEY
|
||
|
||
with Betamax.configure() as config:
|
||
config.cassette_library_dir = 'tests/fixtures/cassettes'
|
||
record_mode = os.environ.get('RECORD', 'none')
|
||
config.default_cassette_options['record_mode'] = record_mode
|
||
config.default_cassette_options['match_requests_on'] = {
|
||
'uri',
|
||
'method',
|
||
}
|
||
Betamax.register_serializer(pretty_json.PrettyJSONSerializer)
|
||
config.default_cassette_options['serialize_with'] = 'prettyjson'
|
||
|
||
|
||
def sleep(seconds):
|
||
'''
|
||
If recording, sleep for a given amount of seconds
|
||
'''
|
||
# if 'RECORD' in os.environ:
|
||
time.sleep(seconds)
|
||
|
||
|
||
def cmd_wait(client, state):
|
||
while client.state() == state:
|
||
sleep(0.1)
|
||
|
||
|
||
def cmd_wait_until(client, state):
|
||
while client.state() != state:
|
||
sleep(0.1)
|
||
|
||
|
||
def subsets(*items):
|
||
'''
|
||
Get all possible subsets of something
|
||
'''
|
||
N = len(items)+1
|
||
return chain(*map(lambda x: combinations(items, x), range(0, N)))
|
||
|
||
|
||
def zero(component):
|
||
'''
|
||
Add a 0 at the end of the component, if it is tool
|
||
'''
|
||
return 'tool0' if component == 'tool' else component
|
||
|
||
|
||
# @pytest.mark.usefixtures('betamax_session')
|
||
@pytest.fixture
|
||
def client():
|
||
return OctoRest(url=URL, apikey=APIKEY, session=None)
|
||
|
||
|
||
@pytest.fixture
|
||
def gcode():
|
||
class GCode:
|
||
def __init__(self, filename):
|
||
self.filename = filename
|
||
self.path = 'tests/fixtures/gcodes/{}'.format(filename)
|
||
|
||
return GCode('telephonebox.gcode')
|
||
|
||
|
||
class TestClient:
|
||
@pytest.mark.usefixtures('betamax_session')
|
||
def test_init_works_with_good_auth(self):
|
||
# Should not raise anything
|
||
OctoRest(url=URL, apikey=APIKEY)
|
||
|
||
@pytest.mark.usefixtures('betamax_session')
|
||
def test_init_raises_with_bad_auth(self):
|
||
with pytest.raises(RuntimeError):
|
||
OctoRest(url=URL, apikey='nope')
|
||
|
||
### VERSION INFORMATION TESTS ###
|
||
|
||
def test_version(self, client):
|
||
version = client.get_version()
|
||
assert 'api' in version
|
||
assert 'server' in version
|
||
assert 'text' in version
|
||
|
||
### FILE OPERATION TESTS ###
|
||
|
||
def test_files_contains_files_and_free_space_info(self, client):
|
||
files = client.files()
|
||
assert 'bigben.gcode' in [f['name'] for f in files['files']]
|
||
assert isinstance(files['free'], int)
|
||
|
||
def test_files_local_works(self, client):
|
||
files = client.files('local')
|
||
assert 'bigben.gcode' in [f['name'] for f in files['files']]
|
||
assert isinstance(files['free'], int)
|
||
|
||
def test_files_sdcard_works(self, client):
|
||
files = client.files('sdcard')
|
||
assert files['files'] == [] # no files on sdcard
|
||
assert 'free' not in files # API doesn't report that back
|
||
|
||
@pytest.mark.parametrize('filename', ('bigben.gcode', 'stpauls.gcode'))
|
||
def test_info_for_specific_file(self, client, filename):
|
||
f = client.files(filename)
|
||
assert f['name'] == filename
|
||
|
||
@pytest.mark.parametrize('filename', ('unicorn.gcode', 'yeti.gcode', 'noexist.gcode'))
|
||
def test_nonexisting_file_raises(self, client, filename):
|
||
with pytest.raises(RuntimeError):
|
||
client.files(filename)
|
||
|
||
def test_upload_by_path(self, client, gcode):
|
||
f = client.upload(gcode.path)
|
||
assert f['done']
|
||
assert f['files']['local']['name'] == gcode.filename
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_file_object(self, client, gcode):
|
||
with open(gcode.path) as fo:
|
||
f = client.upload(('fake.gcode', fo))
|
||
assert f['done']
|
||
assert f['files']['local']['name'] == 'fake.gcode'
|
||
client.delete('fake.gcode')
|
||
|
||
def test_upload_and_select(self, client, gcode):
|
||
f = client.upload(gcode.path, select=True)
|
||
assert f['done']
|
||
assert f['files']['local']['name'] == gcode.filename
|
||
selected = client.job_info()['job']['file']['name']
|
||
assert selected == gcode.filename
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_and_print(self, client, gcode):
|
||
f = client.upload(gcode.path, print=True)
|
||
sleep(1)
|
||
assert f['done']
|
||
assert f['files']['local']['name'] == gcode.filename
|
||
selected = client.job_info()['job']['file']['name']
|
||
assert selected == gcode.filename
|
||
assert client.state() == 'Printing'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_and_select_one_by_one(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename)
|
||
selected = client.job_info()['job']['file']['name']
|
||
assert selected == gcode.filename
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_and_select_with_print_one_by_one(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename, print=True)
|
||
sleep(1)
|
||
selected = client.job_info()['job']['file']['name']
|
||
assert selected == gcode.filename
|
||
assert client.state() == 'Printing'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_and_select_and_print_one_by_one(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename)
|
||
selected = client.job_info()['job']['file']['name']
|
||
assert selected == gcode.filename
|
||
client.start()
|
||
sleep(1)
|
||
assert client.state() == 'Printing'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_file_copy(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.copy(gcode.filename, 'copied.gcode')
|
||
files = client.files()
|
||
assert gcode.filename in [f['name'] for f in files['files']]
|
||
assert 'copied.gcode' in [f['name'] for f in files['files']]
|
||
client.delete(gcode.filename)
|
||
client.delete('copied.gcode')
|
||
|
||
def test_file_copy_exists(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.copy(gcode.filename, 'copied.gcode')
|
||
files = client.files()
|
||
assert gcode.filename in [f['name'] for f in files['files']]
|
||
assert 'copied.gcode' in [f['name'] for f in files['files']]
|
||
with pytest.raises(RuntimeError):
|
||
client.copy(gcode.filename, 'copied.gcode')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_file_copy_folder_not_exist(self, client, gcode):
|
||
files = client.files()
|
||
if 'copied.gcode' in [f['name'] for f in files['files']]:
|
||
client.delete('copied.gcode')
|
||
client.upload(gcode.path)
|
||
with pytest.raises(RuntimeError):
|
||
client.copy(gcode.filename, '/random/path/copied.gcode')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_file_move(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.move(gcode.filename, 'moved.gcode')
|
||
files = client.files()
|
||
assert 'moved.gcode' in [f['name'] for f in files['files']]
|
||
client.delete('moved.gcode')
|
||
|
||
def test_file_move_exists(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.move(gcode.filename, 'moved.gcode')
|
||
files = client.files()
|
||
assert 'moved.gcode' in [f['name'] for f in files['files']]
|
||
client.upload(gcode.path)
|
||
with pytest.raises(RuntimeError):
|
||
client.move(gcode.filename, 'moved.gcode')
|
||
client.delete(gcode.filename)
|
||
client.delete('moved.gcode')
|
||
|
||
def test_file_move_folder_not_exist(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
with pytest.raises(RuntimeError):
|
||
client.copy(gcode.filename, '/random/path/moved.gcode')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_slice_curalegacy(self, client):
|
||
client.slice('biscuithelper.STL', slicer='curalegacy')
|
||
sleep(2)
|
||
files = client.files()
|
||
assert 'biscuithelper.gco' in [f['name'] for f in files['files']]
|
||
client.delete('biscuithelper.gco')
|
||
|
||
@pytest.mark.parametrize('name', ('biscuits.gco', 'richtea.gcode'))
|
||
def test_slice_curalegacy_gcode(self, client, name):
|
||
client.slice('biscuithelper.STL', slicer='curalegacy', gcode=name)
|
||
sleep(2)
|
||
files = client.files()
|
||
assert name in [f['name'] for f in files['files']]
|
||
client.delete(name)
|
||
|
||
def test_slice_curalegacy_select(self, client):
|
||
client.slice('biscuithelper.STL', slicer='curalegacy', select=True)
|
||
sleep(2)
|
||
files = client.files()
|
||
assert 'biscuithelper.gco' in [f['name'] for f in files['files']]
|
||
selected = client.job_info()['job']['file']['name']
|
||
assert selected == 'biscuithelper.gco'
|
||
client.delete('biscuithelper.gco')
|
||
|
||
def test_upload_print_pause_cancel(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename, print=True)
|
||
cmd_wait_until(client, 'Printing')
|
||
client.pause()
|
||
cmd_wait(client, 'Pausing')
|
||
assert client.state() == 'Paused'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_print_pause_restart(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename, print=True)
|
||
cmd_wait_until(client, 'Printing')
|
||
client.pause()
|
||
cmd_wait_until(client, 'Paused')
|
||
assert client.state() == 'Paused'
|
||
client.restart()
|
||
cmd_wait_until(client, 'Printing')
|
||
assert client.state() == 'Printing'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_print_pause_resume(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename, print=True)
|
||
cmd_wait_until(client, 'Printing')
|
||
client.pause()
|
||
cmd_wait_until(client, 'Paused')
|
||
assert client.state() == 'Paused'
|
||
client.resume()
|
||
cmd_wait_until(client, 'Printing')
|
||
assert client.state() == 'Printing'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_print_toggle(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename, print=True)
|
||
cmd_wait_until(client, 'Printing')
|
||
client.toggle()
|
||
cmd_wait_until(client, 'Paused')
|
||
assert client.state() == 'Paused'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_upload_print_toggle_toggle(self, client, gcode):
|
||
client.upload(gcode.path)
|
||
client.select(gcode.filename, print=True)
|
||
cmd_wait_until(client, 'Printing')
|
||
client.toggle()
|
||
cmd_wait_until(client, 'Paused')
|
||
assert client.state() == 'Paused'
|
||
client.toggle()
|
||
cmd_wait_until(client, 'Printing')
|
||
assert client.state() == 'Printing'
|
||
client.cancel()
|
||
cmd_wait(client, 'Cancelling')
|
||
client.delete(gcode.filename)
|
||
|
||
def test_logs(self, client):
|
||
logs = client.logs()
|
||
assert 'files' in logs
|
||
assert 'free' in logs
|
||
assert isinstance(logs['free'], int)
|
||
|
||
def test_delete_log(self, client):
|
||
logs = client.logs()
|
||
log_lst = [log['name'] for log in logs['files']]
|
||
assert log_lst[0] in log_lst
|
||
client.delete_log(log_lst[0])
|
||
logs = client.logs()
|
||
for log in logs['files']:
|
||
assert log['name'] != log_lst[0]
|
||
|
||
def test_printer(self, client):
|
||
printer = client.printer()
|
||
assert 'ready' in printer['sd']
|
||
assert printer['state']['flags']['operational']
|
||
assert printer['state']['flags']['ready']
|
||
assert not printer['state']['flags']['error']
|
||
assert not printer['state']['flags']['printing']
|
||
|
||
def test_printer_temps(self, client):
|
||
printer = client.printer()
|
||
cmd_wait_until(client, 'Operational')
|
||
assert 'bed' in printer['temperature']
|
||
assert 'tool0' in printer['temperature']
|
||
assert 'history' not in printer['temperature']
|
||
|
||
@pytest.mark.parametrize('exclude', subsets('sd', 'temperature', 'state'))
|
||
def test_printer_with_excluded_stuff(self, client, exclude):
|
||
printer = client.printer(exclude=exclude)
|
||
for key in exclude:
|
||
assert key not in printer
|
||
assert len(printer) == 3 - len(exclude)
|
||
|
||
def test_printer_with_history(self, client):
|
||
printer = client.printer(history=True)
|
||
assert isinstance(printer['temperature']['history'], list)
|
||
|
||
@pytest.mark.parametrize('limit', range(1, 4))
|
||
def test_printer_with_history_and_limit(self, client, limit):
|
||
printer = client.printer(history=True, limit=limit)
|
||
assert len(printer['temperature']['history']) == limit
|
||
|
||
@pytest.mark.parametrize('key', ('actual', 'target', 'offset'))
|
||
@pytest.mark.parametrize('component', ('tool', 'bed'))
|
||
def test_tool_and_bed(self, client, key, component):
|
||
info = getattr(client, component)() # client.tool() or bed()
|
||
assert 'history' not in info
|
||
assert isinstance(info[zero(component)][key], (float, int))
|
||
|
||
# @pytest.mark.parametrize('key', ('actual', 'target'))
|
||
# @pytest.mark.parametrize('component', ('tool', 'bed'))
|
||
# def test_tool_and_bed_with_history(self, client, key, component):
|
||
# # TODO: history is not working with bed or tool, only printer
|
||
# info = getattr(client, component)(history=True)
|
||
# assert 'history' in info
|
||
# for h in info['history']:
|
||
# assert isinstance(h[zero(component)][key], (float, int))
|
||
|
||
# @pytest.mark.parametrize('limit', range(1, 4))
|
||
# @pytest.mark.parametrize('component', ('tool', 'bed'))
|
||
# def test_tool_and_bed_with_history_limit(self, client, limit, component):
|
||
# # TODO: history is not working with bed or tool, only printer
|
||
# info = getattr(client, component)(history=True, limit=limit)
|
||
# assert len(info['history']) == limit
|
||
|
||
def test_home_all(self, client):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.home()
|
||
|
||
@pytest.mark.parametrize('axes', (('x',), ('y',), ('z',), ('x', 'y',)))
|
||
def test_home_some(self, client, axes):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.home(axes)
|
||
|
||
@pytest.mark.parametrize('coordinates', ((20, 0, 0), (0, 20, 0)))
|
||
def test_jog(self, client, coordinates):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.jog(*coordinates)
|
||
|
||
@pytest.mark.parametrize('factor', (100, 50, 150, 0.5, 1.0))
|
||
def test_feedrate(self, client, factor):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.feedrate(factor)
|
||
|
||
@pytest.mark.parametrize('how', (200, [200], {'tool0': 200}))
|
||
def test_set_tool_temperature_to_200(self, client, how):
|
||
client.tool_target(how)
|
||
tool = client.tool()
|
||
assert tool['tool0']['target'] == 200.0
|
||
if 'RECORD' in os.environ:
|
||
# Betamax had some problems here
|
||
# And we don't do this for testing, but only with actual printer
|
||
client.tool_target(0)
|
||
|
||
# @pytest.mark.parametrize('how', (20, [20], {'tool0': 20}))
|
||
# def test_set_tool_offset_to_20(self, client, how):
|
||
# client.tool_offset(how)
|
||
# tool = client.tool()
|
||
# print(tool)
|
||
# assert tool['tool0']['offset'] == 20.0
|
||
# # TODO: make the above assert work?
|
||
# if 'RECORD' in os.environ:
|
||
# client.tool_offset(0)
|
||
|
||
def test_selecting_tool(self, client):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.tool_select(0)
|
||
|
||
def test_extruding(self, client):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.extrude(1)
|
||
|
||
def test_retracting(self, client):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.retract(1)
|
||
|
||
@pytest.mark.parametrize('factor', (100, 75, 125, 0.75, 1.0))
|
||
def test_flowrate(self, client, factor):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.flowrate(factor)
|
||
|
||
def test_set_bed_temperature_to_100(self, client):
|
||
client.bed_target(100)
|
||
bed = client.bed()
|
||
assert bed['bed']['target'] == 100.0
|
||
if 'RECORD' in os.environ:
|
||
client.bed_target(0)
|
||
|
||
def test_set_bed_offset_to_10(self, client):
|
||
client.bed_offset(10)
|
||
bed = client.bed()
|
||
assert bed['bed']['offset'] == 10.0
|
||
if 'RECORD' in os.environ:
|
||
client.bed_offset(0)
|
||
|
||
def test_sd_card_init(self, client):
|
||
client.sd_init()
|
||
|
||
def test_sd_card_refresh(self, client):
|
||
client.sd_refresh()
|
||
|
||
def test_sd_card_release(self, client):
|
||
client.sd_release()
|
||
|
||
def test_sd_card_status(self, client):
|
||
sd = client.sd()
|
||
# no SD card here, so always not ready
|
||
assert sd['ready'] is False
|
||
|
||
def test_single_gcode_command(self, client):
|
||
client.gcode('G28 X')
|
||
|
||
def test_multiple_gcode_commands_nl(self, client):
|
||
client.gcode('G28 X\nG28 Y')
|
||
|
||
def test_multiple_gcode_commands_list(self, client):
|
||
client.gcode(['G28 X', 'G28 Y'])
|
||
|
||
def test_get_settings(self, client):
|
||
settings = client.settings()
|
||
assert 'api' in settings
|
||
assert 'appearance' in settings
|
||
|
||
def test_unchanged_settings(self, client):
|
||
settings = client.settings()
|
||
new_settings = client.settings({})
|
||
assert settings['api'] == new_settings['api']
|
||
assert settings['appearance'] == new_settings['appearance']
|
||
|
||
def test_change_settings(self, client):
|
||
settings = client.settings()
|
||
printer_name = settings['appearance']['name']
|
||
test_name = {'appearance': {'name': 'Gandalf'}}
|
||
new_settings = client.settings(test_name)
|
||
assert new_settings['appearance']['name'] == 'Gandalf'
|
||
client.settings({'appearance': {'name': printer_name}})
|
||
|
||
# def test_tmp_session_key(self, client):
|
||
# key = client.tmp_session_key()
|
||
# print(key)
|
||
|
||
def test_users(self, client):
|
||
users = client.users()
|
||
assert 'users' in users
|
||
|
||
### CONNECTION HANDLING TESTS ###
|
||
|
||
def test_connection_info(self, client):
|
||
info = client.connection_info()
|
||
|
||
assert 'current' in info
|
||
assert 'baudrate' in info['current']
|
||
assert 'port' in info['current']
|
||
assert 'state' in info['current']
|
||
|
||
assert 'options' in info
|
||
assert 'baudrates' in info['options']
|
||
assert 'ports' in info['options']
|
||
|
||
def test_fake_ack(self, client):
|
||
# we are only testing if no exception occurred, there's no return
|
||
client.fake_ack()
|
||
|
||
def test_disconnect(self, client):
|
||
client.disconnect()
|
||
assert client.state() in ['Offline', 'Closed']
|
||
|
||
def test_connect(self, client):
|
||
'''
|
||
Since it's hard with betamax fixture to check state() multiple times
|
||
in one test, this test hopes test_disconnect() was called before it.
|
||
It is not possible to run it without it in record mode.
|
||
TODO: Fix this
|
||
'''
|
||
client.connect()
|
||
cmd_wait(client, 'Detecting baudrate')
|
||
assert client.state() in ['Connecting',
|
||
'Operational',
|
||
'Opening serial port']
|
||
client.disconnect()
|
||
assert client.state() in ['Offline', 'Closed']
|
||
|
||
|
||
# import json
|
||
# client = OctoRest(url=URL, apikey=APIKEY)
|
||
# client.new_folder('hello')
|
||
# f = client.files(recursive=False)
|
||
# print(json.dumps(f, indent=4))
|
||
# g = client.files(recursive=True)
|
||
# print(json.dumps(f, indent=4))
|
||
# print(f == g)
|
||
# print(client.version)
|
||
# client.gcode("M106")
|
||
# client.gcode("M106 \n G28 X Y Z \n M107")
|