@@ -334,6 +334,7 @@ def main(self) :
334
334
t = asyncio .run (stream_osm_xml (self ))
335
335
except ZeroDivisionError :
336
336
print ('\n Error: boundary is empty or database has no data within' ,file = sys .stderr )
337
+ sys .stderr .flush ()
337
338
338
339
async def test (self ) :
339
340
""" Test: checks if get_lonlat exists, is executable.
@@ -375,7 +376,11 @@ def __init__(self) :
375
376
#os.get_terminal_size() will error out when .isatty() is false.
376
377
# instead of calling isatty() on each line, save it once at program start
377
378
# (because it does not change)
378
- self .isatty = sys .stderr .isatty ()
379
+ try :
380
+ #sys.stderr.isatty() seems to return True when <prog>|tail -f /dev/stdin
381
+ self .isatty = os .get_terminal_size ().columns > 0
382
+ except OSError :
383
+ self .isatty = False
379
384
380
385
def check_ready (self ) :
381
386
assert self ._ready , 'Need to run .set_phases first'
@@ -394,9 +399,10 @@ def next_phase(self) :
394
399
395
400
def save_clearedline (self ) :
396
401
''' Simply write a newline at the end of the previous clearline: save it.
397
- Warning, will garbe output if not preceded by a clearline=True log call.
402
+ Warning, will garble output if not preceded by a clearline=True log call.
398
403
'''
399
404
print (end = '\n ' ,file = sys .stderr )
405
+ sys .stderr .flush ()
400
406
#same behaviour whether followed by a clearline or not
401
407
self .previous_clearline = False
402
408
@@ -493,23 +499,26 @@ def ratefmt(self,r:float) :
493
499
''' Return str(r) with 3 sigfigs
494
500
'''
495
501
for i ,letter in ((1e12 ,'T' ),(1e9 ,'G' ),(1e6 ,'M' ),(1e3 ,'K' ),(1 ,'' )) :
496
- if r > i :
502
+ if r > i or i == 1 : #i == 1 is the last unit: always format here, even when r <= 1.0
497
503
tgt = r / i
498
504
# ljust for 3 -> 3.00
499
- if tgt < 10.0 :
505
+ if tgt < 1.0 :
506
+ return str (round (tgt ,3 )).ljust (5 ,'0' )+ letter
507
+ elif tgt < 10.0 :
500
508
return str (round (tgt ,2 )).ljust (4 ,'0' )+ letter
501
509
elif tgt < 100.0 :
502
510
return str (round (tgt ,1 )).ljust (4 ,'0' )+ letter
503
511
else :
504
512
return str (round (tgt )).ljust (3 ,'0' )+ letter
505
- #else give up
506
- return n (r )
507
513
508
514
def simplerate (self ,count :int ,msg :str ,tot :int ,lastline = False ) :
509
515
""" Show a rate progress bar on count from tot items in format:
510
516
'{count} ({count_rate}/s) / {tot} {msg} {percent:count/tot}%'
511
517
"""
512
518
self .is_simplerate = True
519
+ if count > 1e6 :
520
+ #set higher for a smoother rate display
521
+ self .sample_length = 100_000
513
522
if not lastline :
514
523
self .samples_append ((count ,))
515
524
self .prev_args = (count ,msg ,tot )
@@ -552,6 +561,8 @@ def multirate(self,ns:typing.Tuple[int],msgs:typing.Tuple[str],count:int,total:i
552
561
how they will be printed out. Not .simplerate() though, it is separate.
553
562
"""
554
563
self .is_simplerate = False
564
+ if ns [0 ]> 1e6 :
565
+ self .sample_length = 100_000
555
566
if not lastline :
556
567
self .samples_append (ns )
557
568
self .prev_args = (ns ,msgs ,count ,total )
@@ -565,10 +576,7 @@ def multirate(self,ns:typing.Tuple[int],msgs:typing.Tuple[str],count:int,total:i
565
576
r_ss = ['(0/s)' for ix in enumerate (ns )]
566
577
rates = [j for ix ,i in enumerate (ns ) for j in (n (i ),r_ss [ix ],msgs [ix ])]
567
578
l = (* rates ,n (count )+ ' / ' + n (total ),' ' ,self .percent (count ,total ),)
568
- if lastline :
569
- self .log (* l ,clearline = True )
570
- else :
571
- self .log (* l ,clearline = True )
579
+ self .log (* l ,clearline = True )
572
580
573
581
def finishrate (self ,lastline = True ) :
574
582
""" Any currently running rate printer (simplerate,rate,doublerate,triplerate)
@@ -581,11 +589,13 @@ def finishrate(self,lastline=True) :
581
589
self .simplerate (* self .prev_args ,lastline = True )
582
590
else :
583
591
self .multirate (* self .prev_args ,lastline = True )
592
+ self .save_clearedline ()
584
593
#reset rate measurement
585
594
self .samples = []
586
595
self .times = []
587
596
self .prev_print_t = 0
588
597
self .prev_args = None
598
+ self .sample_length = 10_000
589
599
590
600
def percent (self ,numer :int ,denom :int )-> str :
591
601
''' Return the str(float(numer/denom)*100) with 3 sigfigs,
@@ -612,7 +622,6 @@ async def chain(*generators:typing.Iterator)->typing.Iterator:
612
622
613
623
async def get_latlon_str_from_flatnodes (osm_ids :typing .Collection [int ],
614
624
s :Settings )-> typing .Iterator :
615
- #beware, need to exchange lonlat -> latlon
616
625
a = await asyncio .create_subprocess_exec (s .get_lonlat_binary ,s .nodes_file ,
617
626
stdout = asyncio .subprocess .PIPE ,stdin = asyncio .subprocess .PIPE )
618
627
# some osm_ids may error out. in that case get_lonlat just ignores them.
@@ -628,6 +637,7 @@ async def get_latlon_str_from_flatnodes(osm_ids:typing.Collection[int],
628
637
while (line := (await a .stdout .readline ()).strip ().decode ()) :
629
638
#l.log('read line',line)
630
639
x ,y ,osm_id = line .split (';' )
640
+ #beware, need to exchange lonlat -> latlon
631
641
yield (osm_id ,y ,x )
632
642
633
643
def all_nwr_within (s :Settings ,a :Accumulator ) :
@@ -638,7 +648,7 @@ def all_nwr_within(s:Settings,a:Accumulator) :
638
648
s .c .execute (f'SELECT osm_id FROM { tbl_name } WHERE { constr } ;' )
639
649
for row in g_from_cursor (s .c ,verbose = True ,prefix_msg = tbl_name + ' ' ) :
640
650
a .add ('nodes' ,row ['osm_id' ])
641
- l .log (a .len ('nodes' ),'nodes within bounds' )
651
+ l .log (n ( a .len ('nodes' ) ),'nodes within bounds' )
642
652
643
653
# 1b) select all ways,rels FROM planet_osm_polygon WHERE way ST_Within(bbox);
644
654
constr ,tbl_name = s .make_bounds_constr ('_polygon' )
@@ -650,7 +660,7 @@ def all_nwr_within(s:Settings,a:Accumulator) :
650
660
a .add ('ways' ,id )
651
661
else :
652
662
a .add ('rels' ,- id )
653
- l .log (a .len ('ways' ),'ways,' ,a .len ('rels' ),'rels from' ,tbl_name )
663
+ l .log (n ( a .len ('ways' )) ,'ways,' ,n ( a .len ('rels' ) ),'rels from' ,tbl_name )
654
664
655
665
# 1c) select all ways,rels FROM planet_osm_line WHERE way ST_Within(bbox);
656
666
# planet_osm_roads is not needed in that fashion, because it is a strict subset
@@ -664,16 +674,14 @@ def all_nwr_within(s:Settings,a:Accumulator) :
664
674
a .add ('ways' ,id )
665
675
else :
666
676
a .add ('rels' ,- id )
667
- l .log (a .len ('ways' ),'ways,' ,a .len ('rels' ),'rels within bounds' )
677
+ l .log (n ( a .len ('ways' )) ,'ways,' ,n ( a .len ('rels' ) ),'rels within bounds' )
668
678
669
-
670
679
def nodes_parent_wr (s :Settings ,a :Accumulator ,only_nodes_within = False ) :
671
680
# 2a) foreach node_id :
672
681
# 2b) select all ways WHERE ARRAY[node_id]::bigint[] <@ nodes;
673
682
# 2c) select all rels WHERE ARRAY[node_id]::bigint[] <@ parts;
674
683
nodes_name = 'nodes_within' if only_nodes_within else 'nodes'
675
684
a_len = a .len
676
- l .log ('checking parent ways of' ,a_len (nodes_name ),'nodes' )
677
685
way_count = 0
678
686
rel_count = 0
679
687
node_count = 0
@@ -714,7 +722,7 @@ def nodes_parent_wr(s:Settings,a:Accumulator,only_nodes_within=False) :
714
722
for rel in rel_ids :
715
723
rel_count += 1
716
724
a .add ('rels' ,rel ['id' ])
717
- l .finishrate (lastline = False )
725
+ l .finishrate ()
718
726
l .log (n (a_len ('ways' )),'ways,' ,n (a_len ('rels' )),'rels forward from nodes' )
719
727
720
728
def ways_parent_r (s :Settings ,a :Accumulator ) :
@@ -885,7 +893,7 @@ async def stream_osm_xml(s:Settings) :
885
893
886
894
l .next_phase () #write
887
895
counts = [a .len (i )for i in ('nodes' ,'ways' ,'rels' )]
888
- l .log ('dumping' ,counts [0 ],'nodes,' ,counts [1 ],'ways,' ,counts [2 ],'rels in total' )
896
+ l .log ('dumping' ,n ( counts [0 ]) ,'nodes,' ,n ( counts [1 ]) ,'ways,' ,n ( counts [2 ]) ,'rels in total' )
889
897
# we now have: [~3.3M nodes, ~400K ways, ~8K rels] with_parents=True
890
898
891
899
# ONLY after all ids have been resolved, do we actually query the data,
@@ -905,7 +913,6 @@ async def stream_osm_xml(s:Settings) :
905
913
create_relations (s ,a ),
906
914
) :
907
915
xml_out .write (el )
908
- print (file = sys .stderr )
909
916
910
917
def rel_to_xml (row_dict :dict ,tags :dict ,new_jsonb_schema :bool )-> ET .Element :
911
918
# separate tags and row_dict, see way_to_xml()
@@ -1040,7 +1047,7 @@ def create_relations(s:Settings,a:Accumulator)->typing.Iterator[ET.Element] :
1040
1047
for row_dict in g_query_ids (s .c ,query ,g_negate (a .all_subtract ('rels' ,'done_ids' )),'osm_id' ,step = 250 ) :
1041
1048
if first :
1042
1049
start_t = time .time ()
1043
- l .log ('rels _line output start' ,start_t )
1050
+ # l.log('rels _line output start',start_t)
1044
1051
first = False
1045
1052
1046
1053
if a .is_in ('done_ids' ,row_dict ['id' ]) :
0 commit comments